diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedBatchUpdate.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedBatchUpdate.java index ba5c1b9020..6721e9b6ec 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedBatchUpdate.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedBatchUpdate.java @@ -58,6 +58,9 @@ public void retry(AbortedException aborted) throws AbortedException { transaction); try { transaction.getReadContext().batchUpdate(statements); + } catch (AbortedException e) { + // Propagate abort to force a new retry. + throw e; } catch (SpannerBatchUpdateException e) { // Check that we got the same exception as in the original transaction. if (exception instanceof SpannerBatchUpdateException diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedQuery.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedQuery.java index 3f0891124b..4a1e1b005c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedQuery.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedQuery.java @@ -69,6 +69,9 @@ public void retry(AbortedException aborted) throws AbortedException { // Do nothing with the results, we are only interested in whether the statement throws the // same exception as in the original transaction. } + } catch (AbortedException e) { + // Propagate abort to force a new retry. + throw e; } catch (SpannerException e) { // Check that we got the same exception as in the original transaction if (e.getErrorCode() == exception.getErrorCode() diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedUpdate.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedUpdate.java index 208711e985..836185fabe 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedUpdate.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedUpdate.java @@ -54,6 +54,9 @@ public void retry(AbortedException aborted) throws AbortedException { .getStatementExecutor() .invokeInterceptors(statement, StatementExecutionStep.RETRY_STATEMENT, transaction); transaction.getReadContext().executeUpdate(statement.getStatement()); + } catch (AbortedException e) { + // Propagate abort to force a new retry. + throw e; } catch (SpannerException e) { // Check that we got the same exception as in the original transaction. if (e.getErrorCode() == exception.getErrorCode() diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index ca91bc35f8..149e0d2888 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -1702,25 +1702,26 @@ private void simulateAbort(Session session, ByteString transactionId) { if (isReadWriteTransaction(transactionId)) { if (abortNextStatement.getAndSet(false) || abortProbability > random.nextDouble()) { rollbackTransaction(transactionId); - RetryInfo retryInfo = - RetryInfo.newBuilder() - .setRetryDelay(Duration.newBuilder().setNanos(100).build()) - .build(); - Metadata.Key key = - Metadata.Key.of( - retryInfo.getDescriptorForType().getFullName() + Metadata.BINARY_HEADER_SUFFIX, - ProtoLiteUtils.metadataMarshaller(retryInfo)); - Metadata trailers = new Metadata(); - trailers.put(key, retryInfo); - throw Status.ABORTED - .withDescription( - String.format( - "Transaction with id %s has been aborted", transactionId.toStringUtf8())) - .asRuntimeException(trailers); + throw createAbortedException(transactionId); } } } + public StatusRuntimeException createAbortedException(ByteString transactionId) { + RetryInfo retryInfo = + RetryInfo.newBuilder().setRetryDelay(Duration.newBuilder().setNanos(100).build()).build(); + Metadata.Key key = + Metadata.Key.of( + retryInfo.getDescriptorForType().getFullName() + Metadata.BINARY_HEADER_SUFFIX, + ProtoLiteUtils.metadataMarshaller(retryInfo)); + Metadata trailers = new Metadata(); + trailers.put(key, retryInfo); + return Status.ABORTED + .withDescription( + String.format("Transaction with id %s has been aborted", transactionId.toStringUtf8())) + .asRuntimeException(trailers); + } + private void ensureMostRecentTransaction(Session session, ByteString transactionId) { AtomicLong counter = transactionCounters.get(session.getName()); if (transactionId != null && transactionId.toStringUtf8() != null && counter != null) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbortedTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbortedTest.java index 6716efb829..88401ef1bc 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbortedTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbortedTest.java @@ -16,16 +16,29 @@ package com.google.cloud.spanner.connection; +import static com.google.common.truth.Truth.assertThat; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.connection.ITAbstractSpannerTest.AbortInterceptor; import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection; import com.google.cloud.spanner.connection.it.ITTransactionRetryTest.CountTransactionRetryListener; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.spanner.v1.CommitRequest; +import com.google.spanner.v1.ExecuteBatchDmlRequest; +import com.google.spanner.v1.ExecuteSqlRequest; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.util.Arrays; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -71,4 +84,203 @@ public void testCommitAborted() { } } } + + @Test + public void testAbortedDuringRetryOfFailedQuery() { + final Statement invalidStatement = Statement.of("SELECT * FROM FOO"); + StatusRuntimeException notFound = + Status.NOT_FOUND.withDescription("Table not found").asRuntimeException(); + mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound)); + try (ITConnection connection = + createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) { + connection.execute(INSERT_STATEMENT); + try (ResultSet rs = connection.executeQuery(invalidStatement)) { + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND); + } + // Force an abort and retry. + mockSpanner.abortNextStatement(); + connection.commit(); + } + assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2); + // The transaction will be executed 3 times, which means that there will be 6 + // ExecuteSqlRequests: + // 1. The initial attempt. + // 2. The first retry attempt. This will fail on the invalid statement as it is aborted. + // 3. the second retry attempt. This will succeed. + assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(6); + } + + @Test + public void testAbortedDuringRetryOfFailedUpdate() { + final Statement invalidStatement = Statement.of("INSERT INTO FOO"); + StatusRuntimeException notFound = + Status.NOT_FOUND.withDescription("Table not found").asRuntimeException(); + mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound)); + try (ITConnection connection = + createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) { + connection.execute(INSERT_STATEMENT); + try { + connection.execute(invalidStatement); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND); + } + // Force an abort and retry. + mockSpanner.abortNextStatement(); + connection.commit(); + } + assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2); + // The transaction will be executed 3 times, which means that there will be 6 + // ExecuteSqlRequests: + // 1. The initial attempt. + // 2. The first retry attempt. This will fail on the invalid statement as it is aborted. + // 3. the second retry attempt. This will succeed. + assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(6); + } + + @Test + public void testAbortedDuringRetryOfFailedBatchUpdate() { + final Statement invalidStatement = Statement.of("INSERT INTO FOO"); + StatusRuntimeException notFound = + Status.NOT_FOUND.withDescription("Table not found").asRuntimeException(); + mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound)); + try (ITConnection connection = + createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) { + connection.execute(INSERT_STATEMENT); + try { + connection.executeBatchUpdate(Arrays.asList(invalidStatement)); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND); + } + // Force an abort and retry. + mockSpanner.abortNextStatement(); + connection.commit(); + } + assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2); + assertThat(mockSpanner.countRequestsOfType(ExecuteBatchDmlRequest.class)).isEqualTo(3); + } + + @Test + public void testAbortedDuringRetryOfFailedQueryAsFirstStatement() { + final Statement invalidStatement = Statement.of("SELECT * FROM FOO"); + StatusRuntimeException notFound = + Status.NOT_FOUND.withDescription("Table not found").asRuntimeException(); + mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound)); + // Abort the invalid statement on the third retry (listener counts from 0). The first retry will + // be triggered by the client library because the first statement of the transaction failed. + // That means that it also failed to return a transaction, and the first retry is only executed + // in order to execute an explicit BeginTransaction RPC: + + // 1: First statement fails => Retry because no transaction was returned + // 2: BeginTransaction + Invalid statement + Insert + Commit (aborted) => Retry + // 3: First statement fails => Retry because no transaction was returned + // 4: BeginTransaction + Invalid statement (aborted) => Retry + // 5: First statement fails => Retry because no transaction was returned + // 6: BeginTransaction + Invalid statement + Insert + Commit => Success + + try (ITConnection connection = + createConnection(createAbortRetryListener(2, invalidStatement, notFound))) { + try (ResultSet rs = connection.executeQuery(invalidStatement)) { + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND); + } + connection.executeUpdate(INSERT_STATEMENT); + // Force an abort and retry. + mockSpanner.abortNextStatement(); + connection.commit(); + } + assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2); + // 6 times invalid query + 2 times INSERT. + assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(8); + } + + @Test + public void testAbortedDuringRetryOfFailedUpdateAsFirstStatement() { + final Statement invalidStatement = Statement.of("INSERT INTO FOO"); + StatusRuntimeException notFound = + Status.NOT_FOUND.withDescription("Table not found").asRuntimeException(); + mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound)); + try (ITConnection connection = + createConnection(createAbortRetryListener(2, invalidStatement, notFound))) { + try { + connection.execute(invalidStatement); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND); + } + connection.execute(INSERT_STATEMENT); + // Force an abort and retry. + mockSpanner.abortNextStatement(); + connection.commit(); + } + assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2); + assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(8); + } + + @Test + public void testAbortedDuringRetryOfFailedBatchUpdateAsFirstStatement() { + final Statement invalidStatement = Statement.of("INSERT INTO FOO"); + StatusRuntimeException notFound = + Status.NOT_FOUND.withDescription("Table not found").asRuntimeException(); + mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound)); + try (ITConnection connection = + createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) { + try { + connection.executeBatchUpdate(Arrays.asList(invalidStatement)); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND); + } + connection.execute(INSERT_STATEMENT); + // Force an abort and retry. + mockSpanner.abortNextStatement(); + connection.commit(); + } + assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2); + assertThat(mockSpanner.countRequestsOfType(ExecuteBatchDmlRequest.class)).isEqualTo(6); + } + + ITConnection createConnection(TransactionRetryListener listener) { + ITConnection connection = + super.createConnection( + ImmutableList.of(), ImmutableList.of(listener)); + connection.setAutocommit(false); + return connection; + } + + /** Creates a retry listener that will abort the first retry as well. */ + TransactionRetryListener createAbortFirstRetryListener( + final Statement invalidStatement, final StatusRuntimeException statementException) { + return createAbortRetryListener(0, invalidStatement, statementException); + } + + /** Creates a retry listener that will abort the n'th retry. */ + TransactionRetryListener createAbortRetryListener( + final int onAttempt, + final Statement invalidStatement, + final StatusRuntimeException statementException) { + return new TransactionRetryListener() { + @Override + public void retryStarting( + Timestamp transactionStarted, long transactionId, int retryAttempt) { + if (retryAttempt == onAttempt) { + mockSpanner.putStatementResult( + StatementResult.exception( + invalidStatement, + mockSpanner.createAbortedException(ByteString.copyFromUtf8("some-transaction")))); + } else { + mockSpanner.putStatementResult( + StatementResult.exception(invalidStatement, statementException)); + } + } + + @Override + public void retryFinished( + Timestamp transactionStarted, long transactionId, int retryAttempt, RetryResult result) {} + }; + } }