Add debug trace #1062

Open · wants to merge 9 commits into base: 3.11
2 changes: 1 addition & 1 deletion build.gradle
@@ -36,7 +36,7 @@ allprojects {
compile 'com.sun.jersey:jersey-json:1.19.4'
compile 'com.sun.jersey:jersey-bundle:1.19.4'
compile 'com.sun.jersey.contribs:jersey-guice:1.19.4'
compile 'com.google.guava:guava:21.0'
//compile 'com.google.guava:guava:21.0'
compile 'com.google.code.findbugs:jsr305:3.0.2'

// AWS Services
AbstractFileSystem.java
@@ -42,7 +42,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import javax.inject.Inject;
import javax.inject.Provider;
import org.apache.commons.collections4.iterators.FilterIterator;
import org.apache.commons.collections4.iterators.TransformIterator;
@@ -63,6 +62,7 @@ public abstract class AbstractFileSystem implements IBackupFileSystem {
private final IConfiguration configuration;
protected final BackupMetrics backupMetrics;
private final Set<Path> tasksQueued;

private final ListeningExecutorService fileUploadExecutor;
private final ThreadPoolExecutor fileDownloadExecutor;
private final BackupNotificationMgr backupNotificationMgr;
@@ -71,7 +71,6 @@ public abstract class AbstractFileSystem implements IBackupFileSystem {
// file system. This is to ensure that we don't make too many API calls to remote file system.
private final Cache<Path, Boolean> objectCache;

@Inject
public AbstractFileSystem(
IConfiguration configuration,
BackupMetrics backupMetrics,
@@ -89,28 +88,44 @@ public AbstractFileSystem(
files for "sync" feature which might compete with backups for scheduling.
Also, we may want to have different TIMEOUT for each kind of operation (upload/download) based on our file system choices.
*/

logger.info("Initializing AbstractFileSystem ...");
// Get the current thread's stack trace
StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();

// Loop through each stack trace element
for (StackTraceElement element : stackTraceElements) {
logger.info(element.toString());
}
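// Alternative, assuming java.util.Arrays is imported: the whole trace can also be logged
// in a single statement, e.g. logger.info("Init call stack: {}", Arrays.toString(stackTraceElements));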

BlockingQueue<Runnable> uploadQueue =
new ArrayBlockingQueue<>(configuration.getBackupQueueSize());
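// The PolledMeter below samples uploadQueue.size() in the background and publishes it as a
// gauge under the name held in backupMetrics.uploadQueueSize.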
PolledMeter.using(backupMetrics.getRegistry())
.withName(backupMetrics.uploadQueueSize)
.monitorSize(uploadQueue);
this.fileUploadExecutor =
MoreExecutors.listeningDecorator(
new BlockingSubmitThreadPoolExecutor(
configuration.getBackupThreads(),
uploadQueue,
configuration.getUploadTimeout()));

BlockingSubmitThreadPoolExecutor uploadExecutor = new BlockingSubmitThreadPoolExecutor(
configuration.getBackupThreads(),
uploadQueue,
configuration.getUploadTimeout());
logger.info("uploadExecutor: {}", uploadExecutor);

this.fileUploadExecutor = MoreExecutors.listeningDecorator(uploadExecutor);

logger.info("fileUploadExecutor: {}", this.fileUploadExecutor);

BlockingQueue<Runnable> downloadQueue =
new ArrayBlockingQueue<>(configuration.getDownloadQueueSize());
PolledMeter.using(backupMetrics.getRegistry())
.withName(backupMetrics.downloadQueueSize)
.monitorSize(downloadQueue);
this.fileDownloadExecutor =
new BlockingSubmitThreadPoolExecutor(
configuration.getRestoreThreads(),
downloadQueue,
configuration.getDownloadTimeout());
BlockingSubmitThreadPoolExecutor downloadExecutor = new BlockingSubmitThreadPoolExecutor(
configuration.getRestoreThreads(),
downloadQueue,
configuration.getDownloadTimeout());
logger.info("downloadExecutor: {}", downloadExecutor);

this.fileDownloadExecutor = downloadExecutor;
}

@Override
@@ -158,20 +173,35 @@ protected abstract void downloadFileImpl(final AbstractBackupPath path, String s
public ListenableFuture<AbstractBackupPath> uploadAndDelete(
final AbstractBackupPath path, Instant target, boolean async)
throws RejectedExecutionException, BackupRestoreException {
logger.info(String.format("uploadAndDelete path: %s, async: %s", Paths.get(path.getBackupFile().getAbsolutePath()), async));
if (async) {

return fileUploadExecutor.submit(
() -> uploadAndDeleteInternal(path, target, 10 /* retries */));
() -> {
// logger.info("uploadAndDeleteCallableV2 path: {}", Paths.get(path.getBackupFile().getAbsolutePath()));
try {
return uploadAndDeleteInternal(path, target, 10 /* retries */);
} catch (BackupRestoreException e) {
logger.info("uploadAndDeleteCallable exception path: {}", Paths.get(path.getBackupFile().getAbsolutePath()), e);
throw e;
}
});

} else {
return Futures.immediateFuture(uploadAndDeleteInternal(path, target, 10 /* retries */));
return Futures.immediateFuture(uploadAndDeleteInternal(path, target, 10 /* retries */));
}
}


@VisibleForTesting
public AbstractBackupPath uploadAndDeleteInternal(
final AbstractBackupPath path, Instant target, int retry)
throws RejectedExecutionException, BackupRestoreException {
Path localPath = Paths.get(path.getBackupFile().getAbsolutePath());
File localFile = localPath.toFile();

// logger.info(String.format("uploadAndDeleteInternal: %s", localPath));

Preconditions.checkArgument(
localFile.exists(), String.format("Can't upload nonexistent %s", localPath));
Preconditions.checkArgument(
BackupFileSystemContext.java
@@ -30,6 +30,11 @@ public BackupFileSystemContext(
this.encryptedFs = encryptedFs;
}

// Note: this constructor leaves encryptedFs unset, so configurations with encrypted
// backups enabled would get no file system back from getFileStrategy(config); it is
// only safe while encryption is disabled.
public BackupFileSystemContext(@Named("backup") IBackupFileSystem fs) {
this.fs = fs;
}

public IBackupFileSystem getFileStrategy(IConfiguration config) {

if (!config.isEncryptBackupEnabled()) {
BackupHelperImpl.java
@@ -8,18 +8,22 @@
import com.netflix.priam.compress.CompressionType;
import com.netflix.priam.config.BackupsToCompress;
import com.netflix.priam.config.IConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Set;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Provider;

public class BackupHelperImpl implements BackupHelper {

private static final Logger logger = LoggerFactory.getLogger(BackupHelperImpl.class);
private static final String COMPRESSION_SUFFIX = "-CompressionInfo.db";
private static final String DATA_SUFFIX = "-Data.db";
private final Provider<AbstractBackupPath> pathFactory;
@@ -55,8 +59,11 @@ public ImmutableList<ListenableFuture<AbstractBackupPath>> uploadAndDeleteAllFil
throws Exception {
final ImmutableList.Builder<ListenableFuture<AbstractBackupPath>> futures =
ImmutableList.builder();

for (AbstractBackupPath bp : getBackupPaths(parent, type)) {
futures.add(fs.uploadAndDelete(bp, target, async));
logger.info(String.format("After AbstractBackupPath: %s, localPath: %s", bp, Paths.get(bp.getBackupFile().getAbsolutePath())));

}
return futures.build();
}
IBackupFileSystem.java
@@ -17,6 +17,7 @@
package com.netflix.priam.backup;

import com.google.common.util.concurrent.ListenableFuture;

import java.io.FileNotFoundException;
import java.nio.file.Path;
import java.time.Instant;
@@ -71,18 +72,18 @@ default void uploadAndDelete(AbstractBackupPath path, boolean async)
* guaranteed as we try to avoid lock on read/write of the files-in-progress. Once uploaded,
* files are deleted. Uploads are retried 10 times.
*
* @param path AbstractBackupPath to be used to send backup notifications only.
* @param target The target time of completion of all files in the upload.
* @param async boolean to determine whether the call should block or return immediately and
*     upload asynchronously
* @return The future of the async job to monitor the progress of the job. This will be null if
*     file was de-duped for upload.
* @throws BackupRestoreException in case of failure to upload for any reason including file not
*     readable or remote file system errors.
* @throws FileNotFoundException If a file as denoted by localPath is not available or is a
*     directory.
* @throws RejectedExecutionException if the queue is full and TIMEOUT is reached while trying
*     to add the work to the queue.
*/
ListenableFuture<AbstractBackupPath> uploadAndDelete(
final AbstractBackupPath path, Instant target, boolean async)
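A minimal caller sketch of the async path documented above (illustrative only, not part of this diff), using Guava's Futures, FutureCallback, and MoreExecutors; fs, path, target, and logger are assumed to be provided by the caller, checked exceptions from uploadAndDelete are omitted for brevity, and the null check covers the de-duped case noted in the Javadoc:

ListenableFuture<AbstractBackupPath> future = fs.uploadAndDelete(path, target, true /* async */);
if (future != null) { // null when the file was de-duped for upload
    Futures.addCallback(
            future,
            new FutureCallback<AbstractBackupPath>() {
                @Override
                public void onSuccess(AbstractBackupPath result) {
                    logger.info("Uploaded {}", result);
                }

                @Override
                public void onFailure(Throwable t) {
                    logger.error("Upload failed for {}", path, t);
                }
            },
            MoreExecutors.directExecutor());
}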
@@ -60,7 +60,8 @@
* This service will run on CRON as specified by {@link
* IBackupRestoreConfig#getSnapshotMetaServiceCronExpression()} The intent of this service is to run
* a full snapshot on Cassandra, get the list of the SSTables on disk and then create a
manifest.json file which will encapsulate the list of the files i.e. capture filesystem at a
* moment in time. This manifest.json file will ensure the true filesystem status is exposed (for
* external entities) and will be used in future for Priam Backup Version 2 where a file is not
* uploaded to backup file system unless SSTable has been modified. This will lead to huge reduction
@@ -287,15 +288,16 @@ private void uploadAllFiles(final File backupDir) throws Exception {
// We do not want to wait for completion and we just want to add them to queue. This
// is to ensure that next run happens on time.
AbstractBackupPath.BackupFileType type = AbstractBackupPath.BackupFileType.SST_V2;

backupHelper
.uploadAndDeleteAllFiles(snapshotDirectory, type, target, true)
.uploadAndDeleteAllFiles(snapshotDirectory, type, target, config.enableAsyncSnapshot())
.forEach(future -> addCallback(future, snapshotDirectory));

// Next, upload secondary indexes
type = AbstractBackupPath.BackupFileType.SECONDARY_INDEX_V2;
ImmutableList<ListenableFuture<AbstractBackupPath>> futures;
for (File subDir : getSecondaryIndexDirectories(snapshotDirectory)) {
futures = backupHelper.uploadAndDeleteAllFiles(subDir, type, target, true);
futures = backupHelper.uploadAndDeleteAllFiles(subDir, type, target, config.enableAsyncSnapshot());
if (futures.isEmpty()) {
deleteIfEmpty(subDir);
}
@@ -436,7 +438,9 @@ private static void addCallback(ListenableFuture<AbstractBackupPath> future, Fil
FutureCallback<AbstractBackupPath> callback =
new FutureCallback<AbstractBackupPath>() {
@Override
public void onSuccess(AbstractBackupPath result) {}
public void onSuccess(AbstractBackupPath result) {
logger.info("Successfully uploaded contents of snapshotDir {}", snapshotDir);
}

@Override
public void onFailure(Throwable t) {
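The uploadAllFiles hunk above swaps the hard-coded true for config.enableAsyncSnapshot(), whose definition is not visible in this diff. A hypothetical sketch of such a flag on IConfiguration, defaulting to true so the previous always-async behavior is preserved, could look like:

// Hypothetical accessor, not taken from this diff: governs whether snapshot uploads
// are submitted asynchronously.
default boolean enableAsyncSnapshot() {
    return true;
}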
PriamGuiceModule.java
@@ -39,13 +39,19 @@
import com.netflix.spectator.api.Registry;
import java.util.Arrays;
import org.quartz.SchedulerFactory;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PriamGuiceModule extends AbstractModule {
protected static final Logger logger = LoggerFactory.getLogger(PriamGuiceModule.class);

@Override
protected void configure() {
bind(SchedulerFactory.class).to(StdSchedulerFactory.class).asEagerSingleton();

bind(IBackupFileSystem.class).annotatedWith(Names.named("backup")).to(S3FileSystem.class);
logger.info("[chengw]: {}", Thread.currentThread().getStackTrace());

bind(IBackupFileSystem.class)
.annotatedWith(Names.named("encryptedbackup"))
.to(S3EncryptedFileSystem.class);
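For reference, a hypothetical consumer of the @Named("backup") binding added above (class and field names here are illustrative, assuming the javax.inject annotations already used elsewhere in the codebase):

// Illustrative only: Guice resolves the "backup"-qualified dependency to S3FileSystem
// via the binding registered in PriamGuiceModule.
public class ExampleBackupConsumer {
    private final IBackupFileSystem backupFs;

    @Inject
    public ExampleBackupConsumer(@Named("backup") IBackupFileSystem backupFs) {
        this.backupFs = backupFs;
    }
}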