Skip to content

Commit

Permalink
CASSANDRA-19042: Repair fuzz tests fail with paxos_variant: v2
Browse files Browse the repository at this point in the history
  • Loading branch information
dcapwell committed Mar 25, 2024
1 parent 35d6e6c commit 8c906d9
Show file tree
Hide file tree
Showing 24 changed files with 588 additions and 310 deletions.
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/gms/Gossiper.java
Expand Up @@ -1328,6 +1328,7 @@ public int compareEndpointStartup(InetAddressAndPort addr1, InetAddressAndPort a
return ep1.getHeartBeatState().getGeneration() - ep2.getHeartBeatState().getGeneration();
}

@Override
public void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> remoteEpStateMap)
{
for (Entry<InetAddressAndPort, EndpointState> entry : remoteEpStateMap.entrySet())
Expand Down Expand Up @@ -1624,6 +1625,7 @@ private static Iterable<Entry<InetAddressAndPort, EndpointState>> order(Map<Inet
}

@VisibleForTesting
@Override
public void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap)
{
checkProperThreadForStateMutation();
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/gms/IGossiper.java
Expand Up @@ -18,6 +18,7 @@

package org.apache.cassandra.gms;

import java.util.Map;
import javax.annotation.Nullable;

import org.apache.cassandra.locator.InetAddressAndPort;
Expand All @@ -30,6 +31,8 @@ public interface IGossiper

@Nullable
EndpointState getEndpointStateForEndpoint(InetAddressAndPort ep);
void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> remoteEpStateMap);
void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap);
@Nullable
default CassandraVersion getReleaseVersion(InetAddressAndPort ep)
{
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/net/MessageDelivery.java
Expand Up @@ -18,6 +18,7 @@

package org.apache.cassandra.net;

import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.concurrent.Future;

Expand All @@ -28,4 +29,8 @@ public interface MessageDelivery
public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType specifyConnection);
public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ> message, InetAddressAndPort to);
public <V> void respond(V response, Message<?> message);
public default void respondWithFailure(RequestFailureReason reason, Message<?> message)
{
send(Message.failureResponse(message.id(), message.expiresAtNanos(), reason), message.respondTo());
}
}
5 changes: 0 additions & 5 deletions src/java/org/apache/cassandra/net/MessagingService.java
Expand Up @@ -449,11 +449,6 @@ public <V> void respond(V response, Message<?> message)
send(message.responseWith(response), message.respondTo());
}

public <V> void respondWithFailure(RequestFailureReason reason, Message<?> message)
{
send(Message.failureResponse(message.id(), message.expiresAtNanos(), reason), message.respondTo());
}

