Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retry cancelled error on first statement in transaction #999

Merged
merged 2 commits into from Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -701,7 +701,9 @@ public void close() {
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {}
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
return e;
}

@Override
public void onDone(boolean withBeginTransaction) {}
Expand Down
Expand Up @@ -81,8 +81,8 @@ interface Listener {
void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
throws SpannerException;

/** Called when the read finishes with an error. */
void onError(SpannerException e, boolean withBeginTransaction);
/** Called when the read finishes with an error. Returns the error that should be thrown. */
SpannerException onError(SpannerException e, boolean withBeginTransaction);

/** Called when the read finishes normally. */
void onDone(boolean withBeginTransaction);
Expand Down Expand Up @@ -159,9 +159,9 @@ public Type getType() {
}

private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
listener.onError(e, beginTransaction);
SpannerException toThrow = listener.onError(e, beginTransaction);
close();
throw e;
throw toThrow;
}
}
/**
Expand Down
Expand Up @@ -372,8 +372,7 @@ public void run() {
}
span.addAnnotation("Commit Failed", TraceUtil.getExceptionAnnotations(e));
TraceUtil.endSpanWithFailure(opSpan, e);
onError((SpannerException) e, false);
res.setException(e);
res.setException(onError((SpannerException) e, false));
}
}
}),
Expand Down Expand Up @@ -519,7 +518,7 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
// If the statement that caused an error was the statement that included a BeginTransaction
// option, we simulate an aborted transaction to force a retry of the entire transaction. This
// will cause the retry to execute an explicit BeginTransaction RPC and then the actual
Expand All @@ -536,21 +535,41 @@ public void onError(SpannerException e, boolean withBeginTransaction) {
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
"Aborted due to failed initial statement", e, 0, 1)));
}
SpannerException exceptionToThrow;
if (withBeginTransaction
&& e.getErrorCode() == ErrorCode.CANCELLED
&& e.getMessage().contains("invalidated by a later transaction")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could we extract a constant for this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// If the first statement of a transaction fails because it was invalidated by a later
// transaction, then the transaction should be retried with an explicit BeginTransaction
// RPC. It could be that this occurred because of a previous transaction that timed out or
// was cancelled by the client, but that was sent to Cloud Spanner and that was still active
// on the backend.
exceptionToThrow =
SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED,
e.getMessage(),
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
"Aborted due to failed initial statement", e, 0, 1));
} else {
exceptionToThrow = e;
}

if (e.getErrorCode() == ErrorCode.ABORTED) {
if (exceptionToThrow.getErrorCode() == ErrorCode.ABORTED) {
long delay = -1L;
if (e instanceof AbortedException) {
delay = ((AbortedException) e).getRetryDelayInMillis();
if (exceptionToThrow instanceof AbortedException) {
delay = ((AbortedException) exceptionToThrow).getRetryDelayInMillis();
}
if (delay == -1L) {
txnLogger.log(Level.FINE, "Retry duration is missing from the exception.", e);
txnLogger.log(
Level.FINE, "Retry duration is missing from the exception.", exceptionToThrow);
}

synchronized (lock) {
retryDelayInMillis = delay;
aborted = true;
}
}
return exceptionToThrow;
}

@Override
Expand Down Expand Up @@ -607,8 +626,8 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
// For standard DML, using the exact row count.
return resultSet.getStats().getRowCountExact();
} catch (Throwable t) {
onError(SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin());
throw t;
throw onError(
SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin());
}
}

Expand Down Expand Up @@ -661,8 +680,7 @@ public Long apply(ResultSet input) {
@Override
public Long apply(Throwable input) {
SpannerException e = SpannerExceptionFactory.asSpannerException(input);
onError(e, builder.getTransaction().hasBegin());
throw e;
throw onError(e, builder.getTransaction().hasBegin());
}
},
MoreExecutors.directExecutor());
Expand Down Expand Up @@ -730,8 +748,8 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
}
return results;
} catch (Throwable e) {
onError(SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin());
throw e;
throw onError(
SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin());
}
}

Expand Down Expand Up @@ -788,8 +806,7 @@ public long[] apply(ExecuteBatchDmlResponse batchDmlResponse) {
@Override
public long[] apply(Throwable input) {
SpannerException e = SpannerExceptionFactory.asSpannerException(input);
onError(e, builder.getTransaction().hasBegin());
throw e;
throw onError(e, builder.getTransaction().hasBegin());
}
},
MoreExecutors.directExecutor());
Expand Down
Expand Up @@ -60,7 +60,9 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
throws SpannerException {}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {}
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
return e;
}

@Override
public void onDone(boolean withBeginTransaction) {}
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import com.google.api.core.ApiAsyncFunction;
Expand Down Expand Up @@ -1711,6 +1712,88 @@ public long[] run(TransactionContext transaction) throws Exception {
assertThat(countRequests(ExecuteBatchDmlRequest.class)).isEqualTo(1);
assertThat(countRequests(CommitRequest.class)).isEqualTo(0);
}

@Test
public void testInlinedBeginTx_withCancelledOnFirstStatement() {
final Statement statement = Statement.of("INSERT INTO FOO (Id) VALUES (1)");
mockSpanner.putStatementResult(
StatementResult.exception(
statement,
Status.CANCELLED
.withDescription(
"Read/query was cancelled due to the enclosing transaction being invalidated by a later transaction in the same session.")
.asRuntimeException()));

DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
long updateCount =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
int attempt = 0;

@Override
public Long run(TransactionContext transaction) throws Exception {
if (attempt > 0) {
mockSpanner.putStatementResult(StatementResult.update(statement, 1L));
}
attempt++;
return transaction.executeUpdate(statement);
}
});
assertEquals(1L, updateCount);
// The transaction will be retried because the first statement that also tried to include the
// BeginTransaction statement failed with the specific CANCELLED error and did not return a
// transaction. That forces a retry of the entire transaction with an explicit
// BeginTransaction RPC.
assertEquals(1, countRequests(BeginTransactionRequest.class));
// The update statement will be executed 2 times:
assertEquals(2, countRequests(ExecuteSqlRequest.class));
// The transaction will attempt to commit once.
assertEquals(1, countRequests(CommitRequest.class));
// The first update will start a transaction, but then fail the update statement. This will
// start a transaction on the mock server, but that transaction will never be returned to the
// client.
assertEquals(2, countTransactionsStarted());
}

@Test
public void testInlinedBeginTx_withStickyCancelledOnFirstStatement() {
final Statement statement = Statement.of("INSERT INTO FOO (Id) VALUES (1)");
mockSpanner.putStatementResult(
StatementResult.exception(
statement,
Status.CANCELLED
.withDescription(
"Read/query was cancelled due to the enclosing transaction being invalidated by a later transaction in the same session.")
.asRuntimeException()));

DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
// The CANCELLED error is thrown both on the first and second attempt. The second attempt will
// not be retried, as it did not include a BeginTransaction option.
try {
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
return transaction.executeUpdate(statement);
}
});
fail("missing expected exception");
} catch (SpannerException e) {
assertEquals(ErrorCode.CANCELLED, e.getErrorCode());
}
assertEquals(1, countRequests(BeginTransactionRequest.class));
// The update statement will be executed 2 times:
assertEquals(2, countRequests(ExecuteSqlRequest.class));
// The transaction will never attempt to commit.
assertEquals(0, countRequests(CommitRequest.class));
assertEquals(2, countTransactionsStarted());
}
}

private static int countRequests(Class<? extends AbstractMessage> requestType) {
Expand Down
Expand Up @@ -48,7 +48,9 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
throws SpannerException {}

@Override
public void onError(SpannerException e, boolean withBeginTransaction) {}
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
return e;
}

@Override
public void onDone(boolean withBeginTransaction) {}
Expand Down