Skip to content

Commit

Permalink
fix: retry transactions that fail with expired transaction IDs (#447)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Nov 5, 2020
1 parent b529245 commit 5905438
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 234 deletions.
Expand Up @@ -65,7 +65,6 @@ public interface AsyncFunction<T> {
}

private final TransactionOptions transactionOptions;
@Nullable private final ByteString previousTransactionId;
private ByteString transactionId;

Transaction(
Expand All @@ -74,8 +73,11 @@ public interface AsyncFunction<T> {
@Nullable Transaction previousTransaction) {
super(firestore);
this.transactionOptions = transactionOptions;
this.previousTransactionId =
previousTransaction != null ? previousTransaction.transactionId : null;
this.transactionId = previousTransaction != null ? previousTransaction.transactionId : null;
}

/** Returns {@code true} if this transaction has received a transaction ID from BeginTransaction. */
public boolean hasTransactionId() {
  boolean idPresent = transactionId != null;
  return idPresent;
}

Transaction wrapResult(ApiFuture<WriteResult> result) {
Expand All @@ -89,11 +91,8 @@ ApiFuture<Void> begin() {
beginTransaction.setDatabase(firestore.getDatabaseName());

if (TransactionOptionsType.READ_WRITE.equals(transactionOptions.getType())
&& previousTransactionId != null) {
beginTransaction
.getOptionsBuilder()
.getReadWriteBuilder()
.setRetryTransaction(previousTransactionId);
&& transactionId != null) {
beginTransaction.getOptionsBuilder().getReadWriteBuilder().setRetryTransaction(transactionId);
} else if (TransactionOptionsType.READ_ONLY.equals(transactionOptions.getType())) {
final ReadOnly.Builder readOnlyBuilder = ReadOnly.newBuilder();
if (transactionOptions.getReadTime() != null) {
Expand Down
Expand Up @@ -105,23 +105,40 @@ ApiFuture<T> run() {
"Start runTransaction",
ImmutableMap.of("attemptsRemaining", AttributeValue.longAttributeValue(attemptsRemaining)));

final SettableApiFuture<Void> backoff = SettableApiFuture.create();
return ApiFutures.catchingAsync(
ApiFutures.transformAsync(
maybeRollback(), new RollbackCallback(), MoreExecutors.directExecutor()),
Throwable.class,
new RestartTransactionCallback(),
MoreExecutors.directExecutor());
}

// Add a backoff delay. At first, this is 0.
this.firestoreExecutor.schedule(
new Runnable() {
@Override
public void run() {
backoff.set(null);
}
},
nextBackoffAttempt.getRandomizedRetryDelay().toMillis(),
TimeUnit.MILLISECONDS);
/**
 * Rolls back the current attempt if a transaction ID was assigned; otherwise completes
 * immediately, since there is nothing on the backend to roll back.
 */
private ApiFuture<Void> maybeRollback() {
  if (transaction.hasTransactionId()) {
    return transaction.rollback();
  }
  return ApiFutures.<Void>immediateFuture(null);
}

nextBackoffAttempt = backoffAlgorithm.createNextAttempt(nextBackoffAttempt);
/** A callback that, once any rollback completes, schedules the backoff delay before retrying. */
private class RollbackCallback implements ApiAsyncFunction<Void, T> {
@Override
public ApiFuture<T> apply(Void input) {
final SettableApiFuture<Void> backoff = SettableApiFuture.create();
// Add a backoff delay. At first, this is 0.
firestoreExecutor.schedule(
new Runnable() {
@Override
public void run() {
backoff.set(null);
}
},
nextBackoffAttempt.getRandomizedRetryDelay().toMillis(),
TimeUnit.MILLISECONDS);

return ApiFutures.transformAsync(
backoff, new BackoffCallback(), MoreExecutors.directExecutor());
nextBackoffAttempt = backoffAlgorithm.createNextAttempt(nextBackoffAttempt);
return ApiFutures.transformAsync(
backoff, new BackoffCallback(), MoreExecutors.directExecutor());
}
}

/**
Expand All @@ -138,7 +155,6 @@ public void run() {
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable t) {

callbackResult.setException(t);
}

Expand Down Expand Up @@ -168,12 +184,8 @@ public ApiFuture<T> apply(Void input) {
*/
private class BeginTransactionCallback implements ApiAsyncFunction<Void, T> {
public ApiFuture<T> apply(Void ignored) {
return ApiFutures.catchingAsync(
ApiFutures.transformAsync(
invokeUserCallback(), new UserFunctionCallback(), MoreExecutors.directExecutor()),
Throwable.class,
new RestartTransactionCallback(),
MoreExecutors.directExecutor());
return ApiFutures.transformAsync(
invokeUserCallback(), new UserFunctionCallback(), MoreExecutors.directExecutor());
}
}

Expand Down Expand Up @@ -217,10 +229,10 @@ public ApiFuture<T> apply(Throwable throwable) {
}

ApiException apiException = (ApiException) throwable;
if (isRetryableTransactionError(apiException)) {
if (transaction.hasTransactionId() && isRetryableTransactionError(apiException)) {
if (attemptsRemaining > 0) {
span.addAnnotation("retrying");
return rollbackAndContinue();
return run();
} else {
span.setStatus(TOO_MANY_RETRIES_STATUS);
final FirestoreException firestoreException =
Expand Down Expand Up @@ -251,39 +263,36 @@ private boolean isRetryableTransactionError(ApiException exception) {
case UNAUTHENTICATED:
case RESOURCE_EXHAUSTED:
return true;
case INVALID_ARGUMENT:
// The Firestore backend uses "INVALID_ARGUMENT" for transaction IDs that have expired.
// While INVALID_ARGUMENT is generally not retryable, we retry this specific case.
return exception.getMessage().contains("transaction has expired");
default:
return false;
}
}

/** Rolls the transaction back, then starts a fresh attempt once the rollback completes. */
private ApiFuture<T> rollbackAndContinue() {
  // Chain the retry onto the rollback so we never begin a new attempt while the previous
  // transaction is still open on the backend.
  ApiAsyncFunction<Void, T> retryAfterRollback =
      new ApiAsyncFunction<Void, T>() {
        @Override
        public ApiFuture<T> apply(Void input) {
          return run();
        }
      };
  return ApiFutures.transformAsync(
      transaction.rollback(), retryAfterRollback, MoreExecutors.directExecutor());
}

/** Rolls the transaction back and returns the error. */
private ApiFuture<T> rollbackAndReject(final Throwable throwable) {
final SettableApiFuture<T> failedTransaction = SettableApiFuture.create();
// We use `addListener()` since we want to return the original exception regardless of whether
// rollback() succeeds.
transaction
.rollback()
.addListener(
new Runnable() {
@Override
public void run() {
failedTransaction.setException(throwable);
}
},
MoreExecutors.directExecutor());

if (transaction.hasTransactionId()) {
// We use `addListener()` since we want to return the original exception regardless of
// whether rollback() succeeds.
transaction
.rollback()
.addListener(
new Runnable() {
@Override
public void run() {
failedTransaction.setException(throwable);
}
},
MoreExecutors.directExecutor());
} else {
failedTransaction.setException(throwable);
}

span.end();
return failedTransaction;
}
Expand Down
Expand Up @@ -36,7 +36,6 @@
import com.google.cloud.firestore.spi.v1.FirestoreRpc;
import com.google.firestore.v1.BatchWriteRequest;
import com.google.firestore.v1.BatchWriteResponse;
import com.google.protobuf.GeneratedMessageV3;
import com.google.rpc.Code;
import io.grpc.Status;
import java.util.ArrayList;
Expand Down Expand Up @@ -120,13 +119,6 @@ private ApiFuture<BatchWriteResponse> mergeResponses(ApiFuture<BatchWriteRespons
return ApiFutures.immediateFuture(response.build());
}

/**
 * Asserts that the captured requests match the stubbed expectations, compared pairwise in
 * iteration order of the stubber's keys.
 */
private void verifyRequests(List<BatchWriteRequest> requests, ResponseStubber responseStubber) {
  int position = 0;
  for (GeneratedMessageV3 expectedRequest : responseStubber.keySet()) {
    assertEquals(expectedRequest, requests.get(position));
    position++;
  }
}

@Before
public void before() {
doReturn(immediateExecutor).when(firestoreRpc).getExecutor();
Expand All @@ -150,10 +142,7 @@ public void hasSetMethod() throws Exception {
ApiFuture<WriteResult> result = bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result.get().getUpdateTime());
}

Expand All @@ -172,10 +161,7 @@ public void hasUpdateMethod() throws Exception {
ApiFuture<WriteResult> result = bulkWriter.update(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result.get().getUpdateTime());
}

Expand All @@ -192,10 +178,7 @@ public void hasDeleteMethod() throws Exception {
ApiFuture<WriteResult> result = bulkWriter.delete(doc1);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result.get().getUpdateTime());
}

Expand All @@ -214,10 +197,7 @@ public void hasCreateMethod() throws Exception {
ApiFuture<WriteResult> result = bulkWriter.create(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result.get().getUpdateTime());
}

Expand All @@ -236,10 +216,7 @@ public void surfacesErrors() throws Exception {
ApiFuture<WriteResult> result = bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
try {
result.get();
fail("set() should have failed");
Expand Down Expand Up @@ -274,10 +251,7 @@ public void addsWritesToNewBatchAfterFlush() throws Exception {
ApiFuture<WriteResult> result2 = bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
assertEquals(Timestamp.ofTimeSecondsAndNanos(1, 0), result1.get().getUpdateTime());
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
}
Expand Down Expand Up @@ -350,10 +324,7 @@ public void canSendWritesToSameDocInSameBatch() throws Exception {
bulkWriter.update(sameDoc, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
assertEquals(Timestamp.ofTimeSecondsAndNanos(1, 0), result1.get().getUpdateTime());
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
}
Expand All @@ -376,10 +347,7 @@ public void sendWritesToDifferentDocsInSameBatch() throws Exception {
ApiFuture<WriteResult> result2 = bulkWriter.update(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
assertEquals(Timestamp.ofTimeSecondsAndNanos(1, 0), result1.get().getUpdateTime());
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
}
Expand Down Expand Up @@ -413,9 +381,7 @@ public void sendBatchesWhenSizeLimitIsReached() throws Exception {
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
assertEquals(Timestamp.ofTimeSecondsAndNanos(3, 0), result3.get().getUpdateTime());

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());
verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
}

@Test
Expand Down Expand Up @@ -462,8 +428,7 @@ public void retriesIndividualWritesThatFailWithAbortedOrUnavailable() throws Exc
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
assertEquals(Timestamp.ofTimeSecondsAndNanos(3, 0), result3.get().getUpdateTime());

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());
responseStubber.verifyAllRequestsSent();
}

@Test
Expand Down

0 comments on commit 5905438

Please sign in to comment.