bypass cassandra streaming #837

Open
wants to merge 1 commit into
base: 3.x
70 changes: 49 additions & 21 deletions priam/src/main/java/com/netflix/priam/PriamServer.java
@@ -22,6 +22,7 @@
import com.netflix.priam.backup.BackupService;
import com.netflix.priam.backupv2.BackupV2Service;
import com.netflix.priam.cluster.management.ClusterManagementService;
import com.netflix.priam.config.IBackupRestoreConfig;
import com.netflix.priam.config.IConfiguration;
import com.netflix.priam.config.PriamConfigurationPersister;
import com.netflix.priam.defaultimpl.ICassandraProcess;
@@ -42,6 +43,7 @@
public class PriamServer implements IService {
private final PriamScheduler scheduler;
private final IConfiguration config;
private final IBackupRestoreConfig backupRestoreConfig;
private final InstanceIdentity instanceIdentity;
private final Sleeper sleeper;
private final ICassandraProcess cassProcess;
@@ -56,6 +58,7 @@ public class PriamServer implements IService {
@Inject
public PriamServer(
IConfiguration config,
IBackupRestoreConfig backupRestoreConfig,
PriamScheduler scheduler,
InstanceIdentity id,
Sleeper sleeper,
@@ -66,6 +69,7 @@ public PriamServer(
CassandraTunerService cassandraTunerService,
ClusterManagementService clusterManagementService) {
this.config = config;
this.backupRestoreConfig = backupRestoreConfig;
this.scheduler = scheduler;
this.instanceIdentity = id;
this.sleeper = sleeper;
@@ -111,25 +115,48 @@ public void scheduleService() throws Exception {
UpdateSecuritySettings.getTimer(instanceIdentity));
}

// Set up cassandra tuning.
cassandraTunerService.scheduleService();
// Set up the background configuration dumping thread
scheduleTask(
scheduler,
PriamConfigurationPersister.class,
PriamConfigurationPersister.getTimer(config));

boolean shouldStartCassandra = false;

// Determine if we need to restore from backup else start cassandra.
if (restoreContext.isRestoreEnabled()) {
// Determine if we need to restore from backup.
if (restoreContext.isRestoreEnabled(config, instanceIdentity.getInstanceInfo())) {
restoreContext.restore();
} else { // no restores needed
logger.info("No restore needed, task not scheduled");
if (!config.doesCassandraStartManually()) cassProcess.start(true); // Start cassandra.
else
logger.info(
"config.doesCassandraStartManually() is set to True, hence Cassandra needs to be started manually ...");
// Start cassandra only if restore is successful.
shouldStartCassandra = true;
} else {
if (instanceIdentity.isReplace()
&& backupRestoreConfig.enableBypassCassandraStreaming()) {
Contributor

How does Priam determine whether Cassandra has not yet bootstrapped successfully? I looked for an existing check, but I didn't see one.

Contributor Author

TokenRetrieverUtils.inferTokenOwnerFromGossip is used to fetch the instance identity. That method should tell correctly if Cassandra had already bootstrapped successfully.

logger.info("Trying to download data instead of streaming from Cassandra.");
Contributor

nit: Add "from backup", as in "Trying to download data from backup instead of streaming from Cassandra."

try {
restoreContext.restore();
instanceIdentity.setReplacedIp("");
} catch (Exception e) {
logger.error(
"Error while trying to rebuild the node from backup. Maybe backup not available or disk full? Trying normal path of cassandra streaming");
// Clean the data folder.
SystemUtils.cleanupDir(config.getDataFileLocation(), null);
} finally {
shouldStartCassandra = true;
}
} else {
// no restores needed
logger.info("No restore needed, task not scheduled");
shouldStartCassandra = true;
}
}
Contributor

How can we exit this block with shouldStartCassandra being false?

Contributor Author

If the originally requested restore failed (that is, when Priam starts in restore mode during the weekly restore refresh). That is the case this assignment guards:

```
// Start cassandra only if restore is successful.
shouldStartCassandra = true;
```

But yes, with the recent refactoring of restore we will throw an exception there, so we don't need that variable. Good catch.
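
For readers following this thread, here is a minimal sketch of what the control flow could look like once a failed restore surfaces as an exception, which is what makes the `shouldStartCassandra` flag unnecessary. `RestoreStep`, `StartupFlowSketch`, and the recorded step names are hypothetical stand-ins, not Priam classes; in the diff the corresponding calls are `restoreContext.restore()`, `SystemUtils.cleanupDir(...)`, and `startCassandra()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the restore entry point; assumed to throw when the restore fails. */
interface RestoreStep {
    void restore() throws Exception;
}

/** Sketch of the startup decision without a shouldStartCassandra flag. */
class StartupFlowSketch {
    /** Records the steps taken, standing in for real side effects such as starting Cassandra. */
    final List<String> steps = new ArrayList<>();

    void schedule(boolean restoreEnabled, boolean replaceWithBypass, RestoreStep restore)
            throws Exception {
        if (restoreEnabled) {
            // Explicitly requested restore: a failure propagates to the caller,
            // so the start step below is never reached.
            restore.restore();
            steps.add("restored");
        } else if (replaceWithBypass) {
            try {
                // Replacement node: try to rebuild from backup instead of streaming.
                restore.restore();
                steps.add("restored");
            } catch (Exception e) {
                // Best-effort path: clean partial data and fall back to normal streaming.
                steps.add("cleaned-data-dir");
            }
        }
        // Reaching this line always means Cassandra should start.
        steps.add("start-cassandra");
    }
}
```

In this shape the only way to skip the start step is the exception from an explicitly requested restore, which matches the reply above.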


/*
* Run the delayed task (after 10 seconds) to Monitor Cassandra
* If Restore option is chosen, then Running Cassandra instance is stopped
* Hence waiting for Cassandra to stop
*/
// Tune Cassandra.
cassandraTunerService.scheduleService();

// Start Cassandra.
if (shouldStartCassandra) startCassandra();

// Run the delayed task (after 10 seconds) to Monitor Cassandra
scheduler.addTaskWithDelay(
CassandraMonitor.JOBNAME,
CassandraMonitor.class,
@@ -139,19 +166,20 @@ public void scheduleService() throws Exception {
// Set up management services like flush, compactions etc.
clusterManagementService.scheduleService();

// Set up the background configuration dumping thread
scheduleTask(
scheduler,
PriamConfigurationPersister.class,
PriamConfigurationPersister.getTimer(config));

// Set up V1 Snapshot Service
backupService.scheduleService();

// Set up V2 Snapshot Service
backupV2Service.scheduleService();
}

private void startCassandra() throws IOException {
if (!config.doesCassandraStartManually()) cassProcess.start(true); // Start cassandra.
else
logger.info(
"config.doesCassandraStartManually() is set to True, hence Cassandra needs to be started manually ...");
}

@Override
public void updateServicePre() throws Exception {}

@@ -97,4 +97,18 @@ default int getBackupVerificationSLOInHours() {
default boolean enableV2Restore() {
return false;
}

/**
* Build the instance from backups by using the restore process in case of an instance replacement.
* Note that we prefer this when data size is HUGE. C* streaming is super slow and for instances
* with big data size can lead to C* streaming for multiple days. Note that this is a little bit
* dangerous as you "will" lose some of the writes accepted by the old instance but not uploaded to
arunagrawal84 marked this conversation as resolved.
* backup file system. Also we do not plan to run local repair on the replaced instance, so data
Contributor

I agree that not running repair is acceptable for a first iteration. Hypothetically though, how would we do it?

Contributor Author

Ideally, we should defer that task to the repair service. Where that repair service sits and how it gets executed is a different conversation, though.

* will be stale. We hope that repair will take care of the inconsistency.
*
* @return use restore for replacements (bypassing cassandra streaming), if backup is available.
*/
default boolean enableBypassCassandraStreaming() {
return true;
}
}
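
As a rough illustration of how the new flag combines with the replacement check in `PriamServer`, the sketch below mirrors the guard `instanceIdentity.isReplace() && backupRestoreConfig.enableBypassCassandraStreaming()`. `BypassConfigSketch`, `StreamingOnlyConfigSketch`, and `ReplacementPolicySketch` are hypothetical names used only for this example; the real interface is `IBackupRestoreConfig`, and how a deployment would actually override the default is an assumption.

```java
/** Hypothetical, trimmed-down mirror of the flag this diff adds to IBackupRestoreConfig. */
interface BypassConfigSketch {
    default boolean enableBypassCassandraStreaming() {
        return true; // default value in this diff
    }
}

/** Hypothetical override for a cluster that prefers normal streaming on replacement. */
class StreamingOnlyConfigSketch implements BypassConfigSketch {
    @Override
    public boolean enableBypassCassandraStreaming() {
        return false;
    }
}

class ReplacementPolicySketch {
    /** Restore from backup only when the node is a replacement and the flag is on. */
    static boolean shouldRestoreFromBackup(boolean isReplace, BypassConfigSketch config) {
        return isReplace && config.enableBypassCassandraStreaming();
    }
}
```

Because the default returns `true`, every replacement would attempt the backup path unless a configuration opts out.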
@@ -345,6 +345,7 @@ public String getReplacedIp() {
public void setReplacedIp(String replacedIp) {
this.replacedIp = replacedIp;
if (!replacedIp.isEmpty()) this.isReplace = true;
else this.isReplace = false;
}

private static boolean isInstanceDummy(PriamInstance instance) {
32 changes: 6 additions & 26 deletions priam/src/main/java/com/netflix/priam/restore/AbstractRestore.java
@@ -26,8 +26,6 @@
import com.netflix.priam.defaultimpl.ICassandraProcess;
import com.netflix.priam.health.InstanceState;
import com.netflix.priam.identity.InstanceIdentity;
import com.netflix.priam.identity.config.InstanceInfo;
import com.netflix.priam.scheduler.Task;
import com.netflix.priam.utils.*;
import java.io.File;
import java.io.IOException;
@@ -38,19 +36,16 @@
import java.util.*;
import java.util.concurrent.Future;
import javax.inject.Named;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A means to perform a restore. This class contains the following characteristics: - It is agnostic
* to the source type of the restore, this is determine by the injected IBackupFileSystem. - This
* class can be scheduled, i.e. it is a "Task". - When this class is executed, it uses its own
* thread pool to execute the restores.
* to the source type of the restore, this is determined by the injected IBackupFileSystem. When this
* class is executed, it uses its own thread pool to execute the restores.
*/
public abstract class AbstractRestore extends Task implements IRestoreStrategy {
public abstract class AbstractRestore implements IRestoreStrategy {
private static final Logger logger = LoggerFactory.getLogger(AbstractRestore.class);
private static final String JOBNAME = "AbstractRestore";
private static final String SYSTEM_KEYSPACE = "system";
@@ -65,6 +60,7 @@ public abstract class AbstractRestore extends Task implements IRestoreStrategy {
private final InstanceState instanceState;
private final MetaData metaData;
private final IPostRestoreHook postRestoreHook;
private final IConfiguration config;

@Inject
@Named("v1")
@@ -88,7 +84,7 @@ public AbstractRestore(
MetaData metaData,
InstanceState instanceState,
IPostRestoreHook postRestoreHook) {
super(config);
this.config = config;
this.fs = fs;
this.sleeper = sleeper;
this.pathProvider = pathProvider;
@@ -103,14 +99,6 @@ public AbstractRestore(
this.postRestoreHook = postRestoreHook;
}

public static final boolean isRestoreEnabled(IConfiguration conf, InstanceInfo instanceInfo) {
boolean isRestoreMode = StringUtils.isNotBlank(conf.getRestoreSnapshot());
boolean isBackedupRac =
(CollectionUtils.isEmpty(conf.getBackupRacs())
|| conf.getBackupRacs().contains(instanceInfo.getRac()));
return (isRestoreMode && isBackedupRac);
}

public void setRestoreConfiguration(String restoreIncludeCFList, String restoreExcludeCFList) {
backupRestoreUtil.setFilters(restoreIncludeCFList, restoreExcludeCFList);
}
@@ -171,9 +159,7 @@ private void stopCassProcess() throws IOException {
}

@Override
public void execute() throws Exception {
if (!isRestoreEnabled(config, instanceIdentity.getInstanceInfo())) return;

public void restore() throws Exception {
logger.info("Starting restore for {}", config.getRestoreSnapshot());
final DateUtil.DateRange dateRange = new DateUtil.DateRange(config.getRestoreSnapshot());
new RetryableCallable<Void>() {
@@ -280,12 +266,6 @@ public void restore(DateUtil.DateRange dateRange) throws Exception {
// Declare restore as finished.
instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now());
instanceState.setRestoreStatus(Status.FINISHED);

// Start cassandra if restore is successful.
if (!config.doesCassandraStartManually()) cassProcess.start(true);
else
logger.info(
"config.doesCassandraStartManually() is set to True, hence Cassandra needs to be started manually ...");
} catch (Exception e) {
instanceState.setRestoreStatus(Status.FAILED);
instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now());
@@ -27,8 +27,6 @@
import com.netflix.priam.defaultimpl.ICassandraProcess;
import com.netflix.priam.health.InstanceState;
import com.netflix.priam.identity.InstanceIdentity;
import com.netflix.priam.scheduler.SimpleTimer;
import com.netflix.priam.scheduler.TaskTimer;
import com.netflix.priam.utils.Sleeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,9 +76,4 @@ public AwsCrossAccountCryptographyRestoreStrategy(
instanceState,
postRestoreHook);
}

/** @return a timer used by the scheduler to determine when "this" should be run. */
public static TaskTimer getTimer() {
return new SimpleTimer(JOBNAME);
}
}
@@ -27,8 +27,6 @@
import com.netflix.priam.defaultimpl.ICassandraProcess;
import com.netflix.priam.health.InstanceState;
import com.netflix.priam.identity.InstanceIdentity;
import com.netflix.priam.scheduler.SimpleTimer;
import com.netflix.priam.scheduler.TaskTimer;
import com.netflix.priam.utils.Sleeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,11 +71,4 @@ public EncryptedRestoreStrategy(
instanceState,
postRestoreHook);
}

/*
* @return a timer used by the scheduler to determine when "this" should be run.
*/
public static TaskTimer getTimer() {
return new SimpleTimer(JOBNAME);
}
}
@@ -27,8 +27,6 @@
import com.netflix.priam.defaultimpl.ICassandraProcess;
import com.netflix.priam.health.InstanceState;
import com.netflix.priam.identity.InstanceIdentity;
import com.netflix.priam.scheduler.SimpleTimer;
import com.netflix.priam.scheduler.TaskTimer;
import com.netflix.priam.utils.Sleeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,9 +68,4 @@ public GoogleCryptographyRestoreStrategy(
instanceState,
postRestoreHook);
}

/** @return a timer used by the scheduler to determine when "this" should be run. */
public static TaskTimer getTimer() {
return new SimpleTimer(JOBNAME);
}
}
@@ -17,5 +17,7 @@
* A means to restore C* files from various source types (e.g. Google, AWS bucket whose objects are not owned by the current IAM role), and encrypted / non-encrypted data.
*/
public interface IRestoreStrategy {
// public void restore(Date startTime, Date endTime) throws Exception;
void restore() throws Exception;

String getName();
}
6 changes: 0 additions & 6 deletions priam/src/main/java/com/netflix/priam/restore/Restore.java
@@ -27,8 +27,6 @@
import com.netflix.priam.defaultimpl.ICassandraProcess;
import com.netflix.priam.health.InstanceState;
import com.netflix.priam.identity.InstanceIdentity;
import com.netflix.priam.scheduler.SimpleTimer;
import com.netflix.priam.scheduler.TaskTimer;
import com.netflix.priam.utils.Sleeper;
import java.io.File;
import java.nio.file.Path;
@@ -76,10 +74,6 @@ protected final Future<Path> downloadFile(
Paths.get(path.getRemotePath()), Paths.get(restoreLocation.getAbsolutePath()), 5);
}

public static TaskTimer getTimer() {
return new SimpleTimer(JOBNAME);
}

@Override
public String getName() {
return JOBNAME;