Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSANDRA-19042: Repair fuzz tests fail with paxos_variant: v2 #3100

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/gms/Gossiper.java
Expand Up @@ -1328,6 +1328,7 @@ public int compareEndpointStartup(InetAddressAndPort addr1, InetAddressAndPort a
return ep1.getHeartBeatState().getGeneration() - ep2.getHeartBeatState().getGeneration();
}

@Override
public void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> remoteEpStateMap)
{
for (Entry<InetAddressAndPort, EndpointState> entry : remoteEpStateMap.entrySet())
Expand Down Expand Up @@ -1624,6 +1625,7 @@ private static Iterable<Entry<InetAddressAndPort, EndpointState>> order(Map<Inet
}

@VisibleForTesting
@Override
public void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap)
{
checkProperThreadForStateMutation();
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/gms/IGossiper.java
Expand Up @@ -18,6 +18,7 @@

package org.apache.cassandra.gms;

import java.util.Map;
import javax.annotation.Nullable;

import org.apache.cassandra.locator.InetAddressAndPort;
Expand All @@ -30,6 +31,8 @@ public interface IGossiper

@Nullable
EndpointState getEndpointStateForEndpoint(InetAddressAndPort ep);
/**
 * Passes a map of endpoint to {@link EndpointState} to the gossiper.
 * NOTE(review): presumably each entry is reported to the failure detector, as the name suggests - confirm against the implementing class.
 *
 * @param remoteEpStateMap endpoint states keyed by endpoint address
 */
void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> remoteEpStateMap);
/**
 * Passes a map of endpoint to {@link EndpointState} to the gossiper to be applied locally.
 * NOTE(review): the Gossiper implementation starts with a thread check (checkProperThreadForStateMutation),
 * so callers presumably need to be on the thread allowed to mutate gossip state - confirm.
 *
 * @param epStateMap endpoint states keyed by endpoint address
 */
void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap);
Comment on lines +34 to +35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two could benefit from some Javadoc.

@Nullable
default CassandraVersion getReleaseVersion(InetAddressAndPort ep)
{
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/net/MessageDelivery.java
Expand Up @@ -18,6 +18,7 @@

package org.apache.cassandra.net;

import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.concurrent.Future;

Expand All @@ -28,4 +29,8 @@ public interface MessageDelivery
public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType specifyConnection);
public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ> message, InetAddressAndPort to);
public <V> void respond(V response, Message<?> message);
public default void respondWithFailure(RequestFailureReason reason, Message<?> message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remove the public modifiers from this interface, please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be consistent with the rest of the file, I left `public` there...

{
send(Message.failureResponse(message.id(), message.expiresAtNanos(), reason), message.respondTo());
}
}
5 changes: 0 additions & 5 deletions src/java/org/apache/cassandra/net/MessagingService.java
Expand Up @@ -449,11 +449,6 @@ public <V> void respond(V response, Message<?> message)
send(message.responseWith(response), message.respondTo());
}

/**
 * Answers {@code message} with a failure response.
 * <p>
 * The response is built by {@code Message.failureResponse} from the request's id and
 * expiry ({@code message.id()}, {@code message.expiresAtNanos()}) together with {@code reason},
 * and is sent to {@code message.respondTo()}.
 * <p>
 * The method declares no type parameter: nothing in the signature or the body uses one.
 *
 * @param reason  the failure reason to carry in the response
 * @param message the request being answered
 */
public void respondWithFailure(RequestFailureReason reason, Message<?> message)
{
send(Message.failureResponse(message.id(), message.expiresAtNanos(), reason), message.respondTo());
}

