Skip to content

Commit

Permalink
fix: wrong use of getRetryDelayInMillis() / 1000 in documentation and…
Browse files Browse the repository at this point in the history
… retry loops (#885)

Both the documentation for `TransactionManager` as well as some internal retry loops wrongly used `SpannerException#getRetryDelayInMillis() / 1000` as input for `Thread.sleep(long)`. The retry delay is already in milliseconds and should not be modified.

Fixes #874
  • Loading branch information
olavloite committed Feb 19, 2021
1 parent b2b7bb2 commit a55d7ce
Show file tree
Hide file tree
Showing 16 changed files with 186 additions and 93 deletions.
Expand Up @@ -321,19 +321,19 @@ CommitResponse writeAtLeastOnceWithOptions(
* <pre>{@code
* long singerId = my_singer_id;
* try (TransactionManager manager = dbClient.transactionManager()) {
* TransactionContext txn = manager.begin();
* TransactionContext transaction = manager.begin();
* while (true) {
* String column = "FirstName";
* Struct row = txn.readRow("Singers", Key.of(singerId), Collections.singleton(column));
* Struct row = transaction.readRow("Singers", Key.of(singerId), Collections.singleton(column));
* String name = row.getString(column);
* txn.buffer(
* transaction.buffer(
* Mutation.newUpdateBuilder("Singers").set(column).to(name.toUpperCase()).build());
* try {
* manager.commit();
* break;
* } catch (AbortedException e) {
* Thread.sleep(e.getRetryDelayInMillis() / 1000);
* txn = manager.resetForRetry();
* Thread.sleep(e.getRetryDelayInMillis());
* transaction = manager.resetForRetry();
* }
* }
* }
Expand Down Expand Up @@ -385,19 +385,19 @@ CommitResponse writeAtLeastOnceWithOptions(
* <pre>{@code
* long singerId = 1L;
* try (AsyncTransactionManager manager = client.transactionManagerAsync()) {
* TransactionContextFuture txnFut = manager.beginAsync();
* TransactionContextFuture transactionFuture = manager.beginAsync();
* while (true) {
* String column = "FirstName";
* CommitTimestampFuture commitTimestamp =
* txnFut
* transactionFuture
* .then(
* (txn, __) ->
* txn.readRowAsync(
* (transaction, __) ->
* transaction.readRowAsync(
* "Singers", Key.of(singerId), Collections.singleton(column)))
* .then(
* (txn, row) -> {
* (transaction, row) -> {
* String name = row.getString(column);
* txn.buffer(
* transaction.buffer(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
Expand All @@ -409,8 +409,8 @@ CommitResponse writeAtLeastOnceWithOptions(
* commitTimestamp.get();
* break;
* } catch (AbortedException e) {
* Thread.sleep(e.getRetryDelayInMillis() / 1000);
* txnFut = manager.resetForRetryAsync();
* Thread.sleep(e.getRetryDelayInMillis());
* transactionFuture = manager.resetForRetryAsync();
* }
* }
* }
Expand All @@ -421,26 +421,26 @@ CommitResponse writeAtLeastOnceWithOptions(
* <pre>{@code
* final long singerId = 1L;
* try (AsyncTransactionManager manager = client().transactionManagerAsync()) {
* TransactionContextFuture txn = manager.beginAsync();
* TransactionContextFuture transactionFuture = manager.beginAsync();
* while (true) {
* final String column = "FirstName";
* CommitTimestampFuture commitTimestamp =
* txn.then(
* transactionFuture.then(
* new AsyncTransactionFunction<Void, Struct>() {
* @Override
* public ApiFuture<Struct> apply(TransactionContext txn, Void input)
* public ApiFuture<Struct> apply(TransactionContext transaction, Void input)
* throws Exception {
* return txn.readRowAsync(
* return transaction.readRowAsync(
* "Singers", Key.of(singerId), Collections.singleton(column));
* }
* })
* .then(
* new AsyncTransactionFunction<Struct, Void>() {
* @Override
* public ApiFuture<Void> apply(TransactionContext txn, Struct input)
* public ApiFuture<Void> apply(TransactionContext transaction, Struct input)
* throws Exception {
* String name = input.getString(column);
* txn.buffer(
* transaction.buffer(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
Expand All @@ -453,8 +453,8 @@ CommitResponse writeAtLeastOnceWithOptions(
* commitTimestamp.get();
* break;
* } catch (AbortedException e) {
* Thread.sleep(e.getRetryDelayInMillis() / 1000);
* txn = manager.resetForRetryAsync();
* Thread.sleep(e.getRetryDelayInMillis());
* transactionFuture = manager.resetForRetryAsync();
* }
* }
* }
Expand Down
Expand Up @@ -816,13 +816,21 @@ private TransactionContext internalBegin() {
}

@Override
public SpannerException handleSessionNotFound(SessionNotFoundException notFound) {
session = sessionPool.replaceSession(notFound, session);
public SpannerException handleSessionNotFound(SessionNotFoundException notFoundException) {
session = sessionPool.replaceSession(notFoundException, session);
PooledSession pooledSession = session.get();
delegate = pooledSession.delegate.transactionManager(options);
restartedAfterSessionNotFound = true;
return createAbortedExceptionWithMinimalRetryDelay(notFoundException);
}

private static SpannerException createAbortedExceptionWithMinimalRetryDelay(
SessionNotFoundException notFoundException) {
return SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED, notFound.getMessage(), notFound);
ErrorCode.ABORTED,
notFoundException.getMessage(),
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
notFoundException.getMessage(), notFoundException, 0, 1));
}

@Override
Expand Down
Expand Up @@ -24,9 +24,11 @@
import com.google.common.base.Predicate;
import com.google.rpc.ErrorInfo;
import com.google.rpc.ResourceInfo;
import com.google.rpc.RetryInfo;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.ProtoUtils;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -226,6 +228,28 @@ private static ErrorInfo extractErrorInfo(Throwable cause) {
return null;
}

/**
* Creates a {@link StatusRuntimeException} that contains a {@link RetryInfo} with the specified
* retry delay.
*/
static StatusRuntimeException createAbortedExceptionWithRetryDelay(
String message, Throwable cause, long retryDelaySeconds, int retryDelayNanos) {
Metadata.Key<RetryInfo> key = ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
Metadata trailers = new Metadata();
RetryInfo retryInfo =
RetryInfo.newBuilder()
.setRetryDelay(
com.google.protobuf.Duration.newBuilder()
.setNanos(retryDelayNanos)
.setSeconds(retryDelaySeconds))
.build();
trailers.put(key, retryInfo);
return io.grpc.Status.ABORTED
.withDescription(message)
.withCause(cause)
.asRuntimeException(trailers);
}

static SpannerException newSpannerExceptionPreformatted(
ErrorCode code, @Nullable String message, @Nullable Throwable cause) {
// This is the one place in the codebase that is allowed to call constructors directly.
Expand Down
Expand Up @@ -531,7 +531,10 @@ public void onError(SpannerException e, boolean withBeginTransaction) {
// Simulate an aborted transaction to force a retry with a new transaction.
this.transactionIdFuture.setException(
SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED, "Aborted due to failed initial statement", e));
ErrorCode.ABORTED,
"Aborted due to failed initial statement",
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
"Aborted due to failed initial statement", e, 0, 1)));
}

if (e.getErrorCode() == ErrorCode.ABORTED) {
Expand Down Expand Up @@ -684,6 +687,19 @@ public void run() {
return updateCount;
}

private SpannerException createAbortedExceptionForBatchDml(ExecuteBatchDmlResponse response) {
// Manually construct an AbortedException with a 10ms retry delay for BatchDML responses that
// return an Aborted status (and not an AbortedException).
return newSpannerException(
ErrorCode.fromRpcStatus(response.getStatus()),
response.getStatus().getMessage(),
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
response.getStatus().getMessage(),
/* cause = */ null,
/* retryDelaySeconds = */ 0,
/* retryDelayNanos = */ (int) TimeUnit.MILLISECONDS.toNanos(10L)));
}

@Override
public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... options) {
beforeReadOrQuery();
Expand All @@ -705,8 +721,7 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
if (response.getStatus().getCode() == Code.ABORTED_VALUE) {
throw newSpannerException(
ErrorCode.fromRpcStatus(response.getStatus()), response.getStatus().getMessage());
throw createAbortedExceptionForBatchDml(response);
} else if (response.getStatus().getCode() != 0) {
throw newSpannerBatchUpdateException(
ErrorCode.fromRpcStatus(response.getStatus()),
Expand Down Expand Up @@ -741,25 +756,24 @@ public ApiFuture<long[]> batchUpdateAsync(
response,
new ApiFunction<ExecuteBatchDmlResponse, long[]>() {
@Override
public long[] apply(ExecuteBatchDmlResponse input) {
long[] results = new long[input.getResultSetsCount()];
for (int i = 0; i < input.getResultSetsCount(); ++i) {
results[i] = input.getResultSets(i).getStats().getRowCountExact();
if (input.getResultSets(i).getMetadata().hasTransaction()) {
public long[] apply(ExecuteBatchDmlResponse batchDmlResponse) {
long[] results = new long[batchDmlResponse.getResultSetsCount()];
for (int i = 0; i < batchDmlResponse.getResultSetsCount(); ++i) {
results[i] = batchDmlResponse.getResultSets(i).getStats().getRowCountExact();
if (batchDmlResponse.getResultSets(i).getMetadata().hasTransaction()) {
onTransactionMetadata(
input.getResultSets(i).getMetadata().getTransaction(),
batchDmlResponse.getResultSets(i).getMetadata().getTransaction(),
builder.getTransaction().hasBegin());
}
}
// If one of the DML statements was aborted, we should throw an aborted exception.
// In all other cases, we should throw a BatchUpdateException.
if (input.getStatus().getCode() == Code.ABORTED_VALUE) {
throw newSpannerException(
ErrorCode.fromRpcStatus(input.getStatus()), input.getStatus().getMessage());
} else if (input.getStatus().getCode() != 0) {
if (batchDmlResponse.getStatus().getCode() == Code.ABORTED_VALUE) {
throw createAbortedExceptionForBatchDml(batchDmlResponse);
} else if (batchDmlResponse.getStatus().getCode() != 0) {
throw newSpannerBatchUpdateException(
ErrorCode.fromRpcStatus(input.getStatus()),
input.getStatus().getMessage(),
ErrorCode.fromRpcStatus(batchDmlResponse.getStatus()),
batchDmlResponse.getStatus().getMessage(),
results);
}
return results;
Expand Down
Expand Up @@ -725,8 +725,11 @@ private void handleAborted(AbortedException aborted) {
logger.fine(toString() + ": Starting internal transaction retry");
while (true) {
// First back off and then restart the transaction.
long delay = aborted.getRetryDelayInMillis();
try {
Thread.sleep(aborted.getRetryDelayInMillis() / 1000);
if (delay > 0L) {
Thread.sleep(delay);
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw SpannerExceptionFactory.newSpannerException(
Expand Down
Expand Up @@ -1137,7 +1137,7 @@ public ApiFuture<Void> apply(TransactionContext txn, Struct input)
commitTimestamp.get();
break;
} catch (AbortedException e) {
Thread.sleep(e.getRetryDelayInMillis() / 1000);
Thread.sleep(e.getRetryDelayInMillis());
txn = manager.resetForRetryAsync();
}
}
Expand Down
Expand Up @@ -662,7 +662,7 @@ public void transactionManagerIsNonBlocking() throws Exception {
txManager.commit();
break;
} catch (AbortedException e) {
Thread.sleep(e.getRetryDelayInMillis() / 1000);
Thread.sleep(e.getRetryDelayInMillis());
tx = txManager.resetForRetry();
}
}
Expand Down Expand Up @@ -705,7 +705,7 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) {
txManager.commit();
break;
} catch (AbortedException e) {
Thread.sleep(e.getRetryDelayInMillis() / 1000);
Thread.sleep(e.getRetryDelayInMillis());
tx = txManager.resetForRetry();
}
}
Expand Down
Expand Up @@ -1732,7 +1732,7 @@ private void simulateAbort(Session session, ByteString transactionId) {

public StatusRuntimeException createAbortedException(ByteString transactionId) {
RetryInfo retryInfo =
RetryInfo.newBuilder().setRetryDelay(Duration.newBuilder().setNanos(100).build()).build();
RetryInfo.newBuilder().setRetryDelay(Duration.newBuilder().setNanos(1).build()).build();
Metadata.Key<RetryInfo> key =
Metadata.Key.of(
retryInfo.getDescriptorForType().getFullName() + Metadata.BINARY_HEADER_SUFFIX,
Expand Down

0 comments on commit a55d7ce

Please sign in to comment.