Skip to content

Commit

Permalink
Handle view changes during election
Browse files Browse the repository at this point in the history
Created test cases for the scenarios mentioned in the GitHub issue and
the fixes.

Elected leader leaving before the voting thread has finished. This
causes a liveness issue on both ELECTION and ELECTION2. We can fix this
by stopping the voting thread before starting it again.

Scenarios the coordinator leaves before finishing the election process
and a majority still exists. ELECTION algorithm has a liveness issue in
this case, since it does not take into account changes in the view
coordinator. We handle this case by calculating if the coordinator has
changed between views, there is a majority, it is currently the
coordinator, and there is no leader elected.

Close #259.
  • Loading branch information
jabolina committed Apr 12, 2024
1 parent a3ae7da commit e63f4b0
Show file tree
Hide file tree
Showing 6 changed files with 362 additions and 11 deletions.
16 changes: 13 additions & 3 deletions src/org/jgroups/protocols/raft/ELECTION.java
Expand Up @@ -9,7 +9,6 @@
import org.jgroups.raft.util.Utils.Majority;

import java.util.List;
import java.util.Objects;

/**
* The default leader election algorithm.
Expand Down Expand Up @@ -53,16 +52,27 @@ protected void handleView(View v) {
log.debug("%s: existing view: %s, new view: %s, result: %s", local_addr, this.view, v, result);
List<Address> joiners=View.newMembers(this.view, v);
boolean has_new_members=joiners != null && !joiners.isEmpty();
boolean coordinatorChanged = Utils.viewCoordinatorChanged(this.view, v);
this.view=v;
switch(result) {
case no_change: // the leader resends its term/address for new members to set the term/leader
case no_change:
// the leader resends its term/address for new members to set the term/leader.
if(raft.isLeader() && has_new_members)
sendLeaderElectedMessage(raft.leader(), raft.currentTerm());

// Handle cases where the previous coordinator left *before* a leader was elected.
// See: https://github.com/jgroups-extras/jgroups-raft/issues/259
else if (coordinatorChanged && isViewCoordinator() && isMajorityAvailable() && raft.leader() == null)
startVotingThread();
break;
case reached:
case leader_lost:
if(Objects.equals(this.view.getCoord(), local_addr)) {
// In case the leader is lost, we stop everything *before* starting again.
// This avoids cases where the leader is lost before the voting mechanism has stopped.
// See: https://github.com/jgroups-extras/jgroups-raft/issues/259
if(isViewCoordinator()) {
log.trace("%s: starting voting process (reason: %s, view: %s)", local_addr, result, view);
stopVotingThread();
startVotingThread();
}
break;
Expand Down
13 changes: 6 additions & 7 deletions src/org/jgroups/protocols/raft/ELECTION2.java
Expand Up @@ -17,7 +17,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.jgroups.Message.Flag.OOB;

Expand Down Expand Up @@ -87,13 +86,18 @@ protected void handleView(View v) {
}
// If we have no change in terms of majority threshold. If the view coordinator changed, we need to
// verify if an election is necessary.
if (viewCoordinatorChanged(old_view, v) && isViewCoordinator() && view.size() >= raft.majority()) {
if (Utils.viewCoordinatorChanged(old_view, v) && isViewCoordinator() && view.size() >= raft.majority()) {
preVotingMechanism.start();
}
break;
case reached:
case leader_lost:
// In case the leader is lost, we stop everything *before* starting again.
// This avoids cases where the leader is lost before the voting mechanism has stopped.
// See: https://github.com/jgroups-extras/jgroups-raft/issues/259
if (isViewCoordinator()) {
stopVotingThread();
preVotingMechanism.stop();
preVotingMechanism.start();
}
break;
Expand All @@ -120,11 +124,6 @@ protected void handleMessage(Message msg, RaftHeader hdr) {
super.handleMessage(msg, hdr);
}

private static boolean viewCoordinatorChanged(View old_view, View curr) {
if (old_view == null) return true;
return !Objects.equals(old_view.getCoord(), curr.getCoord());
}

/**
* Handle the {@link PreVoteRequest} coming from other nodes.
* <p>
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/raft/election/BaseElection.java
Expand Up @@ -325,7 +325,7 @@ protected void runVotingProcess() {
local_addr, votes.getValidResults(), time, majority);
}

private boolean isMajorityAvailable() {
protected final boolean isMajorityAvailable() {
return view != null && view.size() >= raft.majority();
}

Expand Down
14 changes: 14 additions & 0 deletions src/org/jgroups/raft/util/Utils.java
Expand Up @@ -5,6 +5,8 @@
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.raft.testfwk.RaftTestUtils;

import java.util.Objects;

/**
* @author Bela Ban
* @since 1.0.6
Expand Down Expand Up @@ -32,6 +34,18 @@ public static Majority computeMajority(View old, View new_view, int majority, Ad
return Majority.no_change;
}

/**
* Verify if the coordinator change between two views.
*
* @param prev: The old {@link View}, it can be <code>null</code>.
* @param curr: The recent {@link View}.
* @return <code>true</code> if the coordinator changed, <code>false</code>, otherwise.
*/
public static boolean viewCoordinatorChanged(View prev, View curr) {
if (prev == null) return true;
return !Objects.equals(prev.getCoord(), curr.getCoord());
}

