Skip to content

Commit

Permalink
feat: base transaction retries on error codes (#129)
Browse files Browse the repository at this point in the history
Port of googleapis/nodejs-firestore#953

Implements go/transaction-retry-matrix for Java.

Transaction retries are now dependent upon error code, and will only be retried on specific error codes.
This change takes care of all of the state tracking and cleanup necessary to retry transactions.
  • Loading branch information
schmidt-sebastian committed Apr 1, 2020
1 parent edfcecf commit 00b6eb3
Show file tree
Hide file tree
Showing 10 changed files with 580 additions and 293 deletions.
Expand Up @@ -37,12 +37,16 @@ private FirestoreException(String reason, Status status, @Nullable Throwable cau
this.status = status;
}

private FirestoreException(IOException exception, boolean retryable) {
super(exception, retryable);
private FirestoreException(String reason, ApiException exception) {
super(
reason,
exception,
exception.getStatusCode().getCode().getHttpStatusCode(),
exception.isRetryable());
}

private FirestoreException(ApiException exception) {
super(exception);
private FirestoreException(IOException exception, boolean retryable) {
super(exception, retryable);
}

/**
Expand Down Expand Up @@ -91,7 +95,16 @@ static FirestoreException networkException(IOException exception, boolean retrya
* @return The FirestoreException
*/
static FirestoreException apiException(ApiException exception) {
return new FirestoreException(exception);
return new FirestoreException(exception.getMessage(), exception);
}

/**
* Creates a FirestoreException from an ApiException.
*
* @return The FirestoreException
*/
static FirestoreException apiException(ApiException exception, String message) {
return new FirestoreException(message, exception);
}

@InternalApi
Expand Down
Expand Up @@ -17,10 +17,7 @@
package com.google.cloud.firestore;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.api.gax.rpc.ServerStreamingCallable;
Expand All @@ -29,16 +26,12 @@
import com.google.cloud.firestore.spi.v1.FirestoreRpc;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.firestore.v1.BatchGetDocumentsRequest;
import com.google.firestore.v1.BatchGetDocumentsResponse;
import com.google.firestore.v1.DatabaseRootName;
import com.google.protobuf.ByteString;
import io.grpc.Context;
import io.grpc.Status;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.security.SecureRandom;
Expand All @@ -48,7 +41,6 @@
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand All @@ -63,10 +55,7 @@ class FirestoreImpl implements Firestore {
private static final String AUTO_ID_ALPHABET =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

private static final Logger LOGGER = Logger.getLogger("Firestore");
private static final Tracer tracer = Tracing.getTracer();
private static final io.opencensus.trace.Status TOO_MANY_RETRIES_STATUS =
io.opencensus.trace.Status.ABORTED.withDescription("too many retries");

private final FirestoreRpc firestoreClient;
private final FirestoreOptions firestoreOptions;
Expand Down Expand Up @@ -291,17 +280,16 @@ public Query collectionGroup(@Nonnull final String collectionId) {
@Nonnull
@Override
public <T> ApiFuture<T> runTransaction(@Nonnull final Transaction.Function<T> updateFunction) {
return runTransaction(updateFunction, TransactionOptions.create());
return runAsyncTransaction(
new TransactionAsyncAdapter<>(updateFunction), TransactionOptions.create());
}

@Nonnull
@Override
public <T> ApiFuture<T> runTransaction(
@Nonnull final Transaction.Function<T> updateFunction,
@Nonnull TransactionOptions transactionOptions) {
SettableApiFuture<T> resultFuture = SettableApiFuture.create();
runTransaction(new TransactionAsyncAdapter<>(updateFunction), resultFuture, transactionOptions);
return resultFuture;
return runAsyncTransaction(new TransactionAsyncAdapter<>(updateFunction), transactionOptions);
}

@Nonnull
Expand All @@ -316,160 +304,16 @@ public <T> ApiFuture<T> runAsyncTransaction(
public <T> ApiFuture<T> runAsyncTransaction(
@Nonnull final Transaction.AsyncFunction<T> updateFunction,
@Nonnull TransactionOptions transactionOptions) {
SettableApiFuture<T> resultFuture = SettableApiFuture.create();
runTransaction(updateFunction, resultFuture, transactionOptions);
return resultFuture;
}

/** Transaction functions that returns its result in the provided SettableFuture. */
private <T> void runTransaction(
final Transaction.AsyncFunction<T> transactionCallback,
final SettableApiFuture<T> resultFuture,
final TransactionOptions options) {
// span is intentionally not ended here. It will be ended by runTransactionAttempt on success
// or error.
Span span = tracer.spanBuilder("CloudFirestore.Transaction").startSpan();
try (Scope s = tracer.withSpan(span)) {
runTransactionAttempt(transactionCallback, resultFuture, options, span);
}
}

private <T> void runTransactionAttempt(
final Transaction.AsyncFunction<T> transactionCallback,
final SettableApiFuture<T> resultFuture,
final TransactionOptions options,
final Span span) {
final Transaction transaction = new Transaction(this, options.getPreviousTransactionId());
final Executor userCallbackExecutor =
Context.currentContextExecutor(
options.getExecutor() != null ? options.getExecutor() : firestoreClient.getExecutor());

final int attemptsRemaining = options.getNumberOfAttempts() - 1;
span.addAnnotation(
"Start runTransaction",
ImmutableMap.of("attemptsRemaining", AttributeValue.longAttributeValue(attemptsRemaining)));

ApiFutures.addCallback(
transaction.begin(),
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable throwable) {
// Don't retry failed BeginTransaction requests.
rejectTransaction(throwable);
}

@Override
public void onSuccess(Void ignored) {
ApiFutures.addCallback(
invokeUserCallback(),
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable throwable) {
// This was a error in the user callback, forward the throwable.
rejectTransaction(throwable);
}

@Override
public void onSuccess(final T userResult) {
// Commit the transaction
ApiFutures.addCallback(
transaction.commit(),
new ApiFutureCallback<List<WriteResult>>() {
@Override
public void onFailure(Throwable throwable) {
// Retry failed commits.
maybeRetry(throwable);
}

@Override
public void onSuccess(List<WriteResult> writeResults) {
span.setStatus(io.opencensus.trace.Status.OK);
span.end();
resultFuture.set(userResult);
}
},
MoreExecutors.directExecutor());
}
},
MoreExecutors.directExecutor());
}

private SettableApiFuture<T> invokeUserCallback() {
// Execute the user callback on the provided executor.
final SettableApiFuture<T> callbackResult = SettableApiFuture.create();
userCallbackExecutor.execute(
new Runnable() {
@Override
public void run() {
try {
ApiFuture<T> updateCallback = transactionCallback.updateCallback(transaction);
ApiFutures.addCallback(
updateCallback,
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable t) {
callbackResult.setException(t);
}

@Override
public void onSuccess(T result) {
callbackResult.set(result);
}
},
MoreExecutors.directExecutor());
} catch (Throwable t) {
callbackResult.setException(t);
}
}
});
return callbackResult;
}

private void maybeRetry(Throwable throwable) {
if (attemptsRemaining > 0) {
span.addAnnotation("retrying");
runTransactionAttempt(
transactionCallback,
resultFuture,
new TransactionOptions(
attemptsRemaining, options.getExecutor(), transaction.getTransactionId()),
span);
} else {
span.setStatus(TOO_MANY_RETRIES_STATUS);
rejectTransaction(
FirestoreException.serverRejected(
Status.ABORTED,
throwable,
"Transaction was cancelled because of too many retries."));
}
}

private void rejectTransaction(final Throwable throwable) {
if (throwable instanceof ApiException) {
span.setStatus(TraceUtil.statusFromApiException((ApiException) throwable));
}
span.end();
if (transaction.isPending()) {
ApiFutures.addCallback(
transaction.rollback(),
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable throwable) {
resultFuture.setException(throwable);
}

@Override
public void onSuccess(Void ignored) {
resultFuture.setException(throwable);
}
},
MoreExecutors.directExecutor());
} else {
resultFuture.setException(throwable);
}
}
},
MoreExecutors.directExecutor());
transactionOptions.getExecutor() != null
? transactionOptions.getExecutor()
: firestoreClient.getExecutor());

