Skip to content

Commit

Permalink
wire up consensus migration option into accord repair
Browse files Browse the repository at this point in the history
  • Loading branch information
bdeggleston committed Apr 10, 2024
1 parent 8d55d09 commit 2ad4c30
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 9 deletions.
Expand Up @@ -80,6 +80,7 @@ private List<RepairSession> submitRepairSessions(TimeUUID parentSession,
options.repairPaxos(),
options.paxosOnly(),
options.accordOnly(),
options.isConsensusMigration(),
executor,
validationScheduler,
cfnames);
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/repair/RepairJob.java
Expand Up @@ -199,7 +199,7 @@ public void onFailure(Throwable t)
accordRepair = paxosRepair.flatMap(unused -> {
logger.info("{} {}.{} starting accord repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
IPartitioner partitioner = metadata.partitioner;
AccordRepair repair = new AccordRepair(ctx, cfs, partitioner, desc.keyspace, desc.ranges, false, allEndpoints);
AccordRepair repair = new AccordRepair(ctx, cfs, partitioner, desc.keyspace, desc.ranges, session.isConsensusMigration, allEndpoints);
return repair.repair(taskExecutor);
}, taskExecutor);
}
Expand Down
4 changes: 3 additions & 1 deletion src/java/org/apache/cassandra/repair/RepairSession.java
Expand Up @@ -124,6 +124,7 @@ public class RepairSession extends AsyncFuture<RepairSessionResult> implements I
public final boolean paxosOnly;

public final boolean accordOnly;
public final boolean isConsensusMigration;

public final boolean excludedDeadNodes;

Expand Down Expand Up @@ -169,13 +170,14 @@ public RepairSession(SharedContext ctx,
boolean optimiseStreams,
boolean repairPaxos,
boolean paxosOnly,
boolean accordOnly,
boolean accordOnly, boolean isConsensusMigration,
String... cfnames)
{
this.ctx = ctx;
this.validationScheduler = validationScheduler;
this.repairPaxos = repairPaxos;
this.paxosOnly = paxosOnly;
this.isConsensusMigration = isConsensusMigration;
assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
this.state = new SessionState(ctx.clock(), parentRepairSession, keyspace, cfnames, commonRange);
this.parallelismDegree = parallelismDegree;
Expand Down
11 changes: 9 additions & 2 deletions src/java/org/apache/cassandra/repair/messages/RepairOption.java
Expand Up @@ -225,7 +225,7 @@ public static RepairOption parse(Map<String, String> options, IPartitioner parti

boolean asymmetricSyncing = Boolean.parseBoolean(options.get(OPTIMISE_STREAMS_KEY));

RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, pullRepair, force, previewKind, asymmetricSyncing, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly, accordOnly);
RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, pullRepair, force, previewKind, asymmetricSyncing, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly, accordOnly, false);

// data centers
String dataCentersStr = options.get(DATACENTERS_KEY);
Expand Down Expand Up @@ -308,20 +308,22 @@ else if (ranges.isEmpty())
private final boolean paxosOnly;

private final boolean accordOnly;
private final boolean isConsensusMigration;

private final Collection<String> columnFamilies = new HashSet<>();
private final Collection<String> dataCenters = new HashSet<>();
private final Collection<String> hosts = new HashSet<>();
private final Collection<Range<Token>> ranges = new HashSet<>();

public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean pullRepair, boolean forceRepair, PreviewKind previewKind, boolean optimiseStreams, boolean ignoreUnreplicatedKeyspaces, boolean repairPaxos, boolean paxosOnly, boolean accordOnly)
public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean pullRepair, boolean forceRepair, PreviewKind previewKind, boolean optimiseStreams, boolean ignoreUnreplicatedKeyspaces, boolean repairPaxos, boolean paxosOnly, boolean accordOnly, boolean isConsensusMigration)
{

this.parallelism = parallelism;
this.primaryRange = primaryRange;
this.incremental = incremental;
this.trace = trace;
this.jobThreads = jobThreads;
this.isConsensusMigration = isConsensusMigration;
this.ranges.addAll(ranges);
this.pullRepair = pullRepair;
this.forceRepair = forceRepair;
Expand Down Expand Up @@ -446,6 +448,11 @@ public boolean accordOnly()
return accordOnly;
}