public void send(Message message, InetAddressAndPort to, ConnectionType specifyConnection)
{
if (logger.isTraceEnabled())
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/repair/RepairJob.java
Expand Up @@ -129,7 +129,7 @@ public void run()
{
logger.info("{} {}.{} starting paxos repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
TableMetadata metadata = Schema.instance.getTableMetadata(desc.keyspace, desc.columnFamily);
paxosRepair = PaxosCleanup.cleanup(allEndpoints, metadata, desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor);
paxosRepair = PaxosCleanup.cleanup(ctx, allEndpoints, metadata, desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor);
}
else
{
Expand Down
54 changes: 54 additions & 0 deletions src/java/org/apache/cassandra/repair/SharedContext.java
Expand Up @@ -37,6 +37,8 @@
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
Expand All @@ -57,6 +59,8 @@ public interface SharedContext
ExecutorFactory executorFactory();
MBeanWrapper mbean();
ScheduledExecutorPlus optionalTasks();
ScheduledExecutorPlus nonPeriodicTasks();
ScheduledExecutorPlus scheduledTasks();

MessageDelivery messaging();
default SharedContext withMessaging(MessageDelivery messaging)
Expand All @@ -77,6 +81,8 @@ public MessageDelivery messaging()
IValidationManager validationManager();
TableRepairManager repairManager(ColumnFamilyStore store);
StreamExecutor streamExecutor();
PendingRangeCalculatorService pendingRangeCalculator();
PaxosRepairState paxosRepairState();

class Global implements SharedContext
{
Expand Down Expand Up @@ -118,6 +124,18 @@ public ScheduledExecutorPlus optionalTasks()
return ScheduledExecutors.optionalTasks;
}

/**
 * @return {@code ScheduledExecutors.nonPeriodicTasks}
 */
@Override
public ScheduledExecutorPlus nonPeriodicTasks()
{
return ScheduledExecutors.nonPeriodicTasks;
}

/**
 * @return {@code ScheduledExecutors.scheduledTasks}
 */
@Override
public ScheduledExecutorPlus scheduledTasks()
{
return ScheduledExecutors.scheduledTasks;
}

@Override
public MessageDelivery messaging()
{
Expand Down Expand Up @@ -171,6 +189,18 @@ public StreamExecutor streamExecutor()
{
return StreamPlan::execute;
}

/**
 * @return {@code PendingRangeCalculatorService.instance}
 */
@Override
public PendingRangeCalculatorService pendingRangeCalculator()
{
return PendingRangeCalculatorService.instance;
}

/**
 * @return the object returned by {@code PaxosRepairState.instance()}
 */
@Override
public PaxosRepairState paxosRepairState()
{
return PaxosRepairState.instance();
}
}

class ForwardingSharedContext implements SharedContext
Expand Down Expand Up @@ -223,6 +253,18 @@ public ScheduledExecutorPlus optionalTasks()
return delegate().optionalTasks();
}

/**
 * Forwards to the context returned by {@code delegate()}.
 *
 * @return the delegate's {@code nonPeriodicTasks()} executor
 */
@Override
public ScheduledExecutorPlus nonPeriodicTasks()
{
return delegate().nonPeriodicTasks();
}

/**
 * Forwards to the context returned by {@code delegate()}.
 *
 * @return the delegate's {@code scheduledTasks()} executor
 */
@Override
public ScheduledExecutorPlus scheduledTasks()
{
return delegate().scheduledTasks();
}

@Override
public MessageDelivery messaging()
{
Expand Down Expand Up @@ -276,5 +318,17 @@ public StreamExecutor streamExecutor()
{
return delegate().streamExecutor();
}

/**
 * Forwards to the context returned by {@code delegate()}.
 *
 * @return the delegate's {@code pendingRangeCalculator()}
 */
@Override
public PendingRangeCalculatorService pendingRangeCalculator()
{
return delegate().pendingRangeCalculator();
}

/**
 * Forwards to the context returned by {@code delegate()}.
 *
 * @return the delegate's {@code paxosRepairState()}
 */
@Override
public PaxosRepairState paxosRepairState()
{
return delegate().paxosRepairState();
}
}
}
Expand Up @@ -1136,7 +1136,7 @@ public Future<?> repairPaxosForTopologyChange(String ksName, Collection<Range<To
range, table.keyspace, table.name, pending, PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE.getKey()));

}
Future<Void> future = PaxosCleanup.cleanup(endpoints, table, Collections.singleton(range), false, repairCommandExecutor());
Future<Void> future = PaxosCleanup.cleanup(ctx, endpoints, table, Collections.singleton(range), false, repairCommandExecutor());
futures.add(future);
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/java/org/apache/cassandra/service/StorageService.java
Expand Up @@ -180,6 +180,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.RepairCoordinator;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.schema.KeyspaceMetadata;
Expand All @@ -199,7 +200,7 @@
import org.apache.cassandra.service.paxos.PaxosRepair;
import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupLocalCoordinator;
import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs;
import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
import org.apache.cassandra.service.snapshot.SnapshotManager;
import org.apache.cassandra.service.snapshot.TableSnapshot;
import org.apache.cassandra.streaming.StreamManager;
Expand Down Expand Up @@ -4863,7 +4864,7 @@ public Future<?> autoRepairPaxos(TableId tableId)
return ImmediateFuture.success(null);

List<Range<Token>> ranges = getLocalAndPendingRanges(table.keyspace);
PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.createForAutoRepair(tableId, ranges);
PaxosCleanupLocalCoordinator coordinator = PaxosCleanupLocalCoordinator.createForAutoRepair(SharedContext.Global.instance, tableId, ranges);
ScheduledExecutors.optionalTasks.submit(coordinator::start);
return coordinator;
}
Expand Down Expand Up @@ -7349,7 +7350,7 @@ public void evictHungRepairs()
public void clearPaxosRepairs()
{
logger.info("StorageService#clearPaxosRepairs called via jmx");
PaxosTableRepairs.clearRepairs();
PaxosRepairState.instance().clearRepairs();
}

