From a95f6f8dc21d27133a0150ea8df963e2bc543e40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Mon, 22 Mar 2021 10:34:02 +0100 Subject: [PATCH] fix: retry cancelled error on first statement in transaction (#999) If the first statement of a read/write transaction fails with a `CANCELLED` error and the error message is `Read/query was cancelled due to the enclosing transaction being invalidated by a later transaction in the same session.`, then the transaction should be retried, as the error could be caused by a previous statement that was abandoned by the client but still executed by the backend. This could be the case if the statement timed out (on the client) or was cancelled. Fixes #938 --- .../cloud/spanner/AbstractReadContext.java | 4 +- .../cloud/spanner/AbstractResultSet.java | 8 +- .../cloud/spanner/TransactionRunnerImpl.java | 52 ++++++++---- .../cloud/spanner/GrpcResultSetTest.java | 4 +- .../spanner/InlineBeginTransactionTest.java | 83 +++++++++++++++++++ .../cloud/spanner/ReadFormatTestRunner.java | 4 +- 6 files changed, 133 insertions(+), 22 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 5e0e30b7dc..b8d2315b98 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -701,7 +701,9 @@ public void close() { public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {} @Override - public void onError(SpannerException e, boolean withBeginTransaction) {} + public SpannerException onError(SpannerException e, boolean withBeginTransaction) { + return e; + } @Override public void onDone(boolean withBeginTransaction) {} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index 93f7e6cb0c..56cd1af47e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -81,8 +81,8 @@ interface Listener { void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) throws SpannerException; - /** Called when the read finishes with an error. */ - void onError(SpannerException e, boolean withBeginTransaction); + /** Called when the read finishes with an error. Returns the error that should be thrown. */ + SpannerException onError(SpannerException e, boolean withBeginTransaction); /** Called when the read finishes normally. */ void onDone(boolean withBeginTransaction); @@ -159,9 +159,9 @@ public Type getType() { } private SpannerException yieldError(SpannerException e, boolean beginTransaction) { - listener.onError(e, beginTransaction); + SpannerException toThrow = listener.onError(e, beginTransaction); close(); - throw e; + throw toThrow; } } /** diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index fef873112d..daa87fea02 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -70,6 +70,11 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner { private static final Tracer tracer = Tracing.getTracer(); private static final Logger txnLogger = Logger.getLogger(TransactionRunner.class.getName()); + /** + * (Part of) the error message that is returned by Cloud Spanner if a transaction is cancelled + * because it was invalidated by a later transaction in the same session. + */ + private static final String TRANSACTION_CANCELLED_MESSAGE = "invalidated by a later transaction"; @VisibleForTesting static class TransactionContextImpl extends AbstractReadContext implements TransactionContext { @@ -372,8 +377,7 @@ public void run() { } span.addAnnotation("Commit Failed", TraceUtil.getExceptionAnnotations(e)); TraceUtil.endSpanWithFailure(opSpan, e); - onError((SpannerException) e, false); - res.setException(e); + res.setException(onError((SpannerException) e, false)); } } }), @@ -519,7 +523,7 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude } @Override - public void onError(SpannerException e, boolean withBeginTransaction) { + public SpannerException onError(SpannerException e, boolean withBeginTransaction) { // If the statement that caused an error was the statement that included a BeginTransaction // option, we simulate an aborted transaction to force a retry of the entire transaction. This // will cause the retry to execute an explicit BeginTransaction RPC and then the actual @@ -536,14 +540,33 @@ public void onError(SpannerException e, boolean withBeginTransaction) { SpannerExceptionFactory.createAbortedExceptionWithRetryDelay( "Aborted due to failed initial statement", e, 0, 1))); } + SpannerException exceptionToThrow; + if (withBeginTransaction + && e.getErrorCode() == ErrorCode.CANCELLED + && e.getMessage().contains(TRANSACTION_CANCELLED_MESSAGE)) { + // If the first statement of a transaction fails because it was invalidated by a later + // transaction, then the transaction should be retried with an explicit BeginTransaction + // RPC. It could be that this occurred because of a previous transaction that timed out or + // was cancelled by the client, but that was sent to Cloud Spanner and that was still active + // on the backend. + exceptionToThrow = + SpannerExceptionFactory.newSpannerException( + ErrorCode.ABORTED, + e.getMessage(), + SpannerExceptionFactory.createAbortedExceptionWithRetryDelay( + "Aborted due to failed initial statement", e, 0, 1)); + } else { + exceptionToThrow = e; + } - if (e.getErrorCode() == ErrorCode.ABORTED) { + if (exceptionToThrow.getErrorCode() == ErrorCode.ABORTED) { long delay = -1L; - if (e instanceof AbortedException) { - delay = ((AbortedException) e).getRetryDelayInMillis(); + if (exceptionToThrow instanceof AbortedException) { + delay = ((AbortedException) exceptionToThrow).getRetryDelayInMillis(); } if (delay == -1L) { - txnLogger.log(Level.FINE, "Retry duration is missing from the exception.", e); + txnLogger.log( + Level.FINE, "Retry duration is missing from the exception.", exceptionToThrow); } synchronized (lock) { @@ -551,6 +574,7 @@ public void onError(SpannerException e, boolean withBeginTransaction) { aborted = true; } } + return exceptionToThrow; } @Override @@ -607,8 +631,8 @@ public long executeUpdate(Statement statement, UpdateOption... options) { // For standard DML, using the exact row count. return resultSet.getStats().getRowCountExact(); } catch (Throwable t) { - onError(SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin()); - throw t; + throw onError( + SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin()); } } @@ -661,8 +685,7 @@ public Long apply(ResultSet input) { @Override public Long apply(Throwable input) { SpannerException e = SpannerExceptionFactory.asSpannerException(input); - onError(e, builder.getTransaction().hasBegin()); - throw e; + throw onError(e, builder.getTransaction().hasBegin()); } }, MoreExecutors.directExecutor()); @@ -730,8 +753,8 @@ public long[] batchUpdate(Iterable statements, UpdateOption... option } return results; } catch (Throwable e) { - onError(SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin()); - throw e; + throw onError( + SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin()); } } @@ -788,8 +811,7 @@ public long[] apply(ExecuteBatchDmlResponse batchDmlResponse) { @Override public long[] apply(Throwable input) { SpannerException e = SpannerExceptionFactory.asSpannerException(input); - onError(e, builder.getTransaction().hasBegin()); - throw e; + throw onError(e, builder.getTransaction().hasBegin()); } }, MoreExecutors.directExecutor()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java index 8f8c7aada8..4961229b20 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java @@ -60,7 +60,9 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude throws SpannerException {} @Override - public void onError(SpannerException e, boolean withBeginTransaction) {} + public SpannerException onError(SpannerException e, boolean withBeginTransaction) { + return e; + } @Override public void onDone(boolean withBeginTransaction) {} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index 69204cb958..00c2f1ab47 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import com.google.api.core.ApiAsyncFunction; @@ -1711,6 +1712,88 @@ public long[] run(TransactionContext transaction) throws Exception { assertThat(countRequests(ExecuteBatchDmlRequest.class)).isEqualTo(1); assertThat(countRequests(CommitRequest.class)).isEqualTo(0); } + + @Test + public void testInlinedBeginTx_withCancelledOnFirstStatement() { + final Statement statement = Statement.of("INSERT INTO FOO (Id) VALUES (1)"); + mockSpanner.putStatementResult( + StatementResult.exception( + statement, + Status.CANCELLED + .withDescription( + "Read/query was cancelled due to the enclosing transaction being invalidated by a later transaction in the same session.") + .asRuntimeException())); + + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + long updateCount = + client + .readWriteTransaction() + .run( + new TransactionCallable() { + int attempt = 0; + + @Override + public Long run(TransactionContext transaction) throws Exception { + if (attempt > 0) { + mockSpanner.putStatementResult(StatementResult.update(statement, 1L)); + } + attempt++; + return transaction.executeUpdate(statement); + } + }); + assertEquals(1L, updateCount); + // The transaction will be retried because the first statement that also tried to include the + // BeginTransaction statement failed with the specific CANCELLED error and did not return a + // transaction. That forces a retry of the entire transaction with an explicit + // BeginTransaction RPC. + assertEquals(1, countRequests(BeginTransactionRequest.class)); + // The update statement will be executed 2 times: + assertEquals(2, countRequests(ExecuteSqlRequest.class)); + // The transaction will attempt to commit once. + assertEquals(1, countRequests(CommitRequest.class)); + // The first update will start a transaction, but then fail the update statement. This will + // start a transaction on the mock server, but that transaction will never be returned to the + // client. + assertEquals(2, countTransactionsStarted()); + } + + @Test + public void testInlinedBeginTx_withStickyCancelledOnFirstStatement() { + final Statement statement = Statement.of("INSERT INTO FOO (Id) VALUES (1)"); + mockSpanner.putStatementResult( + StatementResult.exception( + statement, + Status.CANCELLED + .withDescription( + "Read/query was cancelled due to the enclosing transaction being invalidated by a later transaction in the same session.") + .asRuntimeException())); + + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + // The CANCELLED error is thrown both on the first and second attempt. The second attempt will + // not be retried, as it did not include a BeginTransaction option. + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return transaction.executeUpdate(statement); + } + }); + fail("missing expected exception"); + } catch (SpannerException e) { + assertEquals(ErrorCode.CANCELLED, e.getErrorCode()); + } + assertEquals(1, countRequests(BeginTransactionRequest.class)); + // The update statement will be executed 2 times: + assertEquals(2, countRequests(ExecuteSqlRequest.class)); + // The transaction will never attempt to commit. + assertEquals(0, countRequests(CommitRequest.class)); + assertEquals(2, countTransactionsStarted()); + } } private static int countRequests(Class requestType) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java index e50639dc0e..05c79f1a33 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java @@ -48,7 +48,9 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude throws SpannerException {} @Override - public void onError(SpannerException e, boolean withBeginTransaction) {} + public SpannerException onError(SpannerException e, boolean withBeginTransaction) { + return e; + } @Override public void onDone(boolean withBeginTransaction) {}