Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.xvnodesupport #626

Open
wants to merge 1 commit into
base: 3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,18 @@ private void setPriamProperties()
String token = null;
String seeds = null;
boolean isReplace = false;
boolean isExternallyDefinedToken = false;
String replacedIp = "";
String extraEnvParams = null;

while (true)
{
try
{
token = DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/get_token");
isExternallyDefinedToken = Boolean.parseBoolean(DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/is_externally_defined_token"));
if (isExternallyDefinedToken) {
token = DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/get_token");
}
seeds = DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/get_seeds");
isReplace = Boolean.parseBoolean(DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/is_replace_token"));
replacedIp = DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/get_replaced_ip");
Expand All @@ -64,7 +68,7 @@ private void setPriamProperties()
e.printStackTrace();
}

if (token != null && seeds != null)
if ((token != null || !isExternallyDefinedToken) && seeds != null)
break;
try
{
Expand All @@ -75,8 +79,10 @@ private void setPriamProperties()
// do nothing.
}
}

System.setProperty("cassandra.initial_token", token);

if (isExternallyDefinedToken) {
System.setProperty("cassandra.initial_token", token);
}

setExtraEnvParams(extraEnvParams);

Expand Down
5 changes: 5 additions & 0 deletions priam/src/main/java/com/netflix/priam/IConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ public interface IConfiguration {
*/
public String getHostIP();

/**
* @return Gets the number of tokens assigned to the node when using virtual nodes.
*/
public int getNumTokens();

/**
* @return Bytes per second to throttle for backups
*/
Expand Down
2 changes: 1 addition & 1 deletion priam/src/main/java/com/netflix/priam/PriamServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public PriamServer(IConfiguration config, PriamScheduler scheduler, InstanceIden
}

public void intialize() throws Exception {
if (id.getInstance().isOutOfService())
if (id.isOutOfService())
return;

// start to schedule jobs
Expand Down
10 changes: 5 additions & 5 deletions priam/src/main/java/com/netflix/priam/aws/S3BackupPath.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public String getRemotePath() {
buff.append(baseDir).append(S3BackupPath.PATH_SEP); // Base dir
buff.append(region).append(S3BackupPath.PATH_SEP);
buff.append(clusterName).append(S3BackupPath.PATH_SEP);// Cluster name
buff.append(token).append(S3BackupPath.PATH_SEP);
buff.append(nodeIdentifier).append(S3BackupPath.PATH_SEP);
buff.append(formatDate(time)).append(S3BackupPath.PATH_SEP);
buff.append(type).append(S3BackupPath.PATH_SEP);
if (type != BackupFileType.META && type != BackupFileType.CL) {
Expand Down Expand Up @@ -83,7 +83,7 @@ public void parseRemote(String remoteFilePath) {
baseDir = pieces.get(0);
region = pieces.get(1);
clusterName = pieces.get(2);
token = pieces.get(3);
nodeIdentifier = pieces.get(3);
time = parseDate(pieces.get(4));
type = BackupFileType.valueOf(pieces.get(5));
if (type != BackupFileType.META && type != BackupFileType.CL) {
Expand All @@ -109,14 +109,14 @@ public void parsePartialPrefix(String remoteFilePath) {
baseDir = pieces.get(0);
region = pieces.get(1);
clusterName = pieces.get(2);
token = pieces.get(3);
nodeIdentifier = pieces.get(3);
}

@Override
public String remotePrefix(Date start, Date end, String location) {
StringBuffer buff = new StringBuffer(clusterPrefix(location));
token = factory.getInstance().getToken();
buff.append(token).append(S3BackupPath.PATH_SEP);
nodeIdentifier = instanceIdentity.getBackupIdentifier();
buff.append(nodeIdentifier).append(S3BackupPath.PATH_SEP);
// match the common characters to prefix.
buff.append(match(start, end));
return buff.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ protected List<ReplaceableAttribute> createAttributesToRegister(PriamInstance in
instance.setUpdatetime(new Date().getTime());
List<ReplaceableAttribute> attrs = new ArrayList<ReplaceableAttribute>();
attrs.add(new ReplaceableAttribute(Attributes.INSTANCE_ID, instance.getInstanceId(), false));
attrs.add(new ReplaceableAttribute(Attributes.TOKEN, instance.getToken(), true));
if (instance.getToken() != null) {
attrs.add(new ReplaceableAttribute(Attributes.TOKEN, instance.getToken(), true));
}
attrs.add(new ReplaceableAttribute(Attributes.APP_ID, instance.getApp(), true));
attrs.add(new ReplaceableAttribute(Attributes.ID, Integer.toString(instance.getId()), true));
attrs.add(new ReplaceableAttribute(Attributes.AVAILABILITY_ZONE, instance.getRac(), true));
Expand All @@ -152,7 +154,9 @@ protected List<ReplaceableAttribute> createAttributesToRegister(PriamInstance in
protected List<Attribute> createAttributesToDeRegister(PriamInstance instance) {
List<Attribute> attrs = new ArrayList<Attribute>();
attrs.add(new Attribute(Attributes.INSTANCE_ID, instance.getInstanceId()));
attrs.add(new Attribute(Attributes.TOKEN, instance.getToken()));
if (instance.getToken() != null) {
attrs.add(new Attribute(Attributes.TOKEN, instance.getToken()));
}
attrs.add(new Attribute(Attributes.APP_ID, instance.getApp()));
attrs.add(new Attribute(Attributes.ID, Integer.toString(instance.getId())));
attrs.add(new Attribute(Attributes.AVAILABILITY_ZONE, instance.getRac()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public static TaskTimer getTimer(InstanceIdentity id) {
logger.info("Seed node. Instance id: {}"
+ ", host ip: {}"
+ ", host name: {}",
id.getInstance().getInstanceId(), id.getInstance().getHostIP(), id.getInstance().getHostName());
id.getInstanceId(), id.getHostIP(), id.getHostName());
return_ = new SimpleTimer(JOBNAME, 120 * 1000 + ran.nextInt(120 * 1000));
} else
return_ = new SimpleTimer(JOBNAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,21 @@ public enum BackupFileType {
protected String columnFamily;
protected String fileName;
protected String baseDir;
protected String token;
protected String nodeIdentifier;
protected String region;
protected Date time;
protected long size; //uncompressed file size
protected long compressedFileSize = 0;
protected boolean isCassandra1_0;

protected final InstanceIdentity factory;
protected final InstanceIdentity instanceIdentity;
protected final IConfiguration config;
protected File backupFile;
protected Date uploadedTs;
protected int awsSlowDownExceptionCounter = 0;

public AbstractBackupPath(IConfiguration config, InstanceIdentity factory) {
this.factory = factory;
public AbstractBackupPath(IConfiguration config, InstanceIdentity instanceIdentity) {
this.instanceIdentity = instanceIdentity;
this.config = config;
}

Expand All @@ -93,7 +93,7 @@ public void parseLocal(File file, BackupFileType type) throws ParseException {
this.clusterName = config.getAppName();
this.baseDir = config.getBackupLocation();
this.region = config.getDC();
this.token = factory.getInstance().getToken();
this.nodeIdentifier = instanceIdentity.getBackupIdentifier();
this.type = type;
if (type != BackupFileType.META && type != BackupFileType.CL) {
this.keyspace = elements[0];
Expand Down Expand Up @@ -214,8 +214,8 @@ public String getBaseDir() {
return baseDir;
}

public String getToken() {
return token;
public String getNodeIdentifier() {
return nodeIdentifier;
}

public String getRegion() {
Expand Down Expand Up @@ -262,7 +262,7 @@ public void setFileName(String fileName) {
}

public InstanceIdentity getInstanceIdentity() {
return this.factory;
return this.instanceIdentity;
}

public void setUploadedTs(Date uploadedTs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ public void execute() throws Exception {

Date startTime = Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime();
snapshotName = pathFactory.get().formatDate(startTime);
String token = instanceIdentity.getInstance().getToken();
String backupIdentifier = instanceIdentity.getBackupIdentifier();

// Save start snapshot status
BackupMetadata backupMetadata = new BackupMetadata(token, startTime);
BackupMetadata backupMetadata = new BackupMetadata(backupIdentifier, startTime);
snapshotStatusMgr.start(backupMetadata);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ public String getCassProcessName() {
return config.get(CONFIG_CASS_PROCESS_NAME, DEFAULT_CASS_PROCESS_NAME);
}

@Override
public int getNumTokens() {
return config.get(CONFIG_VNODE_NUM_TOKENS, DEFAULT_VNODE_NUM_TOKENS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.netflix.priam.utils.ITokenManager;
import com.netflix.priam.utils.RetryableCallable;
import com.netflix.priam.utils.Sleeper;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -73,6 +74,8 @@ public boolean test(PriamInstance input) {
};

private PriamInstance myInstance;
private String backupIdentifier;
private boolean outOfService = false;
private boolean isReplace = false;
private boolean isTokenPregenerated = false;
private String replacedIp = "";
Expand All @@ -99,7 +102,7 @@ public InstanceIdentity(IPriamInstanceFactory factory, IMembership membership, I
init();
}

public PriamInstance getInstance() {
PriamInstance getInstance() {
return myInstance;
}

Expand All @@ -113,7 +116,7 @@ public PriamInstance retriableCall() throws Exception {
for (PriamInstance ins : deadInstances) {
logger.info("[Dead] Iterating though the hosts: {}", ins.getInstanceId());
if (ins.getInstanceId().equals(config.getInstanceName())) {
ins.setOutOfService(true);
outOfService = true;
logger.info("[Dead] found that this node is dead."
+ " application: {}"
+ ", id: {}"
Expand Down Expand Up @@ -237,6 +240,13 @@ public void forEachExecution() {
}

logger.info("My token: {}", myInstance.getToken());

if (myInstance.getToken() == null || myInstance.getToken().isEmpty()) {
backupIdentifier = "virual" + Integer.toString(myInstance.getId());
} else
{
backupIdentifier = myInstance.getToken();
}
}

private void populateRacMap() {
Expand Down Expand Up @@ -299,4 +309,44 @@ public String getReplacedIp() {
private static boolean isInstanceDummy(PriamInstance instance) {
return instance.getInstanceId().equals(DUMMY_INSTANCE_ID);
}

public boolean isOutOfService()
{
return outOfService;
}

public String getBackupIdentifier()
{
return backupIdentifier;
}

public void setBackupIdentifier(String backupIdentifier)
{
this.backupIdentifier = backupIdentifier;
}

public String getToken()
{
return myInstance.getToken();
}

public String getInstanceId()
{
return myInstance.getInstanceId();
}

public String getHostIP()
{
return myInstance.getHostIP();
}

public String getHostName()
{
return myInstance.getHostName();
}

public boolean isExternallyDefinedToken()
{
return StringUtils.isNotBlank(getToken());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,5 @@ public void setUpdatetime(long updatetime) {
this.updatetime = updatetime;
}

public boolean isOutOfService() {
return outOfService;
}

public void setOutOfService(boolean outOfService) {
this.outOfService = outOfService;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,23 @@ public String getReplaceIp() {

private String findReplaceIp(List<PriamInstance> allIds, String token, String location) {
String ip = null;
for (PriamInstance ins : allIds) {
logger.info("Calling getIp on hostname[{}] and token[{}]", ins.getHostName(), token);
if (ins.getToken().equals(token) || !ins.getDC().equals(location)) { //avoid using dead instance and other regions' instances
continue;
}

try {
ip = getIp(ins.getHostName(), token);
} catch (ParseException e) {
ip = null;
}

if (ip != null) {
logger.info("Found the IP: {}", ip);
return ip;
if (token != null) {
for (PriamInstance ins : allIds) {
logger.info("Calling getIp on hostname[{}] and token[{}]", ins.getHostName(), token);
if (token.equalsIgnoreCase(ins.getToken()) || !ins.getDC().equals(location)) { //avoid using dead instance and other regions' instances
continue;
}

try {
ip = getIp(ins.getHostName(), token);
} catch (ParseException e) {
ip = null;
}

if (ip != null) {
logger.info("Found the IP: {}", ip);
return ip;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public NewTokenRetriever(IPriamInstanceFactory factory, IMembership membership,
@Override
public PriamInstance get() throws Exception {

logger.info("Generating my own and grabbing new token");
logger.info("Generating my own and grabbing new identifier");
// Sleep random interval - upto 15 sec
sleeper.sleep(new Random().nextInt(15000));
int hash = tokenManager.regionOffset(config.getDC());
Expand All @@ -74,10 +74,16 @@ public PriamInstance get() throws Exception {
} else
my_slot = config.getRacs().size() + maxSlot;

logger.info("Trying to createToken with slot {} with rac count {} with rac membership size {} with dc {}",
my_slot, membership.getRacCount(), membership.getRacMembershipSize(), config.getDC());
String payload = tokenManager.createToken(my_slot, membership.getRacCount(), membership.getRacMembershipSize(), config.getDC());
return factory.create(config.getAppName(), my_slot + hash, config.getInstanceName(), config.getHostname(), config.getHostIP(), config.getRac(), null, payload);
int identifier = my_slot + hash;
String token = null;
if (config.getNumTokens() == 1)
{
logger.info("Trying to createToken with slot {} with rac count {} with rac membership size {} with dc {}",
my_slot, membership.getRacCount(), membership.getRacMembershipSize(), config.getDC());
token = tokenManager.createToken(
my_slot, membership.getRacCount(), membership.getRacMembershipSize(), config.getDC());
}
return factory.create(config.getAppName(), identifier, config.getInstanceName(), config.getHostname(), config.getHostIP(), config.getRac(), null, token);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ private void notify(AbstractBackupPath abp, String uploadStatus) {
jsonObject.put("cf", abp.getColumnFamily());
jsonObject.put("region", abp.getRegion());
jsonObject.put("rack", this.config.getRac());
jsonObject.put("token", abp.getToken());
jsonObject.put("token", abp.getNodeIdentifier());
jsonObject.put("nodeIdentifier", abp.getNodeIdentifier());
jsonObject.put("filename", abp.getFileName());
jsonObject.put("uncompressfilesize", abp.getSize());
jsonObject.put("compressfilesize", abp.getCompressedFileSize());
Expand Down