Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster metadata id to gossip syn messages #3284

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/java/org/apache/cassandra/gms/GossipDigestSyn.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.tcm.ClusterMetadata;

/**
* This is the first message that gets sent out as a start of the Gossip protocol in a
Expand All @@ -37,12 +39,14 @@ public class GossipDigestSyn

final String clusterId;
final String partioner;
final int metadataId;
final List<GossipDigest> gDigests;

public GossipDigestSyn(String clusterId, String partioner, List<GossipDigest> gDigests)
public GossipDigestSyn(String clusterId, String partioner, int metadataId, List<GossipDigest> gDigests)
{
this.clusterId = clusterId;
this.partioner = partioner;
this.metadataId = metadataId;
this.gDigests = gDigests;
}

Expand Down Expand Up @@ -85,6 +89,8 @@ public void serialize(GossipDigestSyn gDigestSynMessage, DataOutputPlus out, int
{
out.writeUTF(gDigestSynMessage.clusterId);
out.writeUTF(gDigestSynMessage.partioner);
if (version >= MessagingService.VERSION_51)
out.writeUnsignedVInt32(gDigestSynMessage.metadataId);
GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests, out, version);
}

Expand All @@ -93,14 +99,19 @@ public GossipDigestSyn deserialize(DataInputPlus in, int version) throws IOExcep
String clusterId = in.readUTF();
String partioner = null;
partioner = in.readUTF();
int metadataId = version >= MessagingService.VERSION_51
? in.readUnsignedVInt32()
: ClusterMetadata.EMPTY_METADATA_IDENTIFIER;
List<GossipDigest> gDigests = GossipDigestSerializationHelper.deserialize(in, version);
return new GossipDigestSyn(clusterId, partioner, gDigests);
return new GossipDigestSyn(clusterId, partioner, metadataId, gDigests);
}

public long serializedSize(GossipDigestSyn syn, int version)
{
long size = TypeSizes.sizeof(syn.clusterId);
size += TypeSizes.sizeof(syn.partioner);
if (version >= MessagingService.VERSION_51)
size += TypeSizes.sizeofUnsignedVInt(syn.metadataId);
size += GossipDigestSerializationHelper.serializedSize(syn.gDigests, version);
return size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.tcm.ClusterMetadata;

import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_ACK;

Expand Down Expand Up @@ -65,6 +66,12 @@ public void doVerb(Message<GossipDigestSyn> message)
return;
}

if (gDigestMessage.metadataId != ClusterMetadata.current().metadataIdentifier)
{
logger.warn("Cluster metadata identifier mismatch from {} {}!={}", from, gDigestMessage.metadataId, ClusterMetadata.current().metadataIdentifier);
return;
}

List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
// if the syn comes from a peer performing a shadow round and this node is
// also currently in a shadow round, send back a minimal ack. This node must
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/gms/Gossiper.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ public void run()
{
GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(),
getPartitionerName(),
ClusterMetadata.current().metadataIdentifier,
gDigests);
Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, digestSynMessage);
/* Gossip to some random live member */
Expand Down Expand Up @@ -1353,6 +1354,7 @@ public void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap)
InetAddressAndPort ep = entry.getKey();
if (ep.equals(getBroadcastAddressAndPort()))
continue;

if (justRemovedEndpoints.containsKey(ep))
{
if (logger.isTraceEnabled())
Expand Down Expand Up @@ -2223,6 +2225,7 @@ public void triggerRoundWithCMS()
Gossiper.instance.makeGossipDigest(gDigests);
GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(),
getPartitionerName(),
ClusterMetadata.current().metadataIdentifier,
gDigests);
Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, digestSynMessage);
sendGossip(message, cms);
Expand Down
6 changes: 5 additions & 1 deletion src/java/org/apache/cassandra/gms/NewGossiper.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.compatibility.GossipHelper;
import org.apache.cassandra.utils.concurrent.Accumulator;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
Expand Down Expand Up @@ -127,7 +128,10 @@ public boolean isDone()
public Promise<Map<InetAddressAndPort, EndpointState>> doShadowRound()
{
// send a completely empty syn
GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(), getPartitionerName(), new ArrayList<>());
GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(),
getPartitionerName(),
ClusterMetadata.current().metadataIdentifier,
new ArrayList<>());
Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, digestSynMessage);

logger.info("Sending shadow round GOSSIP DIGEST SYN to known peers {}", peers);
Expand Down
Binary file modified test/data/serialization/5.1/gms.EndpointState.bin
Binary file not shown.
Binary file modified test/data/serialization/5.1/gms.Gossip.bin
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,171 @@
package org.apache.cassandra.distributed.test.tcm;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.junit.Test;

import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IMessageFilters;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.SimpleSeedProvider;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.log.LogState;
import org.awaitility.Awaitility;

