Skip to content

Commit

Permalink
fix: transaction retry could fail if tx contained failed statements (#…
Browse files Browse the repository at this point in the history
…688)

Transaction retries in the Connection API / JDBC driver could fail if the following happened:
1. The initial transaction contains a statement that returns an error that does not invalidate
   the transaction, such as for example a "Table not found" error, and that error is caught and
   handled by the application code.
2. The retry attempt tries to execute the failed statement to verify that the statement still
   returns the same error. If however the transaction that is used by the retry has been aborted
   immediately before the execution of this statement, the statement will now return Aborted
   instead of the original error. That would be seen as a different error than the initial
   error and would fail the retry attempt.

When the above happens, the Aborted error in the retry should be propagated and the retry
attempt should be restarted.

Fixes #685
  • Loading branch information
olavloite committed Dec 9, 2020
1 parent 345c858 commit f78c64e
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 15 deletions.
Expand Up @@ -58,6 +58,9 @@ public void retry(AbortedException aborted) throws AbortedException {
transaction);
try {
transaction.getReadContext().batchUpdate(statements);
} catch (AbortedException e) {
// Propagate abort to force a new retry.
throw e;
} catch (SpannerBatchUpdateException e) {
// Check that we got the same exception as in the original transaction.
if (exception instanceof SpannerBatchUpdateException
Expand Down
Expand Up @@ -69,6 +69,9 @@ public void retry(AbortedException aborted) throws AbortedException {
// Do nothing with the results, we are only interested in whether the statement throws the
// same exception as in the original transaction.
}
} catch (AbortedException e) {
// Propagate abort to force a new retry.
throw e;
} catch (SpannerException e) {
// Check that we got the same exception as in the original transaction
if (e.getErrorCode() == exception.getErrorCode()
Expand Down
Expand Up @@ -54,6 +54,9 @@ public void retry(AbortedException aborted) throws AbortedException {
.getStatementExecutor()
.invokeInterceptors(statement, StatementExecutionStep.RETRY_STATEMENT, transaction);
transaction.getReadContext().executeUpdate(statement.getStatement());
} catch (AbortedException e) {
// Propagate abort to force a new retry.
throw e;
} catch (SpannerException e) {
// Check that we got the same exception as in the original transaction.
if (e.getErrorCode() == exception.getErrorCode()
Expand Down
Expand Up @@ -1702,25 +1702,26 @@ private void simulateAbort(Session session, ByteString transactionId) {
if (isReadWriteTransaction(transactionId)) {
if (abortNextStatement.getAndSet(false) || abortProbability > random.nextDouble()) {
rollbackTransaction(transactionId);
RetryInfo retryInfo =
RetryInfo.newBuilder()
.setRetryDelay(Duration.newBuilder().setNanos(100).build())
.build();
Metadata.Key<RetryInfo> key =
Metadata.Key.of(
retryInfo.getDescriptorForType().getFullName() + Metadata.BINARY_HEADER_SUFFIX,
ProtoLiteUtils.metadataMarshaller(retryInfo));
Metadata trailers = new Metadata();
trailers.put(key, retryInfo);
throw Status.ABORTED
.withDescription(
String.format(
"Transaction with id %s has been aborted", transactionId.toStringUtf8()))
.asRuntimeException(trailers);
throw createAbortedException(transactionId);
}
}
}

public StatusRuntimeException createAbortedException(ByteString transactionId) {
RetryInfo retryInfo =
RetryInfo.newBuilder().setRetryDelay(Duration.newBuilder().setNanos(100).build()).build();
Metadata.Key<RetryInfo> key =
Metadata.Key.of(
retryInfo.getDescriptorForType().getFullName() + Metadata.BINARY_HEADER_SUFFIX,
ProtoLiteUtils.metadataMarshaller(retryInfo));
Metadata trailers = new Metadata();
trailers.put(key, retryInfo);
return Status.ABORTED
.withDescription(
String.format("Transaction with id %s has been aborted", transactionId.toStringUtf8()))
.asRuntimeException(trailers);
}

private void ensureMostRecentTransaction(Session session, ByteString transactionId) {
AtomicLong counter = transactionCounters.get(session.getName());
if (transactionId != null && transactionId.toStringUtf8() != null && counter != null) {
Expand Down
Expand Up @@ -16,16 +16,29 @@

package com.google.cloud.spanner.connection;

import static com.google.common.truth.Truth.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.ITAbstractSpannerTest.AbortInterceptor;
import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection;
import com.google.cloud.spanner.connection.it.ITTransactionRetryTest.CountTransactionRetryListener;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -71,4 +84,203 @@ public void testCommitAborted() {
}
}
}

@Test
public void testAbortedDuringRetryOfFailedQuery() {
final Statement invalidStatement = Statement.of("SELECT * FROM FOO");
StatusRuntimeException notFound =
Status.NOT_FOUND.withDescription("Table not found").asRuntimeException();
mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound));
try (ITConnection connection =
createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) {
connection.execute(INSERT_STATEMENT);
try (ResultSet rs = connection.executeQuery(invalidStatement)) {
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND);
}
// Force an abort and retry.
mockSpanner.abortNextStatement();
connection.commit();
}
assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2);
// The transaction will be executed 3 times, which means that there will be 6
// ExecuteSqlRequests:
// 1. The initial attempt.
// 2. The first retry attempt. This will fail on the invalid statement as it is aborted.
// 3. the second retry attempt. This will succeed.
assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(6);
}

@Test
public void testAbortedDuringRetryOfFailedUpdate() {
final Statement invalidStatement = Statement.of("INSERT INTO FOO");
StatusRuntimeException notFound =
Status.NOT_FOUND.withDescription("Table not found").asRuntimeException();
mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound));
try (ITConnection connection =
createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) {
connection.execute(INSERT_STATEMENT);
try {
connection.execute(invalidStatement);
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND);
}
// Force an abort and retry.
mockSpanner.abortNextStatement();
connection.commit();
}
assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2);
// The transaction will be executed 3 times, which means that there will be 6
// ExecuteSqlRequests:
// 1. The initial attempt.
// 2. The first retry attempt. This will fail on the invalid statement as it is aborted.
// 3. the second retry attempt. This will succeed.
assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(6);
}

