Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retry transactions that fail with expired transaction IDs #447

Merged
merged 2 commits into from Nov 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -65,7 +65,6 @@ public interface AsyncFunction<T> {
}

private final TransactionOptions transactionOptions;
@Nullable private final ByteString previousTransactionId;
private ByteString transactionId;

Transaction(
Expand All @@ -74,8 +73,11 @@ public interface AsyncFunction<T> {
@Nullable Transaction previousTransaction) {
super(firestore);
this.transactionOptions = transactionOptions;
this.previousTransactionId =
previousTransaction != null ? previousTransaction.transactionId : null;
this.transactionId = previousTransaction != null ? previousTransaction.transactionId : null;
}

public boolean hasTransactionId() {
return transactionId != null;
}

Transaction wrapResult(ApiFuture<WriteResult> result) {
Expand All @@ -89,11 +91,8 @@ ApiFuture<Void> begin() {
beginTransaction.setDatabase(firestore.getDatabaseName());

if (TransactionOptionsType.READ_WRITE.equals(transactionOptions.getType())
&& previousTransactionId != null) {
beginTransaction
.getOptionsBuilder()
.getReadWriteBuilder()
.setRetryTransaction(previousTransactionId);
&& transactionId != null) {
beginTransaction.getOptionsBuilder().getReadWriteBuilder().setRetryTransaction(transactionId);
} else if (TransactionOptionsType.READ_ONLY.equals(transactionOptions.getType())) {
final ReadOnly.Builder readOnlyBuilder = ReadOnly.newBuilder();
if (transactionOptions.getReadTime() != null) {
Expand Down
Expand Up @@ -105,23 +105,40 @@ ApiFuture<T> run() {
"Start runTransaction",
ImmutableMap.of("attemptsRemaining", AttributeValue.longAttributeValue(attemptsRemaining)));

final SettableApiFuture<Void> backoff = SettableApiFuture.create();
return ApiFutures.catchingAsync(
ApiFutures.transformAsync(
maybeRollback(), new RollbackCallback(), MoreExecutors.directExecutor()),
Throwable.class,
new RestartTransactionCallback(),
MoreExecutors.directExecutor());
}

// Add a backoff delay. At first, this is 0.
this.firestoreExecutor.schedule(
new Runnable() {
@Override
public void run() {
backoff.set(null);
}
},
nextBackoffAttempt.getRandomizedRetryDelay().toMillis(),
TimeUnit.MILLISECONDS);
private ApiFuture<Void> maybeRollback() {
return transaction.hasTransactionId()
? transaction.rollback()
: ApiFutures.<Void>immediateFuture(null);
}

nextBackoffAttempt = backoffAlgorithm.createNextAttempt(nextBackoffAttempt);
/** A callback that invokes the BeginTransaction callback. */
private class RollbackCallback implements ApiAsyncFunction<Void, T> {
@Override
public ApiFuture<T> apply(Void input) {
final SettableApiFuture<Void> backoff = SettableApiFuture.create();
// Add a backoff delay. At first, this is 0.
firestoreExecutor.schedule(
new Runnable() {
@Override
public void run() {
backoff.set(null);
}
},
nextBackoffAttempt.getRandomizedRetryDelay().toMillis(),
TimeUnit.MILLISECONDS);

return ApiFutures.transformAsync(
backoff, new BackoffCallback(), MoreExecutors.directExecutor());
nextBackoffAttempt = backoffAlgorithm.createNextAttempt(nextBackoffAttempt);
return ApiFutures.transformAsync(
backoff, new BackoffCallback(), MoreExecutors.directExecutor());
}
}

/**
Expand All @@ -138,7 +155,6 @@ public void run() {
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable t) {

callbackResult.setException(t);
}

Expand Down Expand Up @@ -168,12 +184,8 @@ public ApiFuture<T> apply(Void input) {
*/
private class BeginTransactionCallback implements ApiAsyncFunction<Void, T> {
public ApiFuture<T> apply(Void ignored) {
return ApiFutures.catchingAsync(
ApiFutures.transformAsync(
invokeUserCallback(), new UserFunctionCallback(), MoreExecutors.directExecutor()),
Throwable.class,
new RestartTransactionCallback(),
MoreExecutors.directExecutor());
return ApiFutures.transformAsync(
invokeUserCallback(), new UserFunctionCallback(), MoreExecutors.directExecutor());
}
}

Expand Down Expand Up @@ -217,10 +229,10 @@ public ApiFuture<T> apply(Throwable throwable) {
}

ApiException apiException = (ApiException) throwable;
if (isRetryableTransactionError(apiException)) {
if (transaction.hasTransactionId() && isRetryableTransactionError(apiException)) {
if (attemptsRemaining > 0) {
span.addAnnotation("retrying");
return rollbackAndContinue();
return run();
} else {
span.setStatus(TOO_MANY_RETRIES_STATUS);
final FirestoreException firestoreException =
Expand Down Expand Up @@ -251,39 +263,36 @@ private boolean isRetryableTransactionError(ApiException exception) {
case UNAUTHENTICATED:
case RESOURCE_EXHAUSTED:
return true;
case INVALID_ARGUMENT:
// The Firestore backend uses "INVALID_ARGUMENT" for transactions IDs that have expired.
// While INVALID_ARGUMENT is generally not retryable, we retry this specific case.
return exception.getMessage().contains("transaction has expired");
default:
return false;
}
}

/** Rolls the transaction back and attempts it again. */
private ApiFuture<T> rollbackAndContinue() {
return ApiFutures.transformAsync(
transaction.rollback(),
new ApiAsyncFunction<Void, T>() {
@Override
public ApiFuture<T> apply(Void input) {
return run();
}
},
MoreExecutors.directExecutor());
}

/** Rolls the transaction back and returns the error. */
private ApiFuture<T> rollbackAndReject(final Throwable throwable) {
final SettableApiFuture<T> failedTransaction = SettableApiFuture.create();
// We use `addListener()` since we want to return the original exception regardless of whether
// rollback() succeeds.
transaction
.rollback()
.addListener(
new Runnable() {
@Override
public void run() {
failedTransaction.setException(throwable);
}
},
MoreExecutors.directExecutor());

if (transaction.hasTransactionId()) {
// We use `addListener()` since we want to return the original exception regardless of
// whether rollback() succeeds.
transaction
.rollback()
.addListener(
new Runnable() {
@Override
public void run() {
failedTransaction.setException(throwable);
}
},
MoreExecutors.directExecutor());
} else {
failedTransaction.setException(throwable);
}

span.end();
return failedTransaction;
}
Expand Down
Expand Up @@ -36,7 +36,6 @@
import com.google.cloud.firestore.spi.v1.FirestoreRpc;
import com.google.firestore.v1.BatchWriteRequest;
import com.google.firestore.v1.BatchWriteResponse;
import com.google.protobuf.GeneratedMessageV3;
import com.google.rpc.Code;
import io.grpc.Status;
import java.util.ArrayList;
Expand Down Expand Up @@ -120,13 +119,6 @@ private ApiFuture<BatchWriteResponse> mergeResponses(ApiFuture<BatchWriteRespons
return ApiFutures.immediateFuture(response.build());
}

private void verifyRequests(List<BatchWriteRequest> requests, ResponseStubber responseStubber) {
int index = 0;
for (GeneratedMessageV3 request : responseStubber.keySet()) {
assertEquals(request, requests.get(index++));
}
}

@Before
public void before() {
doReturn(immediateExecutor).when(firestoreRpc).getExecutor();
Expand All @@ -150,10 +142,7 @@ public void hasSetMethod() throws Exception {
ApiFuture<WriteResult> result = bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result.get().getUpdateTime());
}

Expand All @@ -172,10 +161,7 @@ public void hasUpdateMethod() throws Exception {
ApiFuture<WriteResult> result = bulkWriter.update(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result.get().getUpdateTime());
}

Expand All @@ -192,10 +178,7 @@ public void hasDeleteMethod() throws Exception {
ApiFuture<WriteResult> result = bulkWriter.delete(doc1);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result.get().getUpdateTime());
}

Expand All @@ -214,10 +197,7 @@ public void hasCreateMethod() throws Exception {
ApiFuture<WriteResult> result = bulkWriter.create(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result.get().getUpdateTime());
}

Expand All @@ -236,10 +216,7 @@ public void surfacesErrors() throws Exception {
ApiFuture<WriteResult> result = bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
try {
result.get();
fail("set() should have failed");
Expand Down Expand Up @@ -274,10 +251,7 @@ public void addsWritesToNewBatchAfterFlush() throws Exception {
ApiFuture<WriteResult> result2 = bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
assertEquals(Timestamp.ofTimeSecondsAndNanos(1, 0), result1.get().getUpdateTime());
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
}
Expand Down Expand Up @@ -350,10 +324,7 @@ public void canSendWritesToSameDocInSameBatch() throws Exception {
bulkWriter.update(sameDoc, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
assertEquals(Timestamp.ofTimeSecondsAndNanos(1, 0), result1.get().getUpdateTime());
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
}
Expand All @@ -376,10 +347,7 @@ public void sendWritesToDifferentDocsInSameBatch() throws Exception {
ApiFuture<WriteResult> result2 = bulkWriter.update(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());

verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
assertEquals(Timestamp.ofTimeSecondsAndNanos(1, 0), result1.get().getUpdateTime());
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
}
Expand Down Expand Up @@ -413,9 +381,7 @@ public void sendBatchesWhenSizeLimitIsReached() throws Exception {
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
assertEquals(Timestamp.ofTimeSecondsAndNanos(3, 0), result3.get().getUpdateTime());

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());
verifyRequests(requests, responseStubber);
responseStubber.verifyAllRequestsSent();
}

@Test
Expand Down Expand Up @@ -462,8 +428,7 @@ public void retriesIndividualWritesThatFailWithAbortedOrUnavailable() throws Exc
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
assertEquals(Timestamp.ofTimeSecondsAndNanos(3, 0), result3.get().getUpdateTime());

List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
assertEquals(responseStubber.size(), requests.size());
responseStubber.verifyAllRequestsSent();
}

@Test
Expand Down