public void send(Message message, InetAddressAndPort to, ConnectionType specifyConnection)
{
if (logger.isTraceEnabled())
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/repair/RepairJob.java
Expand Up @@ -129,7 +129,7 @@ public void run()
{
logger.info("{} {}.{} starting paxos repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
TableMetadata metadata = Schema.instance.getTableMetadata(desc.keyspace, desc.columnFamily);
paxosRepair = PaxosCleanup.cleanup(allEndpoints, metadata, desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor);
paxosRepair = PaxosCleanup.cleanup(ctx, allEndpoints, metadata, desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor);
}
else
{
Expand Down
54 changes: 54 additions & 0 deletions src/java/org/apache/cassandra/repair/SharedContext.java
Expand Up @@ -37,6 +37,8 @@
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
Expand All @@ -57,6 +59,8 @@ public interface SharedContext
ExecutorFactory executorFactory();
MBeanWrapper mbean();
ScheduledExecutorPlus optionalTasks();
ScheduledExecutorPlus nonPeriodicTasks();
ScheduledExecutorPlus scheduledTasks();

MessageDelivery messaging();
default SharedContext withMessaging(MessageDelivery messaging)
Expand All @@ -77,6 +81,8 @@ public MessageDelivery messaging()
IValidationManager validationManager();
TableRepairManager repairManager(ColumnFamilyStore store);
StreamExecutor streamExecutor();
PendingRangeCalculatorService pendingRangeCalculator();
PaxosRepairState paxosRepairState();

class Global implements SharedContext
{
Expand Down Expand Up @@ -118,6 +124,18 @@ public ScheduledExecutorPlus optionalTasks()
return ScheduledExecutors.optionalTasks;
}

@Override
public ScheduledExecutorPlus nonPeriodicTasks()
{
return ScheduledExecutors.nonPeriodicTasks;
}

@Override
public ScheduledExecutorPlus scheduledTasks()
{
return ScheduledExecutors.scheduledTasks;
}

@Override
public MessageDelivery messaging()
{
Expand Down Expand Up @@ -171,6 +189,18 @@ public StreamExecutor streamExecutor()
{
return StreamPlan::execute;
}

@Override
public PendingRangeCalculatorService pendingRangeCalculator()
{
return PendingRangeCalculatorService.instance;
}

@Override
public PaxosRepairState paxosRepairState()
{
return PaxosRepairState.instance();
}
}

class ForwardingSharedContext implements SharedContext
Expand Down Expand Up @@ -223,6 +253,18 @@ public ScheduledExecutorPlus optionalTasks()
return delegate().optionalTasks();
}

@Override
public ScheduledExecutorPlus nonPeriodicTasks()
{
return delegate().nonPeriodicTasks();
}

@Override
public ScheduledExecutorPlus scheduledTasks()
{
return delegate().scheduledTasks();
}

@Override
public MessageDelivery messaging()
{
Expand Down Expand Up @@ -276,5 +318,17 @@ public StreamExecutor streamExecutor()
{
return delegate().streamExecutor();
}

@Override
public PendingRangeCalculatorService pendingRangeCalculator()
{
return delegate().pendingRangeCalculator();
}

@Override
public PaxosRepairState paxosRepairState()
{
return delegate().paxosRepairState();
}
}
}
Expand Up @@ -1136,7 +1136,7 @@ public Future<?> repairPaxosForTopologyChange(String ksName, Collection<Range<To
range, table.keyspace, table.name, pending, PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE.getKey()));

}
Future<Void> future = PaxosCleanup.cleanup(endpoints, table, Collections.singleton(range), false, repairCommandExecutor());
Future<Void> future = PaxosCleanup.cleanup(ctx, endpoints, table, Collections.singleton(range), false, repairCommandExecutor());
futures.add(future);
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/java/org/apache/cassandra/service/StorageService.java
Expand Up @@ -180,6 +180,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.RepairCoordinator;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.schema.KeyspaceMetadata;
Expand All @@ -199,7 +200,7 @@
import org.apache.cassandra.service.paxos.PaxosRepair;
import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupLocalCoordinator;
import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs;
import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
import org.apache.cassandra.service.snapshot.SnapshotManager;
import org.apache.cassandra.service.snapshot.TableSnapshot;
import org.apache.cassandra.streaming.StreamManager;
Expand Down Expand Up @@ -4863,7 +4864,7 @@ public Future<?> autoRepairPaxos(TableId tableId)
return ImmediateFuture.success(null);

List<Range<Token>> ranges = getLocalAndPendingRanges(table.keyspace);
PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.createForAutoRepair(tableId, ranges);
PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.createForAutoRepair(SharedContext.Global.instance, tableId, ranges);
ScheduledExecutors.optionalTasks.submit(coordinator::start);
return coordinator;
}
Expand Down Expand Up @@ -7349,7 +7350,7 @@ public void evictHungRepairs()
public void clearPaxosRepairs()
{
logger.info("StorageService#clearPaxosRepairs called via jmx");
PaxosTableRepairs.clearRepairs();
PaxosRepairState.instance().clearRepairs();
}