public boolean isConsensusMigration()
{
return isConsensusMigration;
}

@Override
public String toString()
{
Expand Down
Expand Up @@ -455,6 +455,7 @@ public RepairSession submitRepairSession(TimeUUID parentRepairSession,
boolean repairPaxos,
boolean paxosOnly,
boolean accordOnly,
boolean isConsensusMigration,
ExecutorPlus executor,
Scheduler validationScheduler,
String... cfnames)
Expand All @@ -472,7 +473,7 @@ public RepairSession submitRepairSession(TimeUUID parentRepairSession,
range, excludedDeadNodes, keyspace,
parallelismDegree, isIncremental, pullRepair,
previewKind, optimiseStreams, repairPaxos, paxosOnly,
accordOnly, cfnames);
accordOnly, isConsensusMigration, cfnames);
repairs.getIfPresent(parentRepairSession).register(session.state);

sessions.put(session.getId(), session);
Expand Down
Expand Up @@ -309,7 +309,7 @@ private static RepairOption getRepairOption(Collection<TableMigrationState> tabl
boolean repairPaxos = !accordRepair;
boolean paxosOnly = false;
boolean accordOnly = false;
RepairOption repairOption = new RepairOption(RepairParallelism.PARALLEL, primaryRange, incremental, trace, numJobThreads, intersectingRanges, pullRepair, forceRepair, PreviewKind.NONE, optimiseStreams, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly, accordOnly);
RepairOption repairOption = new RepairOption(RepairParallelism.PARALLEL, primaryRange, incremental, trace, numJobThreads, intersectingRanges, pullRepair, forceRepair, PreviewKind.NONE, optimiseStreams, ignoreUnreplicatedKeyspaces, repairPaxos, paxosOnly, accordOnly, true);
tables.forEach(table -> repairOption.getColumnFamilies().add(table.tableName));
return repairOption;
}
Expand Down
Expand Up @@ -97,7 +97,7 @@ private static void invokeRepair(String keyspaceName, boolean repairPaxos, boole
{
Collection<Range<Token>> ranges = rangesSupplier.call();
// no need to wait for completion, as we track all task submissions and message exchanges, and ensure they finish before continuing to next action
StorageService.instance.repair(keyspaceName, new RepairOption(RepairParallelism.SEQUENTIAL, isPrimaryRangeOnly, false, false, 1, ranges, false, force, PreviewKind.NONE, false, true, repairPaxos, repairOnlyPaxos, false), singletonList((tag, event) -> {
StorageService.instance.repair(keyspaceName, new RepairOption(RepairParallelism.SEQUENTIAL, isPrimaryRangeOnly, false, false, 1, ranges, false, force, PreviewKind.NONE, false, true, repairPaxos, repairOnlyPaxos, false, false), singletonList((tag, event) -> {
if (event.getType() == ProgressEventType.COMPLETE)
listener.run();
}));
Expand Down
2 changes: 1 addition & 1 deletion test/unit/org/apache/cassandra/repair/RepairJobTest.java
Expand Up @@ -129,7 +129,7 @@ public MeasureableRepairSession(TimeUUID parentRepairSession, CommonRange common
{
super(SharedContext.Global.instance, new Scheduler.NoopScheduler(),
parentRepairSession, commonRange, excludedDeadNodes, keyspace, parallelismDegree, isIncremental, pullRepair,
previewKind, optimiseStreams, repairPaxos, paxosOnly, false, cfnames);
previewKind, optimiseStreams, repairPaxos, paxosOnly, false, false, cfnames);
}

@Override
Expand Down
Expand Up @@ -67,7 +67,7 @@ public void testConviction() throws Exception
new CommonRange(endpoints, Collections.emptySet(), Arrays.asList(repairRange)),
false, "Keyspace1", RepairParallelism.SEQUENTIAL,
false, false, PreviewKind.NONE, false,
false, false, false, "Standard1");
false, false, false, false, "Standard1");

// perform convict
session.convict(remote, Double.MAX_VALUE);
Expand Down

0 comments on commit 2ad4c30

Please sign in to comment.