New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CASSANDRA-19042: Repair fuzz tests fail with paxos_variant: v2 #3100
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
|
||
package org.apache.cassandra.net; | ||
|
||
import org.apache.cassandra.exceptions.RequestFailureReason; | ||
import org.apache.cassandra.locator.InetAddressAndPort; | ||
import org.apache.cassandra.utils.concurrent.Future; | ||
|
||
|
@@ -28,4 +29,8 @@ public interface MessageDelivery | |
public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType specifyConnection); | ||
public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ> message, InetAddressAndPort to); | ||
public <V> void respond(V response, Message<?> message); | ||
public default void respondWithFailure(RequestFailureReason reason, Message<?> message) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you remove the public modifiers from this interface, please? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to be consistent with the rest of the file, I left |
||
{ | ||
send(Message.failureResponse(message.id(), message.expiresAtNanos(), reason), message.respondTo()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ | |
import java.util.Set; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.Function; | ||
import java.util.function.Predicate; | ||
import java.util.function.Supplier; | ||
|
||
import javax.annotation.Nullable; | ||
|
@@ -85,9 +86,9 @@ | |
import org.apache.cassandra.service.ClientState; | ||
import org.apache.cassandra.service.FailureRecordingCallback.AsMap; | ||
import org.apache.cassandra.service.paxos.Commit.Proposal; | ||
import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; | ||
import org.apache.cassandra.service.reads.DataResolver; | ||
import org.apache.cassandra.service.reads.repair.NoopReadRepair; | ||
import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs; | ||
import org.apache.cassandra.tracing.Tracing; | ||
import org.apache.cassandra.triggers.TriggerExecutor; | ||
import org.apache.cassandra.utils.CassandraVersion; | ||
|
@@ -385,13 +386,18 @@ public EndpointsForToken readCandidates() | |
} | ||
|
||
static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I left this here as I did see There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Weird, I don't know why my IDE Intellij Idea emits a warning that the Config class is deprecated. Not after your patch, though; it's just observation. |
||
{ | ||
return get(table, token, consistencyForConsensus, FailureDetector.isReplicaAlive); | ||
} | ||
|
||
static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus, Predicate<Replica> isReplicaAlive) | ||
{ | ||
Keyspace keyspace = Keyspace.open(table.keyspace); | ||
ReplicaLayout.ForTokenWrite all = forTokenWriteLiveAndDown(keyspace, token); | ||
ReplicaLayout.ForTokenWrite electorate = consistencyForConsensus.isDatacenterLocal() | ||
? all.filter(InOurDc.replicas()) : all; | ||
|
||
EndpointsForToken live = all.all().filter(FailureDetector.isReplicaAlive); | ||
EndpointsForToken live = all.all().filter(isReplicaAlive); | ||
|
||
return new Participants(keyspace, consistencyForConsensus, all, electorate, live); | ||
} | ||
|
@@ -1255,6 +1261,6 @@ static boolean isOldParticipant(Replica replica) | |
|
||
public static void evictHungRepairs() | ||
{ | ||
PaxosTableRepairs.evictHungRepairs(); | ||
PaxosRepairState.instance().evictHungRepairs(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,10 +42,7 @@ | |
import org.apache.cassandra.dht.Token; | ||
import org.apache.cassandra.exceptions.RequestFailureReason; | ||
import org.apache.cassandra.exceptions.UnavailableException; | ||
import org.apache.cassandra.gms.ApplicationState; | ||
import org.apache.cassandra.gms.EndpointState; | ||
import org.apache.cassandra.gms.Gossiper; | ||
import org.apache.cassandra.gms.VersionedValue; | ||
import org.apache.cassandra.gms.IGossiper; | ||
import org.apache.cassandra.io.IVersionedSerializer; | ||
import org.apache.cassandra.io.util.DataInputPlus; | ||
import org.apache.cassandra.io.util.DataOutputPlus; | ||
|
@@ -55,6 +52,7 @@ | |
import org.apache.cassandra.net.Message; | ||
import org.apache.cassandra.net.MessagingService; | ||
import org.apache.cassandra.net.RequestCallbackWithFailure; | ||
import org.apache.cassandra.repair.SharedContext; | ||
import org.apache.cassandra.schema.Schema; | ||
import org.apache.cassandra.schema.TableId; | ||
import org.apache.cassandra.schema.TableMetadata; | ||
|
@@ -659,45 +657,24 @@ static boolean validateVersionCompatibility(CassandraVersion version) | |
return (version.major == 4 && version.minor > 0) || version.major > 4; | ||
} | ||
|
||
static String getPeerVersion(InetAddressAndPort peer) | ||
static boolean validatePeerCompatibility(IGossiper gossiper, Replica peer) | ||
{ | ||
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(peer); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is duplicate logic from
Not sure how a field access leads to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know about the IllegalArgumentException, probably some leftover |
||
if (epState == null) | ||
return null; | ||
|
||
VersionedValue value = epState.getApplicationState(ApplicationState.RELEASE_VERSION); | ||
if (value == null) | ||
return null; | ||
|
||
try | ||
{ | ||
return value.value; | ||
} | ||
catch (IllegalArgumentException e) | ||
{ | ||
return null; | ||
} | ||
} | ||
|
||
static boolean validatePeerCompatibility(Replica peer) | ||
{ | ||
String versionString = getPeerVersion(peer.endpoint()); | ||
CassandraVersion version = versionString != null ? new CassandraVersion(versionString) : null; | ||
CassandraVersion version = gossiper.getReleaseVersion(peer.endpoint()); | ||
boolean result = validateVersionCompatibility(version); | ||
if (!result) | ||
logger.info("PaxosRepair isn't supported by {} on version {}", peer, versionString); | ||
logger.info("PaxosRepair isn't supported by {} on version {}", peer, version); | ||
return result; | ||
} | ||
|
||
static boolean validatePeerCompatibility(TableMetadata table, Range<Token> range) | ||
static boolean validatePeerCompatibility(SharedContext ctx, TableMetadata table, Range<Token> range) | ||
{ | ||
Participants participants = Participants.get(table, range.right, ConsistencyLevel.SERIAL); | ||
return Iterables.all(participants.all, PaxosRepair::validatePeerCompatibility); | ||
Participants participants = Participants.get(table, range.right, ConsistencyLevel.SERIAL, r -> ctx.failureDetector().isAlive(r.endpoint())); | ||
return Iterables.all(participants.all, r -> validatePeerCompatibility(ctx.gossiper(), r)); | ||
} | ||
|
||
public static boolean validatePeerCompatibility(TableMetadata table, Collection<Range<Token>> ranges) | ||
public static boolean validatePeerCompatibility(SharedContext ctx, TableMetadata table, Collection<Range<Token>> ranges) | ||
{ | ||
return Iterables.all(ranges, range -> validatePeerCompatibility(table, range)); | ||
return Iterables.all(ranges, range -> validatePeerCompatibility(ctx, table, range)); | ||
} | ||
|
||
public static void shutdownAndWait(long timeout, TimeUnit units) throws InterruptedException, TimeoutException | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These two can benefit from some javadoc