public void setSkipPaxosRepairCompatibilityCheck(boolean v)
Expand Down
12 changes: 9 additions & 3 deletions src/java/org/apache/cassandra/service/paxos/Paxos.java
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -85,9 +86,9 @@
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.FailureRecordingCallback.AsMap;
import org.apache.cassandra.service.paxos.Commit.Proposal;
import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
import org.apache.cassandra.service.reads.DataResolver;
import org.apache.cassandra.service.reads.repair.NoopReadRepair;
import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.triggers.TriggerExecutor;
import org.apache.cassandra.utils.CassandraVersion;
Expand Down Expand Up @@ -385,13 +386,18 @@ public EndpointsForToken readCandidates()
}

static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus)
{
return get(table, token, consistencyForConsensus, FailureDetector.isReplicaAlive);
}

static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus, Predicate<Replica> isReplicaAlive)
{
Keyspace keyspace = Keyspace.open(table.keyspace);
ReplicaLayout.ForTokenWrite all = forTokenWriteLiveAndDown(keyspace, token);
ReplicaLayout.ForTokenWrite electorate = consistencyForConsensus.isDatacenterLocal()
? all.filter(InOurDc.replicas()) : all;

EndpointsForToken live = all.all().filter(FailureDetector.isReplicaAlive);
EndpointsForToken live = all.all().filter(isReplicaAlive);

return new Participants(keyspace, consistencyForConsensus, all, electorate, live);
}
Expand Down Expand Up @@ -1255,6 +1261,6 @@ static boolean isOldParticipant(Replica replica)

public static void evictHungRepairs()
{
PaxosTableRepairs.evictHungRepairs();
PaxosRepairState.instance().evictHungRepairs();
}
}
43 changes: 10 additions & 33 deletions src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
Expand Up @@ -42,10 +42,7 @@
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.gms.IGossiper;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
Expand All @@ -55,6 +52,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallbackWithFailure;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
Expand Down Expand Up @@ -659,45 +657,24 @@ static boolean validateVersionCompatibility(CassandraVersion version)
return (version.major == 4 && version.minor > 0) || version.major > 4;
}

static String getPeerVersion(InetAddressAndPort peer)
static boolean validatePeerCompatibility(IGossiper gossiper, Replica peer)
{
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(peer);
if (epState == null)
return null;

VersionedValue value = epState.getApplicationState(ApplicationState.RELEASE_VERSION);
if (value == null)
return null;

try
{
return value.value;
}
catch (IllegalArgumentException e)
{
return null;
}
}

static boolean validatePeerCompatibility(Replica peer)
{
String versionString = getPeerVersion(peer.endpoint());
CassandraVersion version = versionString != null ? new CassandraVersion(versionString) : null;
CassandraVersion version = gossiper.getReleaseVersion(peer.endpoint());
boolean result = validateVersionCompatibility(version);
if (!result)
logger.info("PaxosRepair isn't supported by {} on version {}", peer, versionString);
logger.info("PaxosRepair isn't supported by {} on version {}", peer, version);
return result;
}

static boolean validatePeerCompatibility(TableMetadata table, Range<Token> range)
static boolean validatePeerCompatibility(SharedContext ctx, TableMetadata table, Range<Token> range)
{
Participants participants = Participants.get(table, range.right, ConsistencyLevel.SERIAL);
return Iterables.all(participants.all, PaxosRepair::validatePeerCompatibility);
Participants participants = Participants.get(table, range.right, ConsistencyLevel.SERIAL, r -> ctx.failureDetector().isAlive(r.endpoint()));
return Iterables.all(participants.all, r -> validatePeerCompatibility(ctx.gossiper(), r));
}

public static boolean validatePeerCompatibility(TableMetadata table, Collection<Range<Token>> ranges)
public static boolean validatePeerCompatibility(SharedContext ctx, TableMetadata table, Collection<Range<Token>> ranges)
{
return Iterables.all(ranges, range -> validatePeerCompatibility(table, range));
return Iterables.all(ranges, range -> validatePeerCompatibility(ctx, table, range));
}

public static void shutdownAndWait(long timeout, TimeUnit units) throws InterruptedException, TimeoutException
Expand Down

0 comments on commit 8c906d9

Please sign in to comment.