Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: retry cancelled error on first statement in transaction (#999)
If the first statement of a read/write transaction fails with a `CANCELLED` error and the error message is `Read/query was cancelled due to the enclosing transaction being invalidated by a later transaction in the same session.`, then the transaction should be retried, as the error could be caused by a previous statement that was abandoned by the client but still executed by the backend. This could be the case if the statement timed out (on the client) or was cancelled.

Fixes #938
  • Loading branch information
olavloite committed Mar 22, 2021
1 parent 39b0ec4 commit a95f6f8
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 22 deletions.
Expand Up @@ -701,7 +701,9 @@ public void close() {
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {}
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
return e;
}

@Override
public void onDone(boolean withBeginTransaction) {}
Expand Down
Expand Up @@ -81,8 +81,8 @@ interface Listener {
void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
throws SpannerException;

/** Called when the read finishes with an error. */
void onError(SpannerException e, boolean withBeginTransaction);
/** Called when the read finishes with an error. Returns the error that should be thrown. */
SpannerException onError(SpannerException e, boolean withBeginTransaction);

/** Called when the read finishes normally. */
void onDone(boolean withBeginTransaction);
Expand Down Expand Up @@ -159,9 +159,9 @@ public Type getType() {
}

private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
listener.onError(e, beginTransaction);
SpannerException toThrow = listener.onError(e, beginTransaction);
close();
throw e;
throw toThrow;
}
}
/**
Expand Down
Expand Up @@ -70,6 +70,11 @@
class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
private static final Tracer tracer = Tracing.getTracer();
private static final Logger txnLogger = Logger.getLogger(TransactionRunner.class.getName());
/**
* (Part of) the error message that is returned by Cloud Spanner if a transaction is cancelled
* because it was invalidated by a later transaction in the same session.
*/
private static final String TRANSACTION_CANCELLED_MESSAGE = "invalidated by a later transaction";

@VisibleForTesting
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {
Expand Down Expand Up @@ -372,8 +377,7 @@ public void run() {
}
span.addAnnotation("Commit Failed", TraceUtil.getExceptionAnnotations(e));
TraceUtil.endSpanWithFailure(opSpan, e);
onError((SpannerException) e, false);
res.setException(e);
res.setException(onError((SpannerException) e, false));
}
}
}),
Expand Down Expand Up @@ -519,7 +523,7 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
// If the statement that caused an error was the statement that included a BeginTransaction
// option, we simulate an aborted transaction to force a retry of the entire transaction. This
// will cause the retry to execute an explicit BeginTransaction RPC and then the actual
Expand All @@ -536,21 +540,41 @@ public void onError(SpannerException e, boolean withBeginTransaction) {
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
"Aborted due to failed initial statement", e, 0, 1)));
}
SpannerException exceptionToThrow;
if (withBeginTransaction
&& e.getErrorCode() == ErrorCode.CANCELLED
&& e.getMessage().contains(TRANSACTION_CANCELLED_MESSAGE)) {
// If the first statement of a transaction fails because it was invalidated by a later
// transaction, then the transaction should be retried with an explicit BeginTransaction
// RPC. It could be that this occurred because of a previous transaction that timed out or
// was cancelled by the client, but that was sent to Cloud Spanner and that was still active
// on the backend.
exceptionToThrow =
SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED,
e.getMessage(),
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
"Aborted due to failed initial statement", e, 0, 1));
} else {
exceptionToThrow = e;
}

if (e.getErrorCode() == ErrorCode.ABORTED) {
if (exceptionToThrow.getErrorCode() == ErrorCode.ABORTED) {
long delay = -1L;
if (e instanceof AbortedException) {
delay = ((AbortedException) e).getRetryDelayInMillis();
if (exceptionToThrow instanceof AbortedException) {
delay = ((AbortedException) exceptionToThrow).getRetryDelayInMillis();
}
if (delay == -1L) {
txnLogger.log(Level.FINE, "Retry duration is missing from the exception.", e);
txnLogger.log(
Level.FINE, "Retry duration is missing from the exception.", exceptionToThrow);
}

synchronized (lock) {
retryDelayInMillis = delay;
aborted = true;
}
}
return exceptionToThrow;
}

@Override
Expand Down Expand Up @@ -607,8 +631,8 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
// For standard DML, using the exact row count.
return resultSet.getStats().getRowCountExact();
} catch (Throwable t) {
onError(SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin());
throw t;
throw onError(
SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin());
}
}