import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class SplitBrainTest extends TestBaseImpl
{
@Test
public void testSplitBrainStartup() throws IOException, TimeoutException
{
// partition the cluster in 2 parts on startup, node1, node2 in one, node3, node4 in the other
try (Cluster cluster = builder().withNodes(4)
.withConfig(config -> config.with(GOSSIP).with(NETWORK)
.set("seed_provider", new IInstanceConfig.ParameterizedClass(SimpleSeedProvider.class.getName(),
Collections.singletonMap("seeds", "127.0.0.1,127.0.0.3")))
.set("discovery_timeout", "1s"))
.createWithoutStarting())
try (Setup setup = setupSplitBrainCluster())
{
cluster.filters().allVerbs().from(1,2).to(3,4).drop();
cluster.filters().allVerbs().from(3,4).to(1,2).drop();
List<Thread> startupThreads = new ArrayList<>(4);
for (int i = 0; i < 4; i++)
{
int threadNr = i + 1;
startupThreads.add(new Thread(() -> cluster.get(threadNr).startup()));
}
startupThreads.forEach(Thread::start);
startupThreads.forEach(SplitBrainTest::join);
cluster.filters().reset();
Cluster cluster = setup.cluster;
// Perform a schema change on one of the clusters resulting from the split brain during initialisation
// before dropping message filters. When comms can be reestablished, we fake the replication of metdata
// state from on cluster to the other.
cluster.coordinator(1).execute(withKeyspace("create keyspace %s with replication = {'class':'SimpleStrategy', 'replication_factor':1}"), ConsistencyLevel.ALL);
long clusterOneEpoch = ClusterUtils.getCurrentEpoch(cluster.get(1)).getEpoch();
long clusterTwoEpoch = ClusterUtils.getCurrentEpoch(cluster.get(3)).getEpoch();
assertTrue(clusterOneEpoch > clusterTwoEpoch);

// Artificially induce node1 to replicate to node3. This should be rejected by node3 as the two technically
// belong to different clusters.
long mark = cluster.get(3).logs().mark();
// Turn off the initial filters
setup.reenableCommunication();

cluster.get(1).runOnInstance(() -> {
LogState state = LogState.getForRecovery(ClusterMetadata.current().epoch);
MessagingService.instance().send(Message.out(Verb.TCM_REPLICATION, state),
InetAddressAndPort.getByNameUnchecked("127.0.0.3"));
});
cluster.get(3).logs().watchFor(mark, Duration.ofSeconds(10), "Cluster Metadata Identifier mismatch");
assertEquals(clusterOneEpoch, ClusterUtils.getCurrentEpoch(cluster.get(1)).getEpoch());
assertEquals(clusterTwoEpoch, ClusterUtils.getCurrentEpoch(cluster.get(3)).getEpoch());
}
}


@Test
public void testFilterGossipStatesWithMismatchingMetadataId() throws IOException, TimeoutException
{
try (Setup setup = setupSplitBrainCluster())
{
Cluster cluster = setup.cluster;
// Allow nodes from the two clusters to communicate again. Because each node's seed list contains an
// instance from the other cluster, they will attempt to perform gossip exchange with that instance.
// Verify that when this happens, gossip state isn't updated with instances from the other cluster.
AtomicInteger node1Received = new AtomicInteger(0);
AtomicInteger node3Received = new AtomicInteger(0);

cluster.filters().inbound().from(1,2,3,4).to(1,2,3,4).messagesMatching((from, to, msg) -> {
if (msg.verb() == Verb.GOSSIP_DIGEST_SYN.id ||
msg.verb() == Verb.GOSSIP_DIGEST_ACK.id ||
msg.verb() == Verb.GOSSIP_DIGEST_ACK2.id)
{
if (to == 1 && (from == 3 || from == 4))
node1Received.incrementAndGet();
if (to == 3 && (from == 1 || from == 2))
node3Received.incrementAndGet();
}
return false;
}).drop().on();

// Turn off the initial filters
setup.reenableCommunication();

// Wait for cross-cluster gossip communication
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.until(() -> node1Received.get() > 5 && node3Received.get() > 5);

// Verify that gossip states for nodes which are not a member of the same cluster were disregarded.
// Each node should have gossip state only for itself and the one other member of its cluster.
cluster.forEach(inst -> {
int id = inst.config().num();
boolean gossipStateValid = inst.callOnInstance((() -> {
Map<InetAddressAndPort, EndpointState> eps = Gossiper.instance.endpointStateMap;
if (eps.size() != 2)
return false;
Collection<InetAddressAndPort> expectedEps = (id <= 2)
? Arrays.asList(InetAddressAndPort.getByNameUnchecked("127.0.0.1"),
InetAddressAndPort.getByNameUnchecked("127.0.0.2"))
: Arrays.asList(InetAddressAndPort.getByNameUnchecked("127.0.0.3"),
InetAddressAndPort.getByNameUnchecked("127.0.0.4"));
return eps.keySet().containsAll(expectedEps);
}));
Assert.assertTrue(String.format("Unexpected gossip state on node %s", id), gossipStateValid);
});
}
}

cluster.get(3).logs().watchFor("Cluster Metadata Identifier mismatch");
private Setup setupSplitBrainCluster() throws IOException
{
// partition the cluster in 2 parts on startup, node1, node2 in one, node3, node4 in the other
Cluster cluster = builder().withNodes(4)
.withConfig(config -> config.with(GOSSIP).with(NETWORK)
.set("seed_provider", new IInstanceConfig.ParameterizedClass(SimpleSeedProvider.class.getName(),
Collections.singletonMap("seeds", "127.0.0.1,127.0.0.3")))
.set("discovery_timeout", "1s"))
.createWithoutStarting();
IMessageFilters.Filter drop1 = cluster.filters().allVerbs().from(1, 2).to(3, 4).drop();
IMessageFilters.Filter drop2 = cluster.filters().allVerbs().from(3, 4).to(1, 2).drop();
List<Thread> startupThreads = new ArrayList<>(4);
for (int i = 0; i < 4; i++)
{
int threadNr = i + 1;
startupThreads.add(new Thread(() -> cluster.get(threadNr).startup()));
}
startupThreads.forEach(Thread::start);
startupThreads.forEach(SplitBrainTest::join);
return new Setup(cluster, drop1, drop2);
}

private final class Setup implements AutoCloseable
{
final Cluster cluster;
final IMessageFilters.Filter[] filters;

Setup(Cluster cluster, IMessageFilters.Filter ... filters)
{
this.cluster = cluster;
this.filters = filters;
}

void reenableCommunication()
{
for (IMessageFilters.Filter filter : filters)
filter.off();
}

@Override
public void close()
{
cluster.close();
}
}

Expand All @@ -76,4 +198,5 @@ private static void join(Thread t)
throw new RuntimeException(e);
}
}

}
7 changes: 4 additions & 3 deletions test/unit/org/apache/cassandra/gms/SerializationsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private void testGossipDigestWrite() throws IOException
GossipDigestAck2 ack2 = new GossipDigestAck2(states);
GossipDigestSyn syn = new GossipDigestSyn("Not a real cluster name",
ClusterMetadata.current().tokenMap.partitioner().getClass().getCanonicalName(),
20240430,
Statics.Digests);

