Skip to content

Commit

Permalink
Increment cross regional duplicate tokens to replicate the policy we …
Browse files Browse the repository at this point in the history
…have been applying manually. (#1048)

* Increment cross regional duplicate tokens to replicate the policy we have been applying manually. Throw when duplicate tokens are created in region because that would be an obvious error and we should not add two nodes in the same region so closely together.

* Improve error message on intra-regional duplicate token.
  • Loading branch information
mattl-netflix committed Apr 25, 2023
1 parent 68103bb commit 457bd3a
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 23 deletions.
Expand Up @@ -155,30 +155,53 @@ public PriamInstance retriableCall() throws Exception {
set(100, 100);
logger.info("Trying to generate a new token");
sleeper.sleep(new Random().nextInt(15000));
String myRegion = myInstanceInfo.getRegion();
// this offset ensures the nodes are spread far away from the other regions.
int regionOffset = tokenManager.regionOffset(myRegion);
String myRac = myInstanceInfo.getRac();
List<String> racs = config.getRacs();
int mySlot =
factory.getAllIds(config.getAppName())
.stream()
.filter(i -> i.getRac().equals(myRac))
.map(PriamInstance::getId)
.max(Integer::compareTo)
.map(id -> racs.size() + Math.max(id, regionOffset) - regionOffset)
.orElseGet(
() -> {
Preconditions.checkState(racs.contains(myRac));
return racs.indexOf(myRac);
});
int instanceCount = membership.getRacCount() * membership.getRacMembershipSize();
String newToken = tokenManager.createToken(mySlot, instanceCount, myRegion);
return createToken(mySlot + regionOffset, newToken);
return generateNewToken();
}
}.call();
}

@VisibleForTesting
PriamInstance generateNewToken() {
String myRegion = myInstanceInfo.getRegion();
// this offset ensures the nodes are spread far away from the other regions.
int regionOffset = tokenManager.regionOffset(myRegion);
String myRac = myInstanceInfo.getRac();
List<String> racs = config.getRacs();
ImmutableSet<PriamInstance> allIds = factory.getAllIds(config.getAppName());
int mySlot =
allIds.stream()
.filter(i -> i.getRac().equals(myRac))
.map(PriamInstance::getId)
.max(Integer::compareTo)
.map(id -> racs.size() + Math.max(id, regionOffset) - regionOffset)
.orElseGet(
() -> {
Preconditions.checkState(racs.contains(myRac));
return racs.indexOf(myRac);
});
int instanceCount = membership.getRacCount() * membership.getRacMembershipSize();
String newToken = tokenManager.createToken(mySlot, instanceCount, myRegion);
while (newTokenIsADuplicate(newToken, allIds)) {
newToken = new BigInteger(newToken).add(BigInteger.ONE).toString();
}
return createToken(mySlot + regionOffset, newToken);
}

private boolean newTokenIsADuplicate(String newToken, ImmutableSet<PriamInstance> instances) {
for (PriamInstance priamInstance : instances) {
if (newToken.equals(priamInstance.getToken())) {
if (myInstanceInfo.getRegion().equals(priamInstance.getDC())) {
throw new IllegalStateException(
String.format(
"Trying to add token %s to %s but it already exists in %s",
newToken, myInstanceInfo.getRegion(), priamInstance.getDC()));
}
return true;
}
}
return false;
}

private String getReplacedIpForAssignedToken(
ImmutableSet<PriamInstance> aliveInstances, PriamInstance instance)
throws TokenRetrieverUtils.GossipParseException {
Expand Down
Expand Up @@ -42,6 +42,7 @@ public class FakeConfiguration implements IConfiguration {
private boolean skipIngressUnlessIPIsPublic;
private long compressionTransitionEpochMillis;
private boolean autoSnapshot;
private String partitioner;

public Map<String, String> fakeProperties = new HashMap<>();

Expand Down Expand Up @@ -313,4 +314,13 @@ public FakeConfiguration setAutoSnapshot(boolean autoSnapshot) {
public boolean getAutoSnapshot() {
return autoSnapshot;
}

public void setPartitioner(String partitioner) {
this.partitioner = partitioner;
}

@Override
public String getPartitioner() {
return partitioner;
}
}
Expand Up @@ -94,4 +94,12 @@ public String getAutoScalingGroup() {
public InstanceEnvironment getInstanceEnvironment() {
return InstanceEnvironment.VPC;
}

public void setRac(String rac) {
this.availabilityZone = rac;
}

public void setRegion(String region) {
this.region = region;
}
}
Expand Up @@ -17,9 +17,7 @@

package com.netflix.priam.identity.token;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.*;
import com.google.common.truth.Truth;
import com.google.inject.Guice;
import com.google.inject.Injector;
Expand All @@ -29,6 +27,7 @@
import com.netflix.priam.identity.IMembership;
import com.netflix.priam.identity.IPriamInstanceFactory;
import com.netflix.priam.identity.PriamInstance;
import com.netflix.priam.identity.config.FakeInstanceInfo;
import com.netflix.priam.identity.config.InstanceInfo;
import com.netflix.priam.utils.FakeSleeper;
import com.netflix.priam.utils.SystemUtils;
Expand All @@ -42,6 +41,7 @@
import mockit.Mocked;
import org.apache.commons.lang3.math.Fraction;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

Expand Down Expand Up @@ -425,6 +425,43 @@ public void testRingPositionLast(@Mocked SystemUtils systemUtils) throws Excepti
Truth.assertThat(tokenRetriever.getRingPosition()).isEqualTo(Fraction.getFraction(6, 7));
}

@Test
public void testThrowOnDuplicateTokenInSameRegion() {
prepareTokenGenerationTest();
create(1, instanceInfo.getInstanceId(), "host_0", "1.2.3.4", "us-east-1d", 1808575600 + "");
Assert.assertThrows(
IllegalStateException.class, () -> getTokenRetriever().generateNewToken());
}

@Test
public void testIncrementDuplicateTokenInDifferentRegion() {
((FakeInstanceInfo) instanceInfo).setRegion("us-west-2");
create(1, instanceInfo.getInstanceId(), "host_0", "1.2.3.4", "us-west-2a", 1808575600 + "");
prepareTokenGenerationTest();
Truth.assertThat(getTokenRetriever().generateNewToken().getToken()).isEqualTo("1808575601");
}

private void prepareTokenGenerationTest() {
((FakeConfiguration) configuration).setCreateNewToken(true);
((FakeConfiguration) configuration)
.setPartitioner("org.apache.cassandra.dht.RandomPartitioner");
((FakeConfiguration) configuration).setRacs("us-east-1c", "us-east-1d", "us-east-1e");
((FakeInstanceInfo) instanceInfo).setRegion("us-east-1");
((FakeInstanceInfo) instanceInfo).setRac("us-east-1c");
new Expectations() {
{
membership.getRacMembershipSize();
result = 2;
}
};
new Expectations() {
{
membership.getRacCount();
result = 3;
}
};
}

private String getStatus(List<String> liveInstances, Map<String, String> tokenToEndpointMap) {
JSONObject jsonObject = new JSONObject();
try {
Expand Down

0 comments on commit 457bd3a

Please sign in to comment.