Commit c6881e1

accord repair integration

bdeggleston committed Mar 29, 2024
1 parent 548924f
Showing 23 changed files with 647 additions and 251 deletions.
@@ -246,7 +246,7 @@ public void finished()
checkNotNull(minVersion, "Unable to determine minimum cluster version");
IAccordService accordService = AccordService.instance();
if (session.streamOperation().requiresBarrierTransaction()
-     && cfs.metadata().isAccordEnabled()
+     && cfs.metadata().requiresAccordSupport()
&& CassandraVersion.CASSANDRA_5_0.compareTo(minVersion) >= 0)
accordService.postStreamReceivingBarrier(cfs, ranges);

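The hunk above tightens the post-stream barrier gate: a barrier transaction is issued only when the stream operation calls for one, the table requires accord support, and the minimum cluster version is at most 5.0 — note the direction of the comparison, since `CASSANDRA_5_0.compareTo(minVersion) >= 0` holds when `minVersion <= 5.0`. A minimal self-contained sketch of that comparison semantics, using a simplified stand-in for `CassandraVersion` (an assumption, not the real class):

```java
// Simplified stand-in for org.apache.cassandra.utils.CassandraVersion,
// just to demonstrate which way the compareTo gate points.
record Version(int major, int minor) implements Comparable<Version>
{
    @Override
    public int compareTo(Version o)
    {
        int c = Integer.compare(major, o.major);
        return c != 0 ? c : Integer.compare(minor, o.minor);
    }

    public static void main(String[] args)
    {
        Version CASSANDRA_5_0 = new Version(5, 0);
        // compareTo(minVersion) >= 0  <=>  minVersion <= 5.0
        System.out.println(CASSANDRA_5_0.compareTo(new Version(4, 1)) >= 0); // true
        System.out.println(CASSANDRA_5_0.compareTo(new Version(5, 1)) >= 0); // false
    }
}
```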
66 changes: 0 additions & 66 deletions src/java/org/apache/cassandra/repair/AbstractRepairJob.java

This file was deleted.
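With AbstractRepairJob removed, its responsibilities are folded back into RepairJob below, which now extends AsyncFuture<RepairResult> and implements Runnable directly. A rough sketch of that shape — it compiles only against the Cassandra tree, the trySuccess/tryFailure usage mirrors the diff below, and everything else is illustrative:

```java
import org.apache.cassandra.repair.RepairResult;
import org.apache.cassandra.utils.concurrent.AsyncFuture;

// Illustrative skeleton only: the job is both the runnable unit of work and
// the future that reports its own result to whoever registered callbacks.
public class RepairJobSkeleton extends AsyncFuture<RepairResult> implements Runnable
{
    @Override
    public void run()
    {
        try
        {
            doRepair(); // submit validation/sync tasks; their callbacks complete the future
        }
        catch (Throwable t)
        {
            tryFailure(t); // surface synchronous failures to listeners
        }
    }

    private void doRepair()
    {
        // In the real RepairJob, async callbacks eventually call
        // trySuccess(new RepairResult(...)) or tryFailure(t).
    }
}
```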

@@ -79,7 +79,7 @@ private List<RepairSession> submitRepairSessions(TimeUUID parentSession,
options.optimiseStreams(),
options.repairPaxos(),
options.paxosOnly(),
-     options.accordRepair(),
+     options.accordOnly(),
executor,
validationScheduler,
cfnames);
@@ -40,16 +40,20 @@
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.asymmetric.DifferenceHolder;
import org.apache.cassandra.repair.asymmetric.HostDifferences;
import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter;
import org.apache.cassandra.repair.asymmetric.ReduceHelper;
- import org.apache.cassandra.schema.Schema;
+ import org.apache.cassandra.repair.state.JobState;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
import org.apache.cassandra.schema.TableMetadata;
+ import org.apache.cassandra.service.accord.repair.AccordRepair;
import org.apache.cassandra.service.consensus.migration.ConsensusMigrationRepairResult;
import org.apache.cassandra.service.paxos.cleanup.PaxosCleanup;
import org.apache.cassandra.streaming.PreviewKind;
@@ -59,6 +63,7 @@
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTrees;
import org.apache.cassandra.utils.Pair;
+ import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
@@ -70,11 +75,14 @@
/**
* RepairJob runs repair on given ColumnFamily.
*/
- public class CassandraRepairJob extends AbstractRepairJob
+ public class RepairJob extends AsyncFuture<RepairResult> implements Runnable
{
-     private static final Logger logger = LoggerFactory.getLogger(CassandraRepairJob.class);
+     private static final Logger logger = LoggerFactory.getLogger(RepairJob.class);

protected final Keyspace ks;
protected final ColumnFamilyStore cfs;
private final SharedContext ctx;
public final JobState state;
private final RepairJobDesc desc;
private final RepairSession session;
private final RepairParallelism parallelismDegree;
@@ -91,14 +99,24 @@ public class CassandraRepairJob
* @param session RepairSession that this RepairJob belongs to
* @param columnFamily name of the ColumnFamily to repair
*/
-     public CassandraRepairJob(RepairSession session, String columnFamily)
+     public RepairJob(RepairSession session, String columnFamily)
{
-         super(session, columnFamily);
+         this.ctx = session.ctx;
this.session = session;
this.taskExecutor = session.taskExecutor;
this.parallelismDegree = session.parallelismDegree;
this.desc = new RepairJobDesc(session.state.parentRepairSession, session.getId(), session.state.keyspace, columnFamily, session.state.commonRange.ranges);
+         this.ks = Keyspace.open(desc.keyspace);
+         this.cfs = ks.getColumnFamilyStore(columnFamily);
+         this.state = new JobState(ctx.clock(), desc, session.state.commonRange.endpoints);

+         TableMetadata metadata = this.cfs.metadata();
+         if (session.paxosOnly && !metadata.supportsPaxosOperations())
+             throw new IllegalArgumentException(String.format("Cannot run paxos only repair on %s.%s, which isn't configured for paxos operations", cfs.keyspace.getName(), cfs.name));
+
+         if (session.accordOnly && !metadata.requiresAccordSupport())
+             throw new IllegalArgumentException(String.format("Cannot run accord only repair on %s.%s, which isn't configured for accord operations", cfs.keyspace.getName(), cfs.name));

}

public long getNowInSeconds()
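The new constructor validation fails fast when the requested mode doesn't match the table: paxos-only repair demands a table configured for paxos operations, accord-only repair a table configured for accord. A hedged test sketch of that contract — the session helper and table name are hypothetical, and assertThrows assumes JUnit 4.13+ as used in the Cassandra test tree:

```java
import static org.junit.Assert.assertThrows;

import org.junit.Test;

public class RepairJobValidationTest
{
    @Test
    public void accordOnlyRejectsNonAccordTable()
    {
        // Hypothetical helper producing a session with accordOnly = true.
        RepairSession session = newAccordOnlySession();
        // "plain_table" stands in for any table whose metadata does not
        // satisfy requiresAccordSupport().
        assertThrows(IllegalArgumentException.class,
                     () -> new RepairJob(session, "plain_table"));
    }

    private RepairSession newAccordOnlySession()
    {
        throw new UnsupportedOperationException("sketch only");
    }
}
```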
@@ -114,25 +132,39 @@ public long getNowInSeconds()
}
}

+     @Override
+     public void run()
+     {
+         state.phase.start();
+         cfs.metric.repairsStarted.inc();
+         runRepair();
+     }

/**
* Runs repair job.
* <p/>
* This sets up necessary tasks and runs them on given {@code taskExecutor}.
* After submitting all tasks, waits until validation with replica completes.
*/
-     @Override
protected void runRepair()
{
List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.state.commonRange.endpoints);
allEndpoints.add(ctx.broadcastAddressAndPort());

+         TableMetadata metadata = cfs.metadata();
Future<Void> paxosRepair;
Epoch repairStartingEpoch = ClusterMetadata.current().epoch;
-         boolean doPaxosRepair = paxosRepairEnabled() && (((useV2() || isMetadataKeyspace()) && session.repairPaxos) || session.paxosOnly);

+         Preconditions.checkArgument(!session.paxosOnly || !session.accordOnly);
+         boolean doPaxosRepair = paxosRepairEnabled()
+             && (((useV2() || isMetadataKeyspace()) && session.repairPaxos) || session.paxosOnly)
+             && metadata.supportsPaxosOperations()
+             && !session.accordOnly;
+         boolean doAccordRepair = metadata.requiresAccordSupport() && !session.paxosOnly;

if (doPaxosRepair)
{
logger.info("{} {}.{} starting paxos repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
-             TableMetadata metadata = Schema.instance.getTableMetadata(desc.keyspace, desc.columnFamily);
paxosRepair = PaxosCleanup.cleanup(allEndpoints, metadata, desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor);
}
else
@@ -141,14 +173,15 @@ protected void runRepair()
paxosRepair = ImmediateFuture.success(null);
}


if (session.paxosOnly)
{
paxosRepair.addCallback(new FutureCallback<>()
{
public void onSuccess(Void ignored)
{
logger.info("{} {}.{} paxos repair completed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
-                     trySuccess(new RepairResult(desc, Collections.emptyList(), ConsensusMigrationRepairResult.fromCassandraRepair(repairStartingEpoch, false)));
+                     trySuccess(new RepairResult(desc, Collections.emptyList(), ConsensusMigrationRepairResult.fromPaxosOnlyRepair(repairStartingEpoch, session.excludedDeadNodes)));
}

/**
@@ -163,19 +196,59 @@ public void onFailure(Throwable t)
return;
}

+         Future<Void> accordRepair;
+         if (doAccordRepair)
+         {
+             accordRepair = paxosRepair.flatMap(unused -> {
+                 logger.info("{} {}.{} starting accord repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
+                 IPartitioner partitioner = desc.ranges.iterator().next().left.getPartitioner();
+                 AccordRepair repair = new AccordRepair(null, partitioner, desc.keyspace, desc.ranges);
+                 return repair.repair(taskExecutor);
+             }, taskExecutor);
+         }
+         else
+         {
+             accordRepair = paxosRepair.flatMap(unused -> {
+                 logger.info("{} {}.{} not running accord repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
+                 return ImmediateFuture.success(null);
+             });
+         }
+
+         if (session.accordOnly)
+         {
+             accordRepair.addCallback(new FutureCallback<Void>()
+             {
+                 public void onSuccess(Void ignored)
+                 {
+                     logger.info("{} {}.{} accord repair completed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
+                     trySuccess(new RepairResult(desc, Collections.emptyList(), ConsensusMigrationRepairResult.fromAccordOnlyRepair(repairStartingEpoch, session.excludedDeadNodes)));
+                 }
+
+                 /**
+                  * Snapshot, validation and sync failures are all handled here
+                  */
+                 public void onFailure(Throwable t)
+                 {
+                     logger.warn("{} {}.{} accord repair failed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
+                     tryFailure(t);
+                 }
+             }, taskExecutor);
+             return;
+         }

// Create a snapshot at all nodes unless we're using pure parallel repairs
final Future<?> allSnapshotTasks;
if (parallelismDegree != RepairParallelism.PARALLEL)
{
if (session.isIncremental)
{
                // consistent repair does its own "snapshotting"
-                 allSnapshotTasks = paxosRepair.map(input -> allEndpoints);
+                 allSnapshotTasks = accordRepair.map(input -> allEndpoints);
}
else
{
                // Request snapshot to all replicas
-                 allSnapshotTasks = paxosRepair.flatMap(input -> {
+                 allSnapshotTasks = accordRepair.flatMap(input -> {
List<Future<InetAddressAndPort>> snapshotTasks = new ArrayList<>(allEndpoints.size());
state.phase.snapshotsSubmitted();
for (InetAddressAndPort endpoint : allEndpoints)
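The control flow added above is worth spelling out: accord repair is chained behind paxos repair with flatMap, the no-accord branch still produces a future so downstream stages compose uniformly, and the paxosOnly/accordOnly modes complete the job early via callbacks and return. The same composition pattern, sketched with the JDK's CompletableFuture as an analogy (the patch itself uses Cassandra's own Future API):

```java
import java.util.concurrent.CompletableFuture;

public class RepairChainDemo
{
    public static void main(String[] args)
    {
        CompletableFuture<Void> paxosRepair =
            CompletableFuture.runAsync(() -> System.out.println("paxos repair"));

        boolean doAccordRepair = true; // table requires accord support and not paxosOnly
        CompletableFuture<Void> accordRepair = doAccordRepair
            ? paxosRepair.thenRunAsync(() -> System.out.println("accord repair"))
            : paxosRepair; // no-op stage keeps the downstream chain uniform

        // Snapshots, validation and sync all hang off accordRepair, so they
        // start only after both consensus repairs have finished.
        accordRepair.thenRun(() -> System.out.println("snapshot/validation/sync"))
                    .join();
    }
}
```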
@@ -198,7 +271,7 @@ public void onFailure(Throwable t)

// Run validations and the creation of sync tasks in the scheduler, so it can limit the number of Merkle trees
// that there are in memory at once. When all validations complete, submit sync tasks out of the scheduler.
-         Future<List<SyncStat>> syncResults = session.validationScheduler.schedule(() -> createSyncTasks(paxosRepair, allSnapshotTasks, allEndpoints), taskExecutor)
+         Future<List<SyncStat>> syncResults = session.validationScheduler.schedule(() -> createSyncTasks(accordRepair, allSnapshotTasks, allEndpoints), taskExecutor)
.flatMap(this::executeTasks, taskExecutor);

// When all sync complete, set the final result
@@ -215,7 +288,7 @@ public void onSuccess(List<SyncStat> stats)
}
cfs.metric.repairsCompleted.inc();
logger.info("Completing repair with excludedDeadNodes {}", session.excludedDeadNodes);
-                 trySuccess(new RepairResult(desc, stats, ConsensusMigrationRepairResult.fromCassandraRepair(repairStartingEpoch, doPaxosRepair && !session.excludedDeadNodes)));
+                 trySuccess(new RepairResult(desc, stats, ConsensusMigrationRepairResult.fromRepair(repairStartingEpoch, doPaxosRepair, doAccordRepair, session.excludedDeadNodes)));
}

/**
@@ -240,7 +313,7 @@ public void onFailure(Throwable t)
}, taskExecutor);
}

-     private Future<List<SyncTask>> createSyncTasks(Future<Void> paxosRepair, Future<?> allSnapshotTasks, List<InetAddressAndPort> allEndpoints)
+     private Future<List<SyncTask>> createSyncTasks(Future<Void> accordRepair, Future<?> allSnapshotTasks, List<InetAddressAndPort> allEndpoints)
{
Future<List<TreeResponse>> treeResponses;
if (allSnapshotTasks != null)
@@ -256,7 +329,7 @@ private Future<List<SyncTask>> createSyncTasks(Future<Void> paxosRepair, Future<
else
{
            // If not sequential, just send validation requests to all replicas
-             treeResponses = paxosRepair.flatMap(input -> sendValidationRequest(allEndpoints));
+             treeResponses = accordRepair.flatMap(input -> sendValidationRequest(allEndpoints));
}

treeResponses = treeResponses.map(a -> {
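Finally, the result construction replaces fromCassandraRepair's single boolean with explicit paxos and accord flags. ConsensusMigrationRepairResult itself is not shown in this diff, so the factory below is speculative, inferred only from the call sites above (in particular from the old `doPaxosRepair && !session.excludedDeadNodes` argument):

```java
// Speculative sketch, not the real class: every member below is inferred
// from the call sites in the diff above.
public final class ConsensusMigrationRepairResultSketch
{
    public final long startingEpoch;     // stand-in for the tcm Epoch captured before repair
    public final boolean paxosRepaired;  // paxos repair ran with no dead nodes excluded
    public final boolean accordRepaired; // accord repair ran with no dead nodes excluded

    private ConsensusMigrationRepairResultSketch(long epoch, boolean paxos, boolean accord)
    {
        this.startingEpoch = epoch;
        this.paxosRepaired = paxos;
        this.accordRepaired = accord;
    }

    public static ConsensusMigrationRepairResultSketch fromRepair(long epoch,
                                                                  boolean doPaxosRepair,
                                                                  boolean doAccordRepair,
                                                                  boolean excludedDeadNodes)
    {
        // Excluding dead nodes voids both consensus-repair guarantees, mirroring
        // the old "doPaxosRepair && !session.excludedDeadNodes" call site.
        return new ConsensusMigrationRepairResultSketch(epoch,
                                                        doPaxosRepair && !excludedDeadNodes,
                                                        doAccordRepair && !excludedDeadNodes);
    }
}
```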
