Skip to content

Commit

Permalink
fix: use create time of database for retries
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Apr 10, 2020
1 parent 1e9c4a6 commit 605b672
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 24 deletions.
Expand Up @@ -148,6 +148,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

Expand Down Expand Up @@ -391,22 +392,20 @@ public boolean shouldRetry(

private final class OperationFutureCallable<RequestT, ResponseT, MetadataT extends Message>
implements Callable<OperationFuture<ResponseT, MetadataT>> {
final Class<MetadataT> metadataClass;
final OperationCallable<RequestT, ResponseT, MetadataT> operationCallable;
final RequestT initialRequest;
final String instanceName;
final OperationsLister lister;
final Function<MetadataT, Timestamp> getStartTimeFunction;
final Function<Operation, Timestamp> getStartTimeFunction;
Timestamp initialCallTime;
boolean isRetry = false;

OperationFutureCallable(
Class<MetadataT> metadataClass,
OperationCallable<RequestT, ResponseT, MetadataT> operationCallable,
RequestT initialRequest,
String instanceName,
OperationsLister lister,
Function<MetadataT, Timestamp> getStartTimeFunction) {
this.metadataClass = metadataClass;
Function<Operation, Timestamp> getStartTimeFunction) {
this.operationCallable = operationCallable;
this.initialRequest = initialRequest;
this.instanceName = instanceName;
Expand All @@ -422,11 +421,17 @@ public OperationFuture<ResponseT, MetadataT> call() throws Exception {
if (isRetry) {
// Query the backend to see if the operation was actually created, and that the
// problem was caused by a network problem or other transient problem client side.
Operation operation = mostRecentOperation(metadataClass, lister, getStartTimeFunction);
Operation operation = mostRecentOperation(lister, getStartTimeFunction, initialCallTime);
if (operation != null) {
// Operation found, resume tracking that operation.
operationName = operation.getName();
}
} else {
initialCallTime =
Timestamp.newBuilder()
.setSeconds(
TimeUnit.SECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS))
.build();
}
isRetry = true;

Expand All @@ -443,8 +448,10 @@ private interface OperationsLister {
Paginated<Operation> listOperations(String nextPageToken);
}

private <T extends Message> Operation mostRecentOperation(
Class<T> metadataClass, OperationsLister lister, Function<T, Timestamp> getStartTimeFunction)
private Operation mostRecentOperation(
OperationsLister lister,
Function<Operation, Timestamp> getStartTimeFunction,
Timestamp initialCallTime)
throws InvalidProtocolBufferException {
Operation res = null;
Timestamp currMaxStartTime = null;
Expand All @@ -453,16 +460,18 @@ private <T extends Message> Operation mostRecentOperation(
do {
operations = lister.listOperations(nextPageToken);
for (Operation op : operations.getResults()) {
T metadata = op.getMetadata().unpack(metadataClass);
Timestamp startTime = getStartTimeFunction.apply(metadata);
if (res == null || TimestampComparator.INSTANCE.compare(startTime, currMaxStartTime) > 0) {
Timestamp startTime = getStartTimeFunction.apply(op);
if (res == null
|| (TimestampComparator.INSTANCE.compare(startTime, currMaxStartTime) > 0
&& TimestampComparator.INSTANCE.compare(startTime, initialCallTime) >= 0)) {
currMaxStartTime = startTime;
res = op;
}
// If the operation does not report any start time, then the operation that is not yet done
// is the one that is the most recent.
if (startTime == null && currMaxStartTime == null && !op.getDone()) {
res = op;
break;
}
}
} while (operations.getNextPageToken() != null);
Expand Down Expand Up @@ -684,7 +693,6 @@ public OperationFuture<Database, CreateDatabaseMetadata> createDatabase(

OperationFutureCallable<CreateDatabaseRequest, Database, CreateDatabaseMetadata> callable =
new OperationFutureCallable<CreateDatabaseRequest, Database, CreateDatabaseMetadata>(
CreateDatabaseMetadata.class,
databaseAdminStub.createDatabaseOperationCallable(),
request,
instanceName,
Expand All @@ -701,9 +709,23 @@ public Paginated<Operation> listOperations(String nextPageToken) {
nextPageToken);
}
},
new Function<CreateDatabaseMetadata, Timestamp>() {
new Function<Operation, Timestamp>() {
@Override
public Timestamp apply(CreateDatabaseMetadata input) {
public Timestamp apply(Operation input) {
if (input.getDone() && input.hasResponse()) {
try {
Timestamp createTime =
input.getResponse().unpack(Database.class).getCreateTime();
if (Timestamp.getDefaultInstance().equals(createTime)) {
// Create time was not returned by the server (proto objects never return
// null, instead they return the default instance). Return null from this
// method to indicate that there is no known create time.
return null;
}
} catch (InvalidProtocolBufferException e) {
return null;
}
}
return null;
}
});
Expand Down Expand Up @@ -790,7 +812,6 @@ public OperationFuture<Backup, CreateBackupMetadata> createBackup(
.build();
OperationFutureCallable<CreateBackupRequest, Backup, CreateBackupMetadata> callable =
new OperationFutureCallable<CreateBackupRequest, Backup, CreateBackupMetadata>(
CreateBackupMetadata.class,
databaseAdminStub.createBackupOperationCallable(),
request,
instanceName,
Expand All @@ -807,10 +828,18 @@ public Paginated<Operation> listOperations(String nextPageToken) {
nextPageToken);
}
},
new Function<CreateBackupMetadata, Timestamp>() {
new Function<Operation, Timestamp>() {
@Override
public Timestamp apply(CreateBackupMetadata input) {
return input.getProgress().getStartTime();
public Timestamp apply(Operation input) {
try {
return input
.getMetadata()
.unpack(CreateBackupMetadata.class)
.getProgress()
.getStartTime();
} catch (InvalidProtocolBufferException e) {
return null;
}
}
});
return RetryHelper.runWithRetries(
Expand All @@ -835,7 +864,6 @@ public final OperationFuture<Database, RestoreDatabaseMetadata> restoreDatabase(

OperationFutureCallable<RestoreDatabaseRequest, Database, RestoreDatabaseMetadata> callable =
new OperationFutureCallable<RestoreDatabaseRequest, Database, RestoreDatabaseMetadata>(
RestoreDatabaseMetadata.class,
databaseAdminStub.restoreDatabaseOperationCallable(),
request,
databaseInstanceName,
Expand All @@ -852,10 +880,18 @@ public Paginated<Operation> listOperations(String nextPageToken) {
nextPageToken);
}
},
new Function<RestoreDatabaseMetadata, Timestamp>() {
new Function<Operation, Timestamp>() {
@Override
public Timestamp apply(RestoreDatabaseMetadata input) {
return input.getProgress().getStartTime();
public Timestamp apply(Operation input) {
try {
return input
.getMetadata()
.unpack(RestoreDatabaseMetadata.class)
.getProgress()
.getStartTime();
} catch (InvalidProtocolBufferException e) {
return null;
}
}
});
return RetryHelper.runWithRetries(
Expand Down
Expand Up @@ -269,6 +269,8 @@ public void testRetryNonIdempotentRpcsReturningLongRunningOperations() throws Ex

List<Database> databases = new ArrayList<>();
List<Backup> backups = new ArrayList<>();
String initialDatabaseId;
Timestamp initialDbCreateTime;

try {
// CreateDatabase
Expand All @@ -277,14 +279,19 @@ public void testRetryNonIdempotentRpcsReturningLongRunningOperations() throws Ex
SpannerOptions options =
testHelper.getOptions().toBuilder().setInterceptorProvider(createDbInterceptor).build();
try (Spanner spanner = options.getService()) {
String databaseId = testHelper.getUniqueDatabaseId();
initialDatabaseId = testHelper.getUniqueDatabaseId();
DatabaseAdminClient client = spanner.getDatabaseAdminClient();
OperationFuture<Database, CreateDatabaseMetadata> op =
client.createDatabase(
testHelper.getInstanceId().getInstance(),
databaseId,
initialDatabaseId,
Collections.<String>emptyList());
databases.add(op.get());
// Keep track of the original create time of this database, as we will drop this database
// later and create another one with the exact same name. That means that the ListOperations
// call will return at least two CreateDatabase operations. The retry logic should always
// pick the last one.
initialDbCreateTime = op.get().getCreateTime();
// Assert that the CreateDatabase RPC was called only once, and that the operation tracking
// was resumed through a GetOperation call.
assertThat(createDbInterceptor.methodCount.get()).isEqualTo(1);
Expand Down Expand Up @@ -363,6 +370,32 @@ public void testRetryNonIdempotentRpcsReturningLongRunningOperations() throws Ex
}
}
}

// Create another database with the exact same name as the first database.
createDbInterceptor = new InjectErrorInterceptorProvider("CreateDatabase");
options =
testHelper.getOptions().toBuilder().setInterceptorProvider(createDbInterceptor).build();
try (Spanner spanner = options.getService()) {
DatabaseAdminClient client = spanner.getDatabaseAdminClient();
// First drop the initial database.
client.dropDatabase(testHelper.getInstanceId().getInstance(), initialDatabaseId);
// Now re-create a database with the exact same name.
OperationFuture<Database, CreateDatabaseMetadata> op =
client.createDatabase(
testHelper.getInstanceId().getInstance(),
initialDatabaseId,
Collections.<String>emptyList());
// Check that the second database was created and has a greater creation time than the
// first.
Timestamp secondCreationTime = op.get().getCreateTime();
// TODO: Change this to greaterThan when the create time of a database is reported back by
// the server.
assertThat(secondCreationTime).isAtLeast(initialDbCreateTime);
// Assert that the CreateDatabase RPC was called only once, and that the operation tracking
// was resumed through a GetOperation call.
assertThat(createDbInterceptor.methodCount.get()).isEqualTo(1);
assertThat(createDbInterceptor.getOperationCount.get()).isAtLeast(1);
}
} finally {
DatabaseAdminClient client = testHelper.getClient().getDatabaseAdminClient();
for (Database database : databases) {
Expand Down

0 comments on commit 605b672

Please sign in to comment.