Expand Down Expand Up @@ -661,8 +685,7 @@ public Long apply(ResultSet input) {
@Override
public Long apply(Throwable input) {
SpannerException e = SpannerExceptionFactory.asSpannerException(input);
onError(e, builder.getTransaction().hasBegin());
throw e;
throw onError(e, builder.getTransaction().hasBegin());
}
},
MoreExecutors.directExecutor());
Expand Down Expand Up @@ -730,8 +753,8 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
}
return results;
} catch (Throwable e) {
onError(SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin());
throw e;
throw onError(
SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin());
}
}

Expand Down Expand Up @@ -788,8 +811,7 @@ public long[] apply(ExecuteBatchDmlResponse batchDmlResponse) {
@Override
public long[] apply(Throwable input) {
SpannerException e = SpannerExceptionFactory.asSpannerException(input);
onError(e, builder.getTransaction().hasBegin());
throw e;
throw onError(e, builder.getTransaction().hasBegin());
}
},
MoreExecutors.directExecutor());
Expand Down
Expand Up @@ -60,7 +60,9 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
throws SpannerException {}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {}
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
return e;
}

@Override
public void onDone(boolean withBeginTransaction) {}
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import com.google.api.core.ApiAsyncFunction;
Expand Down Expand Up @@ -1711,6 +1712,88 @@ public long[] run(TransactionContext transaction) throws Exception {
assertThat(countRequests(ExecuteBatchDmlRequest.class)).isEqualTo(1);
assertThat(countRequests(CommitRequest.class)).isEqualTo(0);
}

@Test
public void testInlinedBeginTx_withCancelledOnFirstStatement() {
final Statement statement = Statement.of("INSERT INTO FOO (Id) VALUES (1)");
mockSpanner.putStatementResult(
StatementResult.exception(
statement,
Status.CANCELLED
.withDescription(
"Read/query was cancelled due to the enclosing transaction being invalidated by a later transaction in the same session.")
.asRuntimeException()));

DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
long updateCount =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
int attempt = 0;

@Override
public Long run(TransactionContext transaction) throws Exception {
if (attempt > 0) {
mockSpanner.putStatementResult(StatementResult.update(statement, 1L));
}
attempt++;
return transaction.executeUpdate(statement);
}
});
assertEquals(1L, updateCount);
// The transaction will be retried because the first statement that also tried to include the
// BeginTransaction statement failed with the specific CANCELLED error and did not return a
// transaction. That forces a retry of the entire transaction with an explicit
// BeginTransaction RPC.
assertEquals(1, countRequests(BeginTransactionRequest.class));
// The update statement will be executed 2 times:
assertEquals(2, countRequests(ExecuteSqlRequest.class));
// The transaction will attempt to commit once.
assertEquals(1, countRequests(CommitRequest.class));
// The first update will start a transaction, but then fail the update statement. This will
// start a transaction on the mock server, but that transaction will never be returned to the
// client.
assertEquals(2, countTransactionsStarted());
}

@Test
public void testInlinedBeginTx_withStickyCancelledOnFirstStatement() {
final Statement statement = Statement.of("INSERT INTO FOO (Id) VALUES (1)");
mockSpanner.putStatementResult(
StatementResult.exception(
statement,
Status.CANCELLED
.withDescription(
"Read/query was cancelled due to the enclosing transaction being invalidated by a later transaction in the same session.")
.asRuntimeException()));

DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
// The CANCELLED error is thrown both on the first and second attempt. The second attempt will
// not be retried, as it did not include a BeginTransaction option.
try {
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
return transaction.executeUpdate(statement);
}
});
fail("missing expected exception");
} catch (SpannerException e) {
assertEquals(ErrorCode.CANCELLED, e.getErrorCode());
}
assertEquals(1, countRequests(BeginTransactionRequest.class));
// The update statement will be executed 2 times:
assertEquals(2, countRequests(ExecuteSqlRequest.class));
// The transaction will never attempt to commit.
assertEquals(0, countRequests(CommitRequest.class));
assertEquals(2, countTransactionsStarted());
}
}

private static int countRequests(Class<? extends AbstractMessage> requestType) {
Expand Down
Expand Up @@ -48,7 +48,9 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
throws SpannerException {}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {}
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
return e;
}

@Override
public void onDone(boolean withBeginTransaction) {}
Expand Down

0 comments on commit a95f6f8

Please sign in to comment.