public void setSkipPaxosRepairCompatibilityCheck(boolean v)
Expand Down
12 changes: 9 additions & 3 deletions src/java/org/apache/cassandra/service/paxos/Paxos.java
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -85,9 +86,9 @@
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.FailureRecordingCallback.AsMap;
import org.apache.cassandra.service.paxos.Commit.Proposal;
import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
import org.apache.cassandra.service.reads.DataResolver;
import org.apache.cassandra.service.reads.repair.NoopReadRepair;
import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.triggers.TriggerExecutor;
import org.apache.cassandra.utils.CassandraVersion;
Expand Down Expand Up @@ -385,13 +386,18 @@ public EndpointsForToken readCandidates()
}

static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left this here as Paxos uses it... I updated the stuff that mattered for the existing tests to use the new API.

I did see that PaxosRepair touches this as well, but I assume it didn't matter: we don't have data in Paxos, so that logic probably didn't get exercised?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Weird, I don't know why my IDE (IntelliJ IDEA) emits a warning that the Config class is deprecated. It's not caused by your patch, though; it's just an observation.

{
return get(table, token, consistencyForConsensus, FailureDetector.isReplicaAlive);
}

static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus, Predicate<Replica> isReplicaAlive)
{
Keyspace keyspace = Keyspace.open(table.keyspace);
ReplicaLayout.ForTokenWrite all = forTokenWriteLiveAndDown(keyspace, token);
ReplicaLayout.ForTokenWrite electorate = consistencyForConsensus.isDatacenterLocal()
? all.filter(InOurDc.replicas()) : all;

EndpointsForToken live = all.all().filter(FailureDetector.isReplicaAlive);
EndpointsForToken live = all.all().filter(isReplicaAlive);

return new Participants(keyspace, consistencyForConsensus, all, electorate, live);
}
Expand Down Expand Up @@ -1255,6 +1261,6 @@ static boolean isOldParticipant(Replica replica)

public static void evictHungRepairs()
{
PaxosTableRepairs.evictHungRepairs();
PaxosRepairState.instance().evictHungRepairs();
}
}
43 changes: 10 additions & 33 deletions src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
Expand Up @@ -42,10 +42,7 @@
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.gms.IGossiper;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
Expand All @@ -55,6 +52,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallbackWithFailure;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
Expand Down Expand Up @@ -659,45 +657,24 @@ static boolean validateVersionCompatibility(CassandraVersion version)
return (version.major == 4 && version.minor > 0) || version.major > 4;
}

static String getPeerVersion(InetAddressAndPort peer)
static boolean validatePeerCompatibility(IGossiper gossiper, Replica peer)
{
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(peer);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This duplicates logic from Gossiper; the only difference was:

try
        {
            return value.value;
        }
        catch (IllegalArgumentException e)
        {
            return null;
        }

I'm not sure how a field access could lead to an IllegalArgumentException, so I removed it in favor of the gossip function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know about the IllegalArgumentException; it is probably a leftover.

if (epState == null)
return null;

VersionedValue value = epState.getApplicationState(ApplicationState.RELEASE_VERSION);
if (value == null)
return null;

try
{
return value.value;
}
catch (IllegalArgumentException e)
{
return null;
}
}

static boolean validatePeerCompatibility(Replica peer)
{
String versionString = getPeerVersion(peer.endpoint());
CassandraVersion version = versionString != null ? new CassandraVersion(versionString) : null;
CassandraVersion version = gossiper.getReleaseVersion(peer.endpoint());
boolean result = validateVersionCompatibility(version);
if (!result)
logger.info("PaxosRepair isn't supported by {} on version {}", peer, versionString);
logger.info("PaxosRepair isn't supported by {} on version {}", peer, version);
return result;
}

static boolean validatePeerCompatibility(TableMetadata table, Range<Token> range)
static boolean validatePeerCompatibility(SharedContext ctx, TableMetadata table, Range<Token> range)
{
Participants participants = Participants.get(table, range.right, ConsistencyLevel.SERIAL);
return Iterables.all(participants.all, PaxosRepair::validatePeerCompatibility);
Participants participants = Participants.get(table, range.right, ConsistencyLevel.SERIAL, r -> ctx.failureDetector().isAlive(r.endpoint()));
return Iterables.all(participants.all, r -> validatePeerCompatibility(ctx.gossiper(), r));
}

public static boolean validatePeerCompatibility(TableMetadata table, Collection<Range<Token>> ranges)
public static boolean validatePeerCompatibility(SharedContext ctx, TableMetadata table, Collection<Range<Token>> ranges)
{
return Iterables.all(ranges, range -> validatePeerCompatibility(table, range));
return Iterables.all(ranges, range -> validatePeerCompatibility(ctx, table, range));
}

public static void shutdownAndWait(long timeout, TimeUnit units) throws InterruptedException, TimeoutException
Expand Down