From f78c64e3e2bee6d6ed1f44a0b2e57249cba0e6d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Thu, 10 Dec 2020 00:31:07 +0100 Subject: [PATCH] fix: transaction retry could fail if tx contained failed statements (#688) Transaction retries in the Connection API / JDBC driver could fail if the following happened: 1. The initial transaction contains a statement that returns an error that does not invalidate the transaction, for example a "Table not found" error, and that error is caught and handled by the application code. 2. The retry attempt tries to execute the failed statement to verify that the statement still returns the same error. If, however, the transaction that is used by the retry has been aborted immediately before the execution of this statement, the statement will now return Aborted instead of the original error. That would be seen as a different error from the initial error and would fail the retry attempt. When the above happens, the Aborted error in the retry should be propagated and the retry attempt should be restarted. 
Fixes #685 --- .../spanner/connection/FailedBatchUpdate.java | 3 + .../cloud/spanner/connection/FailedQuery.java | 3 + .../spanner/connection/FailedUpdate.java | 3 + .../cloud/spanner/MockSpannerServiceImpl.java | 31 +-- .../cloud/spanner/connection/AbortedTest.java | 212 ++++++++++++++++++ 5 files changed, 237 insertions(+), 15 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedBatchUpdate.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedBatchUpdate.java index ba5c1b9020..6721e9b6ec 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedBatchUpdate.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedBatchUpdate.java @@ -58,6 +58,9 @@ public void retry(AbortedException aborted) throws AbortedException { transaction); try { transaction.getReadContext().batchUpdate(statements); + } catch (AbortedException e) { + // Propagate abort to force a new retry. + throw e; } catch (SpannerBatchUpdateException e) { // Check that we got the same exception as in the original transaction. if (exception instanceof SpannerBatchUpdateException diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedQuery.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedQuery.java index 3f0891124b..4a1e1b005c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedQuery.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedQuery.java @@ -69,6 +69,9 @@ public void retry(AbortedException aborted) throws AbortedException { // Do nothing with the results, we are only interested in whether the statement throws the // same exception as in the original transaction. } + } catch (AbortedException e) { + // Propagate abort to force a new retry. 
+ throw e; } catch (SpannerException e) { // Check that we got the same exception as in the original transaction if (e.getErrorCode() == exception.getErrorCode() diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedUpdate.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedUpdate.java index 208711e985..836185fabe 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedUpdate.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/FailedUpdate.java @@ -54,6 +54,9 @@ public void retry(AbortedException aborted) throws AbortedException { .getStatementExecutor() .invokeInterceptors(statement, StatementExecutionStep.RETRY_STATEMENT, transaction); transaction.getReadContext().executeUpdate(statement.getStatement()); + } catch (AbortedException e) { + // Propagate abort to force a new retry. + throw e; } catch (SpannerException e) { // Check that we got the same exception as in the original transaction. 
if (e.getErrorCode() == exception.getErrorCode() diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index ca91bc35f8..149e0d2888 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -1702,25 +1702,26 @@ private void simulateAbort(Session session, ByteString transactionId) { if (isReadWriteTransaction(transactionId)) { if (abortNextStatement.getAndSet(false) || abortProbability > random.nextDouble()) { rollbackTransaction(transactionId); - RetryInfo retryInfo = - RetryInfo.newBuilder() - .setRetryDelay(Duration.newBuilder().setNanos(100).build()) - .build(); - Metadata.Key key = - Metadata.Key.of( - retryInfo.getDescriptorForType().getFullName() + Metadata.BINARY_HEADER_SUFFIX, - ProtoLiteUtils.metadataMarshaller(retryInfo)); - Metadata trailers = new Metadata(); - trailers.put(key, retryInfo); - throw Status.ABORTED - .withDescription( - String.format( - "Transaction with id %s has been aborted", transactionId.toStringUtf8())) - .asRuntimeException(trailers); + throw createAbortedException(transactionId); } } } + public StatusRuntimeException createAbortedException(ByteString transactionId) { + RetryInfo retryInfo = + RetryInfo.newBuilder().setRetryDelay(Duration.newBuilder().setNanos(100).build()).build(); + Metadata.Key key = + Metadata.Key.of( + retryInfo.getDescriptorForType().getFullName() + Metadata.BINARY_HEADER_SUFFIX, + ProtoLiteUtils.metadataMarshaller(retryInfo)); + Metadata trailers = new Metadata(); + trailers.put(key, retryInfo); + return Status.ABORTED + .withDescription( + String.format("Transaction with id %s has been aborted", transactionId.toStringUtf8())) + .asRuntimeException(trailers); + } + private void ensureMostRecentTransaction(Session session, ByteString 
transactionId) { AtomicLong counter = transactionCounters.get(session.getName()); if (transactionId != null && transactionId.toStringUtf8() != null && counter != null) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbortedTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbortedTest.java index 6716efb829..88401ef1bc 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbortedTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbortedTest.java @@ -16,16 +16,29 @@ package com.google.cloud.spanner.connection; +import static com.google.common.truth.Truth.assertThat; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.connection.ITAbstractSpannerTest.AbortInterceptor; import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection; import com.google.cloud.spanner.connection.it.ITTransactionRetryTest.CountTransactionRetryListener; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.spanner.v1.CommitRequest; +import com.google.spanner.v1.ExecuteBatchDmlRequest; +import com.google.spanner.v1.ExecuteSqlRequest; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.util.Arrays; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -71,4 +84,203 @@ public void testCommitAborted() { } } } + + @Test + public void testAbortedDuringRetryOfFailedQuery() { + final Statement 
invalidStatement = Statement.of("SELECT * FROM FOO"); + StatusRuntimeException notFound = + Status.NOT_FOUND.withDescription("Table not found").asRuntimeException(); + mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound)); + try (ITConnection connection = + createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) { + connection.execute(INSERT_STATEMENT); + try (ResultSet rs = connection.executeQuery(invalidStatement)) { + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND); + } + // Force an abort and retry. + mockSpanner.abortNextStatement(); + connection.commit(); + } + assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2); + // The transaction will be executed 3 times, which means that there will be 6 + // ExecuteSqlRequests: + // 1. The initial attempt. + // 2. The first retry attempt. This will fail on the invalid statement as it is aborted. + // 3. the second retry attempt. This will succeed. + assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(6); + } + + @Test + public void testAbortedDuringRetryOfFailedUpdate() { + final Statement invalidStatement = Statement.of("INSERT INTO FOO"); + StatusRuntimeException notFound = + Status.NOT_FOUND.withDescription("Table not found").asRuntimeException(); + mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound)); + try (ITConnection connection = + createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) { + connection.execute(INSERT_STATEMENT); + try { + connection.execute(invalidStatement); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND); + } + // Force an abort and retry. 
+ mockSpanner.abortNextStatement(); + connection.commit(); + } + assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2); + // The transaction will be executed 3 times, which means that there will be 6 + // ExecuteSqlRequests: + // 1. The initial attempt. + // 2. The first retry attempt. This will fail on the invalid statement as it is aborted. + // 3. the second retry attempt. This will succeed. + assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(6); + } + + @Test + public void testAbortedDuringRetryOfFailedBatchUpdate() { + final Statement invalidStatement = Statement.of("INSERT INTO FOO"); + StatusRuntimeException notFound = + Status.NOT_FOUND.withDescription("Table not found").asRuntimeException(); + mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound)); + try (ITConnection connection = + createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) { + connection.execute(INSERT_STATEMENT); + try { + connection.executeBatchUpdate(Arrays.asList(invalidStatement)); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND); + } + // Force an abort and retry. + mockSpanner.abortNextStatement(); + connection.commit(); + } + assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2); + assertThat(mockSpanner.countRequestsOfType(ExecuteBatchDmlRequest.class)).isEqualTo(3); + } + + @Test + public void testAbortedDuringRetryOfFailedQueryAsFirstStatement() { + final Statement invalidStatement = Statement.of("SELECT * FROM FOO"); + StatusRuntimeException notFound = + Status.NOT_FOUND.withDescription("Table not found").asRuntimeException(); + mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound)); + // Abort the invalid statement on the third retry (listener counts from 0). 
The first retry will + // be triggered by the client library because the first statement of the transaction failed. + // That means that it also failed to return a transaction, and the first retry is only executed + // in order to execute an explicit BeginTransaction RPC: + + // 1: First statement fails => Retry because no transaction was returned + // 2: BeginTransaction + Invalid statement + Insert + Commit (aborted) => Retry + // 3: First statement fails => Retry because no transaction was returned + // 4: BeginTransaction + Invalid statement (aborted) => Retry + // 5: First statement fails => Retry because no transaction was returned + // 6: BeginTransaction + Invalid statement + Insert + Commit => Success + + try (ITConnection connection = + createConnection(createAbortRetryListener(2, invalidStatement, notFound))) { + try (ResultSet rs = connection.executeQuery(invalidStatement)) { + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND); + } + connection.executeUpdate(INSERT_STATEMENT); + // Force an abort and retry. + mockSpanner.abortNextStatement(); + connection.commit(); + } + assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2); + // 6 times invalid query + 2 times INSERT. 
+ assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(8); + } + + @Test + public void testAbortedDuringRetryOfFailedUpdateAsFirstStatement() { + final Statement invalidStatement = Statement.of("INSERT INTO FOO"); + StatusRuntimeException notFound = + Status.NOT_FOUND.withDescription("Table not found").asRuntimeException(); + mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound)); + try (ITConnection connection = + createConnection(createAbortRetryListener(2, invalidStatement, notFound))) { + try { + connection.execute(invalidStatement); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND); + } + connection.execute(INSERT_STATEMENT); + // Force an abort and retry. + mockSpanner.abortNextStatement(); + connection.commit(); + } + assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2); + assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(8); + } + + @Test + public void testAbortedDuringRetryOfFailedBatchUpdateAsFirstStatement() { + final Statement invalidStatement = Statement.of("INSERT INTO FOO"); + StatusRuntimeException notFound = + Status.NOT_FOUND.withDescription("Table not found").asRuntimeException(); + mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound)); + try (ITConnection connection = + createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) { + try { + connection.executeBatchUpdate(Arrays.asList(invalidStatement)); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND); + } + connection.execute(INSERT_STATEMENT); + // Force an abort and retry. 
+ mockSpanner.abortNextStatement(); + connection.commit(); + } + assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2); + assertThat(mockSpanner.countRequestsOfType(ExecuteBatchDmlRequest.class)).isEqualTo(6); + } + + ITConnection createConnection(TransactionRetryListener listener) { + ITConnection connection = + super.createConnection( + ImmutableList.of(), ImmutableList.of(listener)); + connection.setAutocommit(false); + return connection; + } + + /** Creates a retry listener that will abort the first retry as well. */ + TransactionRetryListener createAbortFirstRetryListener( + final Statement invalidStatement, final StatusRuntimeException statementException) { + return createAbortRetryListener(0, invalidStatement, statementException); + } + + /** Creates a retry listener that will abort the n'th retry. */ + TransactionRetryListener createAbortRetryListener( + final int onAttempt, + final Statement invalidStatement, + final StatusRuntimeException statementException) { + return new TransactionRetryListener() { + @Override + public void retryStarting( + Timestamp transactionStarted, long transactionId, int retryAttempt) { + if (retryAttempt == onAttempt) { + mockSpanner.putStatementResult( + StatementResult.exception( + invalidStatement, + mockSpanner.createAbortedException(ByteString.copyFromUtf8("some-transaction")))); + } else { + mockSpanner.putStatementResult( + StatementResult.exception(invalidStatement, statementException)); + } + } + + @Override + public void retryFinished( + Timestamp transactionStarted, long transactionId, int retryAttempt, RetryResult result) {} + }; + } }