Skip to content

Commit

Permalink
Transaction retries
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Mar 26, 2020
1 parent 7db0714 commit fcd4ea1
Show file tree
Hide file tree
Showing 10 changed files with 575 additions and 293 deletions.
Expand Up @@ -37,12 +37,16 @@ private FirestoreException(String reason, Status status, @Nullable Throwable cau
this.status = status;
}

private FirestoreException(IOException exception, boolean retryable) {
super(exception, retryable);
private FirestoreException(String reason, ApiException exception) {
super(
reason,
exception,
exception.getStatusCode().getCode().getHttpStatusCode(),
exception.isRetryable());
}

private FirestoreException(ApiException exception) {
super(exception);
private FirestoreException(IOException exception, boolean retryable) {
super(exception, retryable);
}

/**
Expand Down Expand Up @@ -91,7 +95,16 @@ static FirestoreException networkException(IOException exception, boolean retrya
* @return The FirestoreException
*/
static FirestoreException apiException(ApiException exception) {
return new FirestoreException(exception);
return new FirestoreException(exception.getMessage(), exception);
}

/**
* Creates a FirestoreException from an ApiException.
*
* @return The FirestoreException
*/
static FirestoreException apiException(ApiException exception, String message) {
return new FirestoreException(message, exception);
}

@InternalApi
Expand Down
Expand Up @@ -17,10 +17,7 @@
package com.google.cloud.firestore;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.api.gax.rpc.ServerStreamingCallable;
Expand All @@ -29,16 +26,12 @@
import com.google.cloud.firestore.spi.v1.FirestoreRpc;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.firestore.v1.BatchGetDocumentsRequest;
import com.google.firestore.v1.BatchGetDocumentsResponse;
import com.google.firestore.v1.DatabaseRootName;
import com.google.protobuf.ByteString;
import io.grpc.Context;
import io.grpc.Status;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
Expand All @@ -47,7 +40,6 @@
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand All @@ -62,10 +54,7 @@ class FirestoreImpl implements Firestore {
private static final String AUTO_ID_ALPHABET =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

private static final Logger LOGGER = Logger.getLogger("Firestore");
private static final Tracer tracer = Tracing.getTracer();
private static final io.opencensus.trace.Status TOO_MANY_RETRIES_STATUS =
io.opencensus.trace.Status.ABORTED.withDescription("too many retries");

private final FirestoreRpc firestoreClient;
private final FirestoreOptions firestoreOptions;
Expand Down Expand Up @@ -290,17 +279,16 @@ public Query collectionGroup(@Nonnull final String collectionId) {
@Nonnull
@Override
public <T> ApiFuture<T> runTransaction(@Nonnull final Transaction.Function<T> updateFunction) {
return runTransaction(updateFunction, TransactionOptions.create());
return runAsyncTransaction(
new TransactionAsyncAdapter<>(updateFunction), TransactionOptions.create());
}

@Nonnull
@Override
public <T> ApiFuture<T> runTransaction(
@Nonnull final Transaction.Function<T> updateFunction,
@Nonnull TransactionOptions transactionOptions) {
SettableApiFuture<T> resultFuture = SettableApiFuture.create();
runTransaction(new TransactionAsyncAdapter<>(updateFunction), resultFuture, transactionOptions);
return resultFuture;
return runAsyncTransaction(new TransactionAsyncAdapter<>(updateFunction), transactionOptions);
}

@Nonnull
Expand All @@ -315,160 +303,16 @@ public <T> ApiFuture<T> runAsyncTransaction(
public <T> ApiFuture<T> runAsyncTransaction(
@Nonnull final Transaction.AsyncFunction<T> updateFunction,
@Nonnull TransactionOptions transactionOptions) {
SettableApiFuture<T> resultFuture = SettableApiFuture.create();
runTransaction(updateFunction, resultFuture, transactionOptions);
return resultFuture;
}

/** Transaction functions that returns its result in the provided SettableFuture. */
private <T> void runTransaction(
final Transaction.AsyncFunction<T> transactionCallback,
final SettableApiFuture<T> resultFuture,
final TransactionOptions options) {
// span is intentionally not ended here. It will be ended by runTransactionAttempt on success
// or error.
Span span = tracer.spanBuilder("CloudFirestore.Transaction").startSpan();
try (Scope s = tracer.withSpan(span)) {
runTransactionAttempt(transactionCallback, resultFuture, options, span);
}
}

private <T> void runTransactionAttempt(
final Transaction.AsyncFunction<T> transactionCallback,
final SettableApiFuture<T> resultFuture,
final TransactionOptions options,
final Span span) {
final Transaction transaction = new Transaction(this, options.getPreviousTransactionId());
final Executor userCallbackExecutor =
Context.currentContextExecutor(
options.getExecutor() != null ? options.getExecutor() : firestoreClient.getExecutor());

final int attemptsRemaining = options.getNumberOfAttempts() - 1;
span.addAnnotation(
"Start runTransaction",
ImmutableMap.of("attemptsRemaining", AttributeValue.longAttributeValue(attemptsRemaining)));

ApiFutures.addCallback(
transaction.begin(),
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable throwable) {
// Don't retry failed BeginTransaction requests.
rejectTransaction(throwable);
}

@Override
public void onSuccess(Void ignored) {
ApiFutures.addCallback(
invokeUserCallback(),
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable throwable) {
// This was a error in the user callback, forward the throwable.
rejectTransaction(throwable);
}

@Override
public void onSuccess(final T userResult) {
// Commit the transaction
ApiFutures.addCallback(
transaction.commit(),
new ApiFutureCallback<List<WriteResult>>() {
@Override
public void onFailure(Throwable throwable) {
// Retry failed commits.
maybeRetry(throwable);
}

@Override
public void onSuccess(List<WriteResult> writeResults) {
span.setStatus(io.opencensus.trace.Status.OK);
span.end();
resultFuture.set(userResult);
}
},
MoreExecutors.directExecutor());
}
},
MoreExecutors.directExecutor());
}

private SettableApiFuture<T> invokeUserCallback() {
// Execute the user callback on the provided executor.
final SettableApiFuture<T> callbackResult = SettableApiFuture.create();
userCallbackExecutor.execute(
new Runnable() {
@Override
public void run() {
try {
ApiFuture<T> updateCallback = transactionCallback.updateCallback(transaction);
ApiFutures.addCallback(
updateCallback,
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable t) {
callbackResult.setException(t);
}

@Override
public void onSuccess(T result) {
callbackResult.set(result);
}
},
MoreExecutors.directExecutor());
} catch (Throwable t) {
callbackResult.setException(t);
}
}
});
return callbackResult;
}

private void maybeRetry(Throwable throwable) {
if (attemptsRemaining > 0) {
span.addAnnotation("retrying");
runTransactionAttempt(
transactionCallback,
resultFuture,
new TransactionOptions(
attemptsRemaining, options.getExecutor(), transaction.getTransactionId()),
span);
} else {
span.setStatus(TOO_MANY_RETRIES_STATUS);
rejectTransaction(
FirestoreException.serverRejected(
Status.ABORTED,
throwable,
"Transaction was cancelled because of too many retries."));
}
}

private void rejectTransaction(final Throwable throwable) {
if (throwable instanceof ApiException) {
span.setStatus(TraceUtil.statusFromApiException((ApiException) throwable));
}
span.end();
if (transaction.isPending()) {
ApiFutures.addCallback(
transaction.rollback(),
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable throwable) {
resultFuture.setException(throwable);
}

@Override
public void onSuccess(Void ignored) {
resultFuture.setException(throwable);
}
},
MoreExecutors.directExecutor());
} else {
resultFuture.setException(throwable);
}
}
},
MoreExecutors.directExecutor());
transactionOptions.getExecutor() != null
? transactionOptions.getExecutor()
: firestoreClient.getExecutor());