DataOutputStreamPlus out = getOutput("gms.Gossip.bin");
Expand All @@ -113,8 +114,8 @@ public void testGossipDigestRead() throws IOException

int count = 0;
FileInputStreamPlus in = getInput("gms.Gossip.bin");
while (count < Statics.Digests.size())
assert GossipDigestAck2.serializer.deserialize(in, getVersion()) != null;
while (count++ < Statics.Digests.size())
assert GossipDigest.serializer.deserialize(in, getVersion()) != null;
assert GossipDigestAck.serializer.deserialize(in, getVersion()) != null;
assert GossipDigestAck2.serializer.deserialize(in, getVersion()) != null;
assert GossipDigestSyn.serializer.deserialize(in, getVersion()) != null;
Expand All @@ -130,7 +131,7 @@ private static class Statics
private static VersionedValue vv0 = vvFact.load(23d);
private static VersionedValue vv1 = vvFact.bootstrapping(Collections.<Token>singleton(partitioner.getRandomToken()));
private static List<GossipDigest> Digests = new ArrayList<GossipDigest>();

static
{
HeartbeatSt.updateHeartBeat();
EndpointSt.addApplicationState(ApplicationState.LOAD, vv0);
Expand Down
3 changes: 2 additions & 1 deletion test/unit/org/apache/cassandra/net/HandshakeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static org.apache.cassandra.net.OutboundConnectionInitiator.Result;
import static org.apache.cassandra.net.OutboundConnectionInitiator.SslFallbackConnectionType;
import static org.apache.cassandra.net.OutboundConnectionInitiator.initiateMessaging;
import static org.apache.cassandra.tcm.ClusterMetadata.EMPTY_METADATA_IDENTIFIER;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -317,7 +318,7 @@ private OutboundConnection initiateOutbound(InetAddressAndPort endpoint, SslFall
.withDebugCallbacks(new HandshakeAcknowledgeChecker(t -> handshakeEx = t))
.withFrom(FROM_ADDR);
OutboundConnections outboundConnections = OutboundConnections.tryRegister(new ConcurrentHashMap<>(), TO_ADDR, settings);
GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner", new ArrayList<>(0));
GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner", EMPTY_METADATA_IDENTIFIER, new ArrayList<>(0));
Message<GossipDigestSyn> message = Message.out(Verb.GOSSIP_DIGEST_SYN, syn);
OutboundConnection outboundConnection = outboundConnections.connectionFor(message);
outboundConnection.enqueue(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import static org.apache.cassandra.net.MessagingService.current_version;
import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
import static org.apache.cassandra.tcm.ClusterMetadata.EMPTY_METADATA_IDENTIFIER;

public class OutboundConnectionsTest
{
Expand Down Expand Up @@ -96,7 +97,7 @@ public void tearDown() throws ExecutionException, InterruptedException, TimeoutE
@Test
public void getConnection_Gossip()
{
GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner", new ArrayList<>(0));
GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner", EMPTY_METADATA_IDENTIFIER, new ArrayList<>(0));
Message<GossipDigestSyn> message = Message.out(Verb.GOSSIP_DIGEST_SYN, syn);
Assert.assertEquals(ConnectionType.URGENT_MESSAGES, connections.connectionFor(message).type());
}
Expand Down