diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 5e0e30b7dc..b8d2315b98 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -701,7 +701,9 @@ public void close() { public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {} @Override - public void onError(SpannerException e, boolean withBeginTransaction) {} + public SpannerException onError(SpannerException e, boolean withBeginTransaction) { + return e; + } @Override public void onDone(boolean withBeginTransaction) {} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index 93f7e6cb0c..56cd1af47e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -81,8 +81,8 @@ interface Listener { void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) throws SpannerException; - /** Called when the read finishes with an error. */ - void onError(SpannerException e, boolean withBeginTransaction); + /** Called when the read finishes with an error. Returns the error that should be thrown. */ + SpannerException onError(SpannerException e, boolean withBeginTransaction); /** Called when the read finishes normally. */ void onDone(boolean withBeginTransaction); @@ -159,9 +159,9 @@ public Type getType() { } private SpannerException yieldError(SpannerException e, boolean beginTransaction) { - listener.onError(e, beginTransaction); + SpannerException toThrow = listener.onError(e, beginTransaction); close(); - throw e; + throw toThrow; } } /** diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index fef873112d..daa87fea02 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -70,6 +70,11 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner { private static final Tracer tracer = Tracing.getTracer(); private static final Logger txnLogger = Logger.getLogger(TransactionRunner.class.getName()); + /** + * (Part of) the error message that is returned by Cloud Spanner if a transaction is cancelled + * because it was invalidated by a later transaction in the same session. + */ + private static final String TRANSACTION_CANCELLED_MESSAGE = "invalidated by a later transaction"; @VisibleForTesting static class TransactionContextImpl extends AbstractReadContext implements TransactionContext { @@ -372,8 +377,7 @@ public void run() { } span.addAnnotation("Commit Failed", TraceUtil.getExceptionAnnotations(e)); TraceUtil.endSpanWithFailure(opSpan, e); - onError((SpannerException) e, false); - res.setException(e); + res.setException(onError((SpannerException) e, false)); } } }), @@ -519,7 +523,7 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude } @Override - public void onError(SpannerException e, boolean withBeginTransaction) { + public SpannerException onError(SpannerException e, boolean withBeginTransaction) { // If the statement that caused an error was the statement that included a BeginTransaction // option, we simulate an aborted transaction to force a retry of the entire transaction. This // will cause the retry to execute an explicit BeginTransaction RPC and then the actual @@ -536,14 +540,33 @@ public void onError(SpannerException e, boolean withBeginTransaction) { SpannerExceptionFactory.createAbortedExceptionWithRetryDelay( "Aborted due to failed initial statement", e, 0, 1))); } + SpannerException exceptionToThrow; + if (withBeginTransaction + && e.getErrorCode() == ErrorCode.CANCELLED + && e.getMessage().contains(TRANSACTION_CANCELLED_MESSAGE)) { + // If the first statement of a transaction fails because it was invalidated by a later + // transaction, then the transaction should be retried with an explicit BeginTransaction + // RPC. It could be that this occurred because of a previous transaction that timed out or + // was cancelled by the client, but that was sent to Cloud Spanner and that was still active + // on the backend. + exceptionToThrow = + SpannerExceptionFactory.newSpannerException( + ErrorCode.ABORTED, + e.getMessage(), + SpannerExceptionFactory.createAbortedExceptionWithRetryDelay( + "Aborted due to failed initial statement", e, 0, 1)); + } else { + exceptionToThrow = e; + } - if (e.getErrorCode() == ErrorCode.ABORTED) { + if (exceptionToThrow.getErrorCode() == ErrorCode.ABORTED) { long delay = -1L; - if (e instanceof AbortedException) { - delay = ((AbortedException) e).getRetryDelayInMillis(); + if (exceptionToThrow instanceof AbortedException) { + delay = ((AbortedException) exceptionToThrow).getRetryDelayInMillis(); } if (delay == -1L) { - txnLogger.log(Level.FINE, "Retry duration is missing from the exception.", e); + txnLogger.log( + Level.FINE, "Retry duration is missing from the exception.", exceptionToThrow); } synchronized (lock) { @@ -551,6 +574,7 @@ public void onError(SpannerException e, boolean withBeginTransaction) { aborted = true; } } + return exceptionToThrow; } @Override @@ -607,8 +631,8 @@ public long executeUpdate(Statement statement, UpdateOption... options) { // For standard DML, using the exact row count. return resultSet.getStats().getRowCountExact(); } catch (Throwable t) { - onError(SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin()); - throw t; + throw onError( + SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin()); } } @@ -661,8 +685,7 @@ public Long apply(ResultSet input) { @Override public Long apply(Throwable input) { SpannerException e = SpannerExceptionFactory.asSpannerException(input); - onError(e, builder.getTransaction().hasBegin()); - throw e; + throw onError(e, builder.getTransaction().hasBegin()); } }, MoreExecutors.directExecutor()); @@ -730,8 +753,8 @@ public long[] batchUpdate(Iterable statements, UpdateOption... option } return results; } catch (Throwable e) { - onError(SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin()); - throw e; + throw onError( + SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin()); } } @@ -788,8 +811,7 @@ public long[] apply(ExecuteBatchDmlResponse batchDmlResponse) { @Override public long[] apply(Throwable input) { SpannerException e = SpannerExceptionFactory.asSpannerException(input); - onError(e, builder.getTransaction().hasBegin()); - throw e; + throw onError(e, builder.getTransaction().hasBegin()); } }, MoreExecutors.directExecutor()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java index 8f8c7aada8..4961229b20 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java @@ -60,7 +60,9 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude throws SpannerException {} @Override - public void onError(SpannerException e, boolean withBeginTransaction) {} + public SpannerException onError(SpannerException e, boolean withBeginTransaction) { + return e; + } @Override public void onDone(boolean withBeginTransaction) {} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index 69204cb958..00c2f1ab47 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import com.google.api.core.ApiAsyncFunction; @@ -1711,6 +1712,88 @@ public long[] run(TransactionContext transaction) throws Exception { assertThat(countRequests(ExecuteBatchDmlRequest.class)).isEqualTo(1); assertThat(countRequests(CommitRequest.class)).isEqualTo(0); } + + @Test + public void testInlinedBeginTx_withCancelledOnFirstStatement() { + final Statement statement = Statement.of("INSERT INTO FOO (Id) VALUES (1)"); + mockSpanner.putStatementResult( + StatementResult.exception( + statement, + Status.CANCELLED + .withDescription( + "Read/query was cancelled due to the enclosing transaction being invalidated by a later transaction in the same session.") + .asRuntimeException())); + + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + long updateCount = + client + .readWriteTransaction() + .run( + new TransactionCallable() { + int attempt = 0; + + @Override + public Long run(TransactionContext transaction) throws Exception { + if (attempt > 0) { + mockSpanner.putStatementResult(StatementResult.update(statement, 1L)); + } + attempt++; + return transaction.executeUpdate(statement); + } + }); + assertEquals(1L, updateCount); + // The transaction will be retried because the first statement that also tried to include the + // BeginTransaction statement failed with the specific CANCELLED error and did not return a + // transaction. That forces a retry of the entire transaction with an explicit + // BeginTransaction RPC. + assertEquals(1, countRequests(BeginTransactionRequest.class)); + // The update statement will be executed 2 times: + assertEquals(2, countRequests(ExecuteSqlRequest.class)); + // The transaction will attempt to commit once. + assertEquals(1, countRequests(CommitRequest.class)); + // The first update will start a transaction, but then fail the update statement. This will + // start a transaction on the mock server, but that transaction will never be returned to the + // client. + assertEquals(2, countTransactionsStarted()); + } + + @Test + public void testInlinedBeginTx_withStickyCancelledOnFirstStatement() { + final Statement statement = Statement.of("INSERT INTO FOO (Id) VALUES (1)"); + mockSpanner.putStatementResult( + StatementResult.exception( + statement, + Status.CANCELLED + .withDescription( + "Read/query was cancelled due to the enclosing transaction being invalidated by a later transaction in the same session.") + .asRuntimeException())); + + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + // The CANCELLED error is thrown both on the first and second attempt. The second attempt will + // not be retried, as it did not include a BeginTransaction option. + try { + client + .readWriteTransaction() + .run( + new TransactionCallable() { + @Override + public Long run(TransactionContext transaction) throws Exception { + return transaction.executeUpdate(statement); + } + }); + fail("missing expected exception"); + } catch (SpannerException e) { + assertEquals(ErrorCode.CANCELLED, e.getErrorCode()); + } + assertEquals(1, countRequests(BeginTransactionRequest.class)); + // The update statement will be executed 2 times: + assertEquals(2, countRequests(ExecuteSqlRequest.class)); + // The transaction will never attempt to commit. + assertEquals(0, countRequests(CommitRequest.class)); + assertEquals(2, countTransactionsStarted()); + } } private static int countRequests(Class requestType) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java index e50639dc0e..05c79f1a33 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java @@ -48,7 +48,9 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude throws SpannerException {} @Override - public void onError(SpannerException e, boolean withBeginTransaction) {} + public SpannerException onError(SpannerException e, boolean withBeginTransaction) { + return e; + } @Override public void onDone(boolean withBeginTransaction) {}