Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: transaction retry could fail if tx contained failed statements #688

Merged
merged 1 commit into from Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -58,6 +58,9 @@ public void retry(AbortedException aborted) throws AbortedException {
transaction);
try {
transaction.getReadContext().batchUpdate(statements);
} catch (AbortedException e) {
// Propagate abort to force a new retry.
throw e;
} catch (SpannerBatchUpdateException e) {
// Check that we got the same exception as in the original transaction.
if (exception instanceof SpannerBatchUpdateException
Expand Down
Expand Up @@ -69,6 +69,9 @@ public void retry(AbortedException aborted) throws AbortedException {
// Do nothing with the results, we are only interested in whether the statement throws the
// same exception as in the original transaction.
}
} catch (AbortedException e) {
// Propagate abort to force a new retry.
throw e;
} catch (SpannerException e) {
// Check that we got the same exception as in the original transaction
if (e.getErrorCode() == exception.getErrorCode()
Expand Down
Expand Up @@ -54,6 +54,9 @@ public void retry(AbortedException aborted) throws AbortedException {
.getStatementExecutor()
.invokeInterceptors(statement, StatementExecutionStep.RETRY_STATEMENT, transaction);
transaction.getReadContext().executeUpdate(statement.getStatement());
} catch (AbortedException e) {
// Propagate abort to force a new retry.
throw e;
} catch (SpannerException e) {
// Check that we got the same exception as in the original transaction.
if (e.getErrorCode() == exception.getErrorCode()
Expand Down
Expand Up @@ -1702,25 +1702,26 @@ private void simulateAbort(Session session, ByteString transactionId) {
if (isReadWriteTransaction(transactionId)) {
if (abortNextStatement.getAndSet(false) || abortProbability > random.nextDouble()) {
rollbackTransaction(transactionId);
RetryInfo retryInfo =
RetryInfo.newBuilder()
.setRetryDelay(Duration.newBuilder().setNanos(100).build())
.build();
Metadata.Key<RetryInfo> key =
Metadata.Key.of(
retryInfo.getDescriptorForType().getFullName() + Metadata.BINARY_HEADER_SUFFIX,
ProtoLiteUtils.metadataMarshaller(retryInfo));
Metadata trailers = new Metadata();
trailers.put(key, retryInfo);
throw Status.ABORTED
.withDescription(
String.format(
"Transaction with id %s has been aborted", transactionId.toStringUtf8()))
.asRuntimeException(trailers);
throw createAbortedException(transactionId);
}
}
}

public StatusRuntimeException createAbortedException(ByteString transactionId) {
RetryInfo retryInfo =
RetryInfo.newBuilder().setRetryDelay(Duration.newBuilder().setNanos(100).build()).build();
Metadata.Key<RetryInfo> key =
Metadata.Key.of(
retryInfo.getDescriptorForType().getFullName() + Metadata.BINARY_HEADER_SUFFIX,
ProtoLiteUtils.metadataMarshaller(retryInfo));
Metadata trailers = new Metadata();
trailers.put(key, retryInfo);
return Status.ABORTED
.withDescription(
String.format("Transaction with id %s has been aborted", transactionId.toStringUtf8()))
.asRuntimeException(trailers);
}

private void ensureMostRecentTransaction(Session session, ByteString transactionId) {
AtomicLong counter = transactionCounters.get(session.getName());
if (transactionId != null && transactionId.toStringUtf8() != null && counter != null) {
Expand Down
Expand Up @@ -16,16 +16,29 @@

package com.google.cloud.spanner.connection;

import static com.google.common.truth.Truth.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.ITAbstractSpannerTest.AbortInterceptor;
import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection;
import com.google.cloud.spanner.connection.it.ITTransactionRetryTest.CountTransactionRetryListener;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -71,4 +84,203 @@ public void testCommitAborted() {
}
}
}

@Test
public void testAbortedDuringRetryOfFailedQuery() {
final Statement invalidStatement = Statement.of("SELECT * FROM FOO");
StatusRuntimeException notFound =
Status.NOT_FOUND.withDescription("Table not found").asRuntimeException();
mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound));
try (ITConnection connection =
createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) {
connection.execute(INSERT_STATEMENT);
try (ResultSet rs = connection.executeQuery(invalidStatement)) {
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND);
}
// Force an abort and retry.
mockSpanner.abortNextStatement();
connection.commit();
}
assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2);
// The transaction will be executed 3 times, which means that there will be 6
// ExecuteSqlRequests:
// 1. The initial attempt.
// 2. The first retry attempt. This will fail on the invalid statement as it is aborted.
// 3. the second retry attempt. This will succeed.
assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(6);
}

@Test
public void testAbortedDuringRetryOfFailedUpdate() {
final Statement invalidStatement = Statement.of("INSERT INTO FOO");
StatusRuntimeException notFound =
Status.NOT_FOUND.withDescription("Table not found").asRuntimeException();
mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound));
try (ITConnection connection =
createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) {
connection.execute(INSERT_STATEMENT);
try {
connection.execute(invalidStatement);
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND);
}
// Force an abort and retry.
mockSpanner.abortNextStatement();
connection.commit();
}
assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2);
// The transaction will be executed 3 times, which means that there will be 6
// ExecuteSqlRequests:
// 1. The initial attempt.
// 2. The first retry attempt. This will fail on the invalid statement as it is aborted.
// 3. the second retry attempt. This will succeed.
assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(6);
}