TransactionRunner<T> transactionRunner =
new TransactionRunner<>(
this, updateFunction, userCallbackExecutor, transactionOptions.getNumberOfAttempts());
return transactionRunner.run();
}

/** Returns whether the user has opted into receiving dates as com.google.cloud.Timestamp. */
Expand Down
Expand Up @@ -61,25 +61,15 @@ public interface AsyncFunction<T> {
ApiFuture<T> updateCallback(Transaction transaction);
}

private final ByteString previousTransactionId;
private ByteString transactionId;
private boolean pending;
private @Nullable ByteString previousTransactionId;

Transaction(FirestoreImpl firestore, @Nullable ByteString previousTransactionId) {
Transaction(FirestoreImpl firestore, @Nullable Transaction previousTransaction) {
super(firestore);
this.previousTransactionId = previousTransactionId;
previousTransactionId = previousTransaction != null ? previousTransaction.transactionId : null;
}

@Nullable
ByteString getTransactionId() {
return transactionId;
}

boolean isPending() {
return pending;
}

/** Starts a transaction and obtains the transaction id from the server. */
/** Starts a transaction and obtains the transaction id. */
ApiFuture<Void> begin() {
BeginTransactionRequest.Builder beginTransaction = BeginTransactionRequest.newBuilder();
beginTransaction.setDatabase(firestore.getDatabaseName());
Expand All @@ -101,7 +91,6 @@ ApiFuture<Void> begin() {
@Override
public Void apply(BeginTransactionResponse beginTransactionResponse) {
transactionId = beginTransactionResponse.getTransaction();
pending = true;
return null;
}
},
Expand All @@ -110,14 +99,11 @@ public Void apply(BeginTransactionResponse beginTransactionResponse) {

/** Commits a transaction. */
ApiFuture<List<WriteResult>> commit() {
pending = false;
return super.commit(transactionId);
}

/** Rolls a transaction back and releases all read locks. */
ApiFuture<Void> rollback() {
pending = false;

RollbackRequest.Builder reqBuilder = RollbackRequest.newBuilder();
reqBuilder.setTransaction(transactionId);
reqBuilder.setDatabase(firestore.getDatabaseName());
Expand Down
Expand Up @@ -17,7 +17,6 @@
package com.google.cloud.firestore;

import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -29,12 +28,10 @@ public final class TransactionOptions {

private final int numberOfAttempts;
private final Executor executor;
private final ByteString previousTransactionId;

TransactionOptions(int maxAttempts, Executor executor, ByteString previousTransactionId) {
TransactionOptions(int maxAttempts, Executor executor) {
this.numberOfAttempts = maxAttempts;
this.executor = executor;
this.previousTransactionId = previousTransactionId;
}

public int getNumberOfAttempts() {
Expand All @@ -46,11 +43,6 @@ public Executor getExecutor() {
return executor;
}

@Nullable
ByteString getPreviousTransactionId() {
return previousTransactionId;
}

/**
* Create a default set of options suitable for most use cases. Transactions will be attempted 5
* times.
Expand All @@ -59,7 +51,7 @@ ByteString getPreviousTransactionId() {
*/
@Nonnull
public static TransactionOptions create() {
return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, null, null);
return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, null);
}

/**
Expand All @@ -71,7 +63,7 @@ public static TransactionOptions create() {
@Nonnull
public static TransactionOptions create(int numberOfAttempts) {
Preconditions.checkArgument(numberOfAttempts > 0, "You must allow at least one attempt");
return new TransactionOptions(numberOfAttempts, null, null);
return new TransactionOptions(numberOfAttempts, null);
}

/**
Expand All @@ -82,7 +74,7 @@ public static TransactionOptions create(int numberOfAttempts) {
*/
@Nonnull
public static TransactionOptions create(@Nonnull Executor executor) {
return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, executor, null);
return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, executor);
}

/**
Expand All @@ -95,6 +87,6 @@ public static TransactionOptions create(@Nonnull Executor executor) {
@Nonnull
public static TransactionOptions create(@Nonnull Executor executor, int numberOfAttempts) {
Preconditions.checkArgument(numberOfAttempts > 0, "You must allow at least one attempt");
return new TransactionOptions(numberOfAttempts, executor, null);
return new TransactionOptions(numberOfAttempts, executor);
}
}

0 comments on commit fcd4ea1

Please sign in to comment.