Skip to content

Commit

Permalink
CEP-15 (C*) Integrate accord with repair
Browse files Browse the repository at this point in the history
Patch by Blake Eggleston; Reviewed by Ariel Weisberg and David Capwell for CASSANDRA-19472
  • Loading branch information
bdeggleston committed Apr 24, 2024
1 parent a5b8c06 commit 9becefa
Show file tree
Hide file tree
Showing 39 changed files with 1,212 additions and 369 deletions.
4 changes: 2 additions & 2 deletions .gitmodules
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
url = https://github.com/apache/cassandra-accord.git
branch = trunk
url = ../cassandra-accord.git
branch = accord-IR
Expand Up @@ -35,6 +35,7 @@
/** A class that extracts system properties for the cassandra node it runs within. */
public enum CassandraRelevantProperties
{
ACCORD_AGENT_CLASS("cassandra.test.accord.agent"),
ACCORD_REPAIR_RANGE_STEP_UPDATE_INTERVAL("cassandra.accord.repair.range_step_update_interval", "100"),
ACQUIRE_RETRY_SECONDS("cassandra.acquire_retry_seconds", "60"),
ACQUIRE_SLEEP_MS("cassandra.acquire_sleep_ms", "1000"),
Expand Down
Expand Up @@ -246,7 +246,7 @@ public void finished()
checkNotNull(minVersion, "Unable to determine minimum cluster version");
IAccordService accordService = AccordService.instance();
if (session.streamOperation().requiresBarrierTransaction()
&& cfs.metadata().isAccordEnabled()
&& cfs.metadata().requiresAccordSupport()
&& CassandraVersion.CASSANDRA_5_0.compareTo(minVersion) >= 0)
accordService.postStreamReceivingBarrier(cfs, ranges);

Expand Down
12 changes: 6 additions & 6 deletions src/java/org/apache/cassandra/metrics/TableMetrics.java
Expand Up @@ -186,9 +186,9 @@ public class TableMetrics
/** Latency for locally run key migrations **/
public final LatencyMetrics keyMigration;
/** Latency for range migrations run by locally coordinated Accord repairs **/
public final LatencyMetrics rangeMigration;
public final TableMeter rangeMigrationUnexpectedFailures;
public final TableMeter rangeMigrationDependencyLimitFailures;
public final LatencyMetrics accordRepair;
public final TableMeter accordRepairUnexpectedFailures;
public final TableMeter accordRepairDependencyLimitFailures;
/** percent of the data that is repaired */
public final Gauge<Double> percentRepaired;
/** Reports the size of sstables in repaired, unrepaired, and any ongoing repair buckets */
Expand Down Expand Up @@ -813,9 +813,9 @@ public Long getValue()
casPropose = createLatencyMetrics("CasPropose", cfs.keyspace.metric.casPropose);
casCommit = createLatencyMetrics("CasCommit", cfs.keyspace.metric.casCommit);
keyMigration = createLatencyMetrics("KeyMigration", cfs.keyspace.metric.keyMigration, GLOBAL_KEY_MIGRATION_LATENCY);
rangeMigration = createLatencyMetrics("RangeMigration", cfs.keyspace.metric.rangeMigration, GLOBAL_RANGE_MIGRATION_LATENCY);
rangeMigrationUnexpectedFailures = createTableMeter("RangeMigrationUnexpectedFailures", cfs.keyspace.metric.rangeMigrationUnexpectedFailures);
rangeMigrationDependencyLimitFailures = createTableMeter("RangeMigrationDependencyLimitFaiures", cfs.keyspace.metric.rangeMigrationDependencyLimitFailures);
accordRepair = createLatencyMetrics("AccordRepair", cfs.keyspace.metric.rangeMigration, GLOBAL_RANGE_MIGRATION_LATENCY);
accordRepairUnexpectedFailures = createTableMeter("AccordRepairUnexpectedFailures", cfs.keyspace.metric.rangeMigrationUnexpectedFailures);
accordRepairDependencyLimitFailures = createTableMeter("AccordRepairDependencyLimitFaiures", cfs.keyspace.metric.rangeMigrationDependencyLimitFailures);

repairsStarted = createTableCounter("RepairJobsStarted");
repairsCompleted = createTableCounter("RepairJobsCompleted");
Expand Down
66 changes: 0 additions & 66 deletions src/java/org/apache/cassandra/repair/AbstractRepairJob.java

This file was deleted.

3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/repair/AbstractRepairTask.java
Expand Up @@ -79,7 +79,8 @@ private List<RepairSession> submitRepairSessions(TimeUUID parentSession,
options.optimiseStreams(),
options.repairPaxos(),
options.paxosOnly(),
options.accordRepair(),
options.accordOnly(),
options.isConsensusMigration(),
executor,
validationScheduler,
cfnames);
Expand Down
18 changes: 18 additions & 0 deletions src/java/org/apache/cassandra/repair/RepairCoordinator.java
Expand Up @@ -66,6 +66,7 @@
import org.apache.cassandra.repair.state.CoordinatorState;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
import org.apache.cassandra.service.ClientState;
Expand Down Expand Up @@ -282,12 +283,29 @@ public void run()
}
}

private void validate(RepairOption options)
{
if (options.paxosOnly() && options.accordOnly())
throw new IllegalArgumentException("Cannot specify a repair as both paxos only and accord only");

for (ColumnFamilyStore cfs : columnFamilies)
{
TableMetadata metadata = cfs.metadata();
if (options.paxosOnly() && !metadata.supportsPaxosOperations())
throw new IllegalArgumentException(String.format("Cannot run paxos only repair on %s.%s, which isn't configured for paxos operations", cfs.keyspace.getName(), cfs.name));

if (options.accordOnly() && !metadata.requiresAccordSupport())
throw new IllegalArgumentException(String.format("Cannot run accord only repair on %s.%s, which isn't configured for accord operations", cfs.keyspace.getName(), cfs.name));
}
}

private void runMayThrow() throws Throwable
{
state.phase.setup();
ctx.repair().recordRepairStatus(state.cmd, ParentRepairStatus.IN_PROGRESS, ImmutableList.of());

populateColumnFamilies();
validate(state.options);

this.traceState = maybeCreateTraceState(columnFamilies);
notifyStarting();
Expand Down

0 comments on commit 9becefa

Please sign in to comment.