@Test
public void testAbortedDuringRetryOfFailedBatchUpdate() {
final Statement invalidStatement = Statement.of("INSERT INTO FOO");
StatusRuntimeException notFound =
Status.NOT_FOUND.withDescription("Table not found").asRuntimeException();
mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound));
try (ITConnection connection =
createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) {
connection.execute(INSERT_STATEMENT);
try {
connection.executeBatchUpdate(Arrays.asList(invalidStatement));
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND);
}
// Force an abort and retry.
mockSpanner.abortNextStatement();
connection.commit();
}
assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2);
assertThat(mockSpanner.countRequestsOfType(ExecuteBatchDmlRequest.class)).isEqualTo(3);
}

@Test
public void testAbortedDuringRetryOfFailedQueryAsFirstStatement() {
final Statement invalidStatement = Statement.of("SELECT * FROM FOO");
StatusRuntimeException notFound =
Status.NOT_FOUND.withDescription("Table not found").asRuntimeException();
mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound));
// Abort the invalid statement on the third retry (listener counts from 0). The first retry will
// be triggered by the client library because the first statement of the transaction failed.
// That means that it also failed to return a transaction, and the first retry is only executed
// in order to execute an explicit BeginTransaction RPC:

// 1: First statement fails => Retry because no transaction was returned
// 2: BeginTransaction + Invalid statement + Insert + Commit (aborted) => Retry
// 3: First statement fails => Retry because no transaction was returned
// 4: BeginTransaction + Invalid statement (aborted) => Retry
// 5: First statement fails => Retry because no transaction was returned
// 6: BeginTransaction + Invalid statement + Insert + Commit => Success

try (ITConnection connection =
createConnection(createAbortRetryListener(2, invalidStatement, notFound))) {
try (ResultSet rs = connection.executeQuery(invalidStatement)) {
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND);
}
connection.executeUpdate(INSERT_STATEMENT);
// Force an abort and retry.
mockSpanner.abortNextStatement();
connection.commit();
}
assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2);
// 6 times invalid query + 2 times INSERT.
assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(8);
}

@Test
public void testAbortedDuringRetryOfFailedUpdateAsFirstStatement() {
final Statement invalidStatement = Statement.of("INSERT INTO FOO");
StatusRuntimeException notFound =
Status.NOT_FOUND.withDescription("Table not found").asRuntimeException();
mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound));
try (ITConnection connection =
createConnection(createAbortRetryListener(2, invalidStatement, notFound))) {
try {
connection.execute(invalidStatement);
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND);
}
connection.execute(INSERT_STATEMENT);
// Force an abort and retry.
mockSpanner.abortNextStatement();
connection.commit();
}
assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2);
assertThat(mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)).isEqualTo(8);
}

@Test
public void testAbortedDuringRetryOfFailedBatchUpdateAsFirstStatement() {
final Statement invalidStatement = Statement.of("INSERT INTO FOO");
StatusRuntimeException notFound =
Status.NOT_FOUND.withDescription("Table not found").asRuntimeException();
mockSpanner.putStatementResult(StatementResult.exception(invalidStatement, notFound));
try (ITConnection connection =
createConnection(createAbortFirstRetryListener(invalidStatement, notFound))) {
try {
connection.executeBatchUpdate(Arrays.asList(invalidStatement));
fail("missing expected exception");
} catch (SpannerException e) {
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND);
}
connection.execute(INSERT_STATEMENT);
// Force an abort and retry.
mockSpanner.abortNextStatement();
connection.commit();
}
assertThat(mockSpanner.countRequestsOfType(CommitRequest.class)).isEqualTo(2);
assertThat(mockSpanner.countRequestsOfType(ExecuteBatchDmlRequest.class)).isEqualTo(6);
}

ITConnection createConnection(TransactionRetryListener listener) {
ITConnection connection =
super.createConnection(
ImmutableList.<StatementExecutionInterceptor>of(), ImmutableList.of(listener));
connection.setAutocommit(false);
return connection;
}

/** Creates a retry listener that will abort the first retry as well. */
TransactionRetryListener createAbortFirstRetryListener(
final Statement invalidStatement, final StatusRuntimeException statementException) {
return createAbortRetryListener(0, invalidStatement, statementException);
}

/** Creates a retry listener that will abort the n'th retry. */
TransactionRetryListener createAbortRetryListener(
final int onAttempt,
final Statement invalidStatement,
final StatusRuntimeException statementException) {
return new TransactionRetryListener() {
@Override
public void retryStarting(
Timestamp transactionStarted, long transactionId, int retryAttempt) {
if (retryAttempt == onAttempt) {
mockSpanner.putStatementResult(
StatementResult.exception(
invalidStatement,
mockSpanner.createAbortedException(ByteString.copyFromUtf8("some-transaction"))));
} else {
mockSpanner.putStatementResult(
StatementResult.exception(invalidStatement, statementException));
}
}

@Override
public void retryFinished(
Timestamp transactionStarted, long transactionId, int retryAttempt, RetryResult result) {}
};
}
}