@Test
public void testAbortedDuringRetryOfFailedBatchUpdate() {
final Statement invalidStatement = Statement.of("INSERT INTO FOO");
StatusRuntimeException notFound =
Status.NOT_FOUND.withDescription("Table not found").asRuntimeException();
mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound));
try (ITConnection connection =
createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) {
connection.execute(INSERT_STATEMENT);
try {
connection.executeBatchUpdate(Arrays.asList(invalidStatement));
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND);
}
// Force an abort and retry.
mockSpanner.abortNextStatement();
connection.commit();
}
assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2);
assertThat(mockSpanner.countRequestsOfType(ExecuteBatchDmlRequest.class)).isEqualTo(3);
}

@Test
public void testAbortedDuringRetryOfFailedQueryAsFirstStatement() {
final Statement invalidStatement = Statement.of("SELECT * FROM FOO");
StatusRuntimeException notFound =
Status.NOT_FOUND.withDescription("Table not found").asRuntimeException();
mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound));
// Abort the invalid statement on the third retry (listener counts from 0). The first retry will
// be triggered by the client library because the first statement of the transaction failed.
// That means that it also failed to return a transaction, and the first retry is only executed
// in order to execute an explicit BeginTransaction RPC:

// 1: First statement fails => Retry because no transaction was returned
// 2: BeginTransaction + Invalid statement + Insert + Commit (aborted) => Retry
// 3: First statement fails => Retry because no transaction was returned
// 4: BeginTransaction + Invalid statement (aborted) => Retry
// 5: First statement fails => Retry because no transaction was returned
// 6: BeginTransaction + Invalid statement + Insert + Commit => Success

try (ITConnection connection =
createConnection(createAbortRetryListener(2, invalidStatement, notFound))) {
try (ResultSet rs = connection.executeQuery(invalidStatement)) {
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND);
}
connection.executeUpdate(INSERT_STATEMENT);
// Force an abort and retry.
mockSpanner.abortNextStatement();
connection.commit();
}
assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2);
// 6 times invalid query + 2 times INSERT.
assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(8);
}

@Test
public void testAbortedDuringRetryOfFailedUpdateAsFirstStatement() {
final Statement invalidStatement = Statement.of("INSERT INTO FOO");
StatusRuntimeException notFound =
Status.NOT_FOUND.withDescription("Table not found").asRuntimeException();
mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound));
try (ITConnection connection =
createConnection(createAbortRetryListener(2, invalidStatement, notFound))) {
try {
connection.execute(invalidStatement);
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND);
}
connection.execute(INSERT_STATEMENT);
// Force an abort and retry.
mockSpanner.abortNextStatement();
connection.commit();
}
assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2);
assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(8);
}

@Test
public void testAbortedDuringRetryOfFailedBatchUpdateAsFirstStatement() {
final Statement invalidStatement = Statement.of("INSERT INTO FOO");
StatusRuntimeException notFound =
Status.NOT_FOUND.withDescription("Table not found").asRuntimeException();
mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound));
try (ITConnection connection =
createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) {
try {
connection.executeBatchUpdate(Arrays.asList(invalidStatement));
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND);
}
connection.execute(INSERT_STATEMENT);
// Force an abort and retry.
mockSpanner.abortNextStatement();
connection.commit();
}
assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2);
assertThat(mockSpanner.countRequestsOfType(ExecuteBatchDmlRequest.class)).isEqualTo(6);
}

ITConnection createConnection(TransactionRetryListener listener) {
ITConnection connection =
super.createConnection(
ImmutableList.<StatementExecutionInterceptor>of(), ImmutableList.of(listener));
connection.setAutocommit(false);
return connection;
}

/** Creates a retry listener that will abort the first retry as well. */
TransactionRetryListener createAbortFirstRetryListener(
final Statement invalidStatement, final StatusRuntimeException statementException) {
return createAbortRetryListener(0, invalidStatement, statementException);
}

/** Creates a retry listener that will abort the n'th retry. */
TransactionRetryListener createAbortRetryListener(
final int onAttempt,
final Statement invalidStatement,
final StatusRuntimeException statementException) {
return new TransactionRetryListener() {
@Override
public void retryStarting(
Timestamp transactionStarted, long transactionId, int retryAttempt) {
if (retryAttempt == onAttempt) {
mockSpanner.putStatementResult(
StatementResult.exception(
invalidStatement,
mockSpanner.createAbortedException(ByteString.copyFromUtf8("some-transaction"))));
} else {
mockSpanner.putStatementResult(
StatementResult.exception(invalidStatement, statementException));
}
}

@Override
public void retryFinished(
Timestamp transactionStarted, long transactionId, int retryAttempt, RetryResult result) {}
};
}
}

0 comments on commit f78c64e

Please sign in to comment.