/**
* Deletes the log data for the given {@link RAFT} instance.
* <p>
Expand Down
@@ -0,0 +1,119 @@
package org.jgroups.tests.election;

import org.jgroups.Address;
import org.jgroups.Global;
import org.jgroups.Header;
import org.jgroups.View;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.election.LeaderElected;
import org.jgroups.raft.testfwk.BlockingMessageInterceptor;
import org.jgroups.raft.testfwk.PartitionedRaftCluster;
import org.jgroups.raft.testfwk.RaftTestUtils;
import org.jgroups.tests.harness.BaseRaftElectionTest;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.testng.annotations.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.jgroups.tests.harness.BaseRaftElectionTest.ALL_ELECTION_CLASSES_PROVIDER;

@Test(groups = Global.FUNCTIONAL, singleThreaded = true, dataProvider = ALL_ELECTION_CLASSES_PROVIDER)
public class NetworkPartitionElectionTest extends BaseRaftElectionTest.ClusterBased<PartitionedRaftCluster> {

{
clusterSize = 5;

// Since it uses a data provider, it needs to execute per method to inject the values.
recreatePerMethod = true;
}

public void testNetworkPartitionDuringElection(Class<?> ignore) throws Exception {
withClusterSize(5);
createCluster();
long id = 0;

View view = createView(id++, 0, 1, 2, 3, 4);

// We intercept the first `LeaderElected` message.
AtomicBoolean onlyOnce = new AtomicBoolean(true);
BlockingMessageInterceptor interceptor = cluster.addCommandInterceptor(m -> {
for (Map.Entry<Short, Header> h : m.getHeaders().entrySet()) {
if (h.getValue() instanceof LeaderElected && onlyOnce.getAndSet(false)) {
// Assert that node A was elected
LeaderElected le = (LeaderElected) h.getValue();
assertThat(le.leader()).isEqualTo(address(0));
return true;
}
}
return false;
});

cluster.handleView(view);

System.out.println("-- wait command intercept");
assertThat(RaftTestUtils.eventually(() -> interceptor.numberOfBlockedMessages() > 0, 10, TimeUnit.SECONDS)).isTrue();

// While the message is in-flight, the cluster splits.
// The previous coordinator does not have the majority to proceed.
cluster.handleView(createView(id++, 0, 1));
cluster.handleView(createView(id++, 2, 3, 4));

// We can release the elected message.
interceptor.releaseNext();
interceptor.assertNoBlockedMessages();

// Check in all instances that a new leader is elected.
System.out.println("-- waiting for leader in majority partition");
BaseRaftElectionTest.waitUntilLeaderElected(rafts(), 10_000);

// Assert that A and B does not have a leader.
assertThat(raft(0).leader()).isNull();
assertThat(raft(1).leader()).isNull();

System.out.printf("-- elected during the split\n%s%n", dumpLeaderAndTerms());
// Store who's the leader before merging.
assertThat(leaders()).hasSize(1);
RAFT leader = raft(leaders().get(0));
long leaderTermBefore = leader.currentTerm();

System.out.printf("-- merge partition, leader=%s%n", leader);
// Join the partitions.
// Note that the coordinator is different.
cluster.handleView(createView(id++, 0, 1, 2, 3, 4));

// Wait until A and B receive the leader information.
BaseRaftElectionTest.waitUntilAllHaveLeaderElected(rafts(), 10_000);
System.out.printf("-- state after merge\n%s%n", dumpLeaderAndTerms());

// We assert the merge did not disrupt the cluster.
// Same leader and term.
assertThat(Arrays.stream(rafts())
.allMatch(r -> Objects.equals(leader.getAddress(), r.leader()) && r.currentTerm() == leaderTermBefore))
.withFailMessage(this::dumpLeaderAndTerms)
.isTrue();
}

private RAFT raft(Address address) {
for (RAFT raft : rafts()) {
if (Objects.equals(address, raft.getAddress()))
return raft;
}

throw new IllegalArgumentException(String.format("Node with address '%s' not present", address));
}

@Override
protected PartitionedRaftCluster createNewMockCluster() {
return new PartitionedRaftCluster();
}

@Override
protected void amendRAFTConfiguration(RAFT raft) {
raft.synchronous(true);
}
}

0 comments on commit e63f4b0

Please sign in to comment.