TransactionRunner<T> transactionRunner =
new TransactionRunner<>(
this, updateFunction, userCallbackExecutor, transactionOptions.getNumberOfAttempts());
return transactionRunner.run();
}

/** Returns whether the user has opted into receiving dates as com.google.cloud.Timestamp. */
Expand Down
Expand Up @@ -61,25 +61,15 @@ public interface AsyncFunction<T> {
ApiFuture<T> updateCallback(Transaction transaction);
}

private final ByteString previousTransactionId;
private ByteString transactionId;
private boolean pending;
private @Nullable ByteString previousTransactionId;

Transaction(FirestoreImpl firestore, @Nullable ByteString previousTransactionId) {
Transaction(FirestoreImpl firestore, @Nullable Transaction previousTransaction) {
super(firestore);
this.previousTransactionId = previousTransactionId;
previousTransactionId = previousTransaction != null ? previousTransaction.transactionId : null;
}

@Nullable
ByteString getTransactionId() {
return transactionId;
}

boolean isPending() {
return pending;
}

/** Starts a transaction and obtains the transaction id from the server. */
/** Starts a transaction and obtains the transaction id. */
ApiFuture<Void> begin() {
BeginTransactionRequest.Builder beginTransaction = BeginTransactionRequest.newBuilder();
beginTransaction.setDatabase(firestore.getDatabaseName());
Expand All @@ -101,7 +91,6 @@ ApiFuture<Void> begin() {
@Override
public Void apply(BeginTransactionResponse beginTransactionResponse) {
transactionId = beginTransactionResponse.getTransaction();
pending = true;
return null;
}
},
Expand All @@ -110,14 +99,11 @@ public Void apply(BeginTransactionResponse beginTransactionResponse) {

/** Commits a transaction. */
ApiFuture<List<WriteResult>> commit() {
pending = false;
return super.commit(transactionId);
}

/** Rolls a transaction back and releases all read locks. */
ApiFuture<Void> rollback() {
pending = false;

RollbackRequest.Builder reqBuilder = RollbackRequest.newBuilder();
reqBuilder.setTransaction(transactionId);
reqBuilder.setDatabase(firestore.getDatabaseName());
Expand Down
Expand Up @@ -17,7 +17,6 @@
package com.google.cloud.firestore;

import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -29,12 +28,10 @@ public final class TransactionOptions {

private final int numberOfAttempts;
private final Executor executor;
private final ByteString previousTransactionId;

TransactionOptions(int maxAttempts, Executor executor, ByteString previousTransactionId) {
TransactionOptions(int maxAttempts, Executor executor) {
this.numberOfAttempts = maxAttempts;
this.executor = executor;
this.previousTransactionId = previousTransactionId;
}

public int getNumberOfAttempts() {
Expand All @@ -46,11 +43,6 @@ public Executor getExecutor() {
return executor;
}

@Nullable
ByteString getPreviousTransactionId() {
return previousTransactionId;
}

/**
* Create a default set of options suitable for most use cases. Transactions will be attempted 5
* times.
Expand All @@ -59,7 +51,7 @@ ByteString getPreviousTransactionId() {
*/
@Nonnull
public static TransactionOptions create() {
return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, null, null);
return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, null);
}

/**
Expand All @@ -71,7 +63,7 @@ public static TransactionOptions create() {
@Nonnull
public static TransactionOptions create(int numberOfAttempts) {
Preconditions.checkArgument(numberOfAttempts > 0, "You must allow at least one attempt");
return new TransactionOptions(numberOfAttempts, null, null);
return new TransactionOptions(numberOfAttempts, null);
}

/**
Expand All @@ -82,7 +74,7 @@ public static TransactionOptions create(int numberOfAttempts) {
*/
@Nonnull
public static TransactionOptions create(@Nonnull Executor executor) {
return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, executor, null);
return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, executor);
}

/**
Expand All @@ -95,6 +87,6 @@ public static TransactionOptions create(@Nonnull Executor executor) {
@Nonnull
public static TransactionOptions create(@Nonnull Executor executor, int numberOfAttempts) {
Preconditions.checkArgument(numberOfAttempts > 0, "You must allow at least one attempt");
return new TransactionOptions(numberOfAttempts, executor, null);
return new TransactionOptions(numberOfAttempts, executor);
}
}

0 comments on commit 00b6eb3

Please sign in to comment.