diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java index 2aaefb0c5..5fabd5e8b 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java @@ -37,12 +37,16 @@ private FirestoreException(String reason, Status status, @Nullable Throwable cau this.status = status; } - private FirestoreException(IOException exception, boolean retryable) { - super(exception, retryable); + private FirestoreException(String reason, ApiException exception) { + super( + reason, + exception, + exception.getStatusCode().getCode().getHttpStatusCode(), + exception.isRetryable()); } - private FirestoreException(ApiException exception) { - super(exception); + private FirestoreException(IOException exception, boolean retryable) { + super(exception, retryable); } /** @@ -91,7 +95,16 @@ static FirestoreException networkException(IOException exception, boolean retrya * @return The FirestoreException */ static FirestoreException apiException(ApiException exception) { - return new FirestoreException(exception); + return new FirestoreException(exception.getMessage(), exception); + } + + /** + * Creates a FirestoreException from an ApiException. + * + * @return The FirestoreException + */ + static FirestoreException apiException(ApiException exception, String message) { + return new FirestoreException(message, exception); } @InternalApi diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java index 77472d5f1..7c00b1dbd 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java @@ -17,10 +17,7 @@ package com.google.cloud.firestore; import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutureCallback; -import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; -import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.BidiStreamingCallable; import com.google.api.gax.rpc.ServerStreamingCallable; @@ -29,16 +26,12 @@ import com.google.cloud.firestore.spi.v1.FirestoreRpc; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.MoreExecutors; import com.google.firestore.v1.BatchGetDocumentsRequest; import com.google.firestore.v1.BatchGetDocumentsResponse; import com.google.firestore.v1.DatabaseRootName; import com.google.protobuf.ByteString; import io.grpc.Context; -import io.grpc.Status; -import io.opencensus.common.Scope; import io.opencensus.trace.AttributeValue; -import io.opencensus.trace.Span; import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; import java.security.SecureRandom; @@ -48,7 +41,6 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.Executor; -import java.util.logging.Logger; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -63,10 +55,7 @@ class FirestoreImpl implements Firestore { private static final String AUTO_ID_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; - private static final Logger LOGGER = Logger.getLogger("Firestore"); private static final Tracer tracer = Tracing.getTracer(); - private static final io.opencensus.trace.Status TOO_MANY_RETRIES_STATUS = - io.opencensus.trace.Status.ABORTED.withDescription("too many retries"); private final FirestoreRpc firestoreClient; private final FirestoreOptions firestoreOptions; @@ -291,7 +280,8 @@ public Query collectionGroup(@Nonnull final String collectionId) { @Nonnull @Override public ApiFuture runTransaction(@Nonnull final Transaction.Function updateFunction) { - return runTransaction(updateFunction, TransactionOptions.create()); + return runAsyncTransaction( + new TransactionAsyncAdapter<>(updateFunction), TransactionOptions.create()); } @Nonnull @@ -299,9 +289,7 @@ public ApiFuture runTransaction(@Nonnull final Transaction.Function up public ApiFuture runTransaction( @Nonnull final Transaction.Function updateFunction, @Nonnull TransactionOptions transactionOptions) { - SettableApiFuture resultFuture = SettableApiFuture.create(); - runTransaction(new TransactionAsyncAdapter<>(updateFunction), resultFuture, transactionOptions); - return resultFuture; + return runAsyncTransaction(new TransactionAsyncAdapter<>(updateFunction), transactionOptions); } @Nonnull @@ -316,160 +304,16 @@ public ApiFuture runAsyncTransaction( public ApiFuture runAsyncTransaction( @Nonnull final Transaction.AsyncFunction updateFunction, @Nonnull TransactionOptions transactionOptions) { - SettableApiFuture resultFuture = SettableApiFuture.create(); - runTransaction(updateFunction, resultFuture, transactionOptions); - return resultFuture; - } - - /** Transaction functions that returns its result in the provided SettableFuture. */ - private void runTransaction( - final Transaction.AsyncFunction transactionCallback, - final SettableApiFuture resultFuture, - final TransactionOptions options) { - // span is intentionally not ended here. It will be ended by runTransactionAttempt on success - // or error. - Span span = tracer.spanBuilder("CloudFirestore.Transaction").startSpan(); - try (Scope s = tracer.withSpan(span)) { - runTransactionAttempt(transactionCallback, resultFuture, options, span); - } - } - - private void runTransactionAttempt( - final Transaction.AsyncFunction transactionCallback, - final SettableApiFuture resultFuture, - final TransactionOptions options, - final Span span) { - final Transaction transaction = new Transaction(this, options.getPreviousTransactionId()); final Executor userCallbackExecutor = Context.currentContextExecutor( - options.getExecutor() != null ? options.getExecutor() : firestoreClient.getExecutor()); - - final int attemptsRemaining = options.getNumberOfAttempts() - 1; - span.addAnnotation( - "Start runTransaction", - ImmutableMap.of("attemptsRemaining", AttributeValue.longAttributeValue(attemptsRemaining))); - - ApiFutures.addCallback( - transaction.begin(), - new ApiFutureCallback() { - @Override - public void onFailure(Throwable throwable) { - // Don't retry failed BeginTransaction requests. - rejectTransaction(throwable); - } - - @Override - public void onSuccess(Void ignored) { - ApiFutures.addCallback( - invokeUserCallback(), - new ApiFutureCallback() { - @Override - public void onFailure(Throwable throwable) { - // This was a error in the user callback, forward the throwable. - rejectTransaction(throwable); - } - - @Override - public void onSuccess(final T userResult) { - // Commit the transaction - ApiFutures.addCallback( - transaction.commit(), - new ApiFutureCallback>() { - @Override - public void onFailure(Throwable throwable) { - // Retry failed commits. - maybeRetry(throwable); - } - - @Override - public void onSuccess(List writeResults) { - span.setStatus(io.opencensus.trace.Status.OK); - span.end(); - resultFuture.set(userResult); - } - }, - MoreExecutors.directExecutor()); - } - }, - MoreExecutors.directExecutor()); - } - - private SettableApiFuture invokeUserCallback() { - // Execute the user callback on the provided executor. - final SettableApiFuture callbackResult = SettableApiFuture.create(); - userCallbackExecutor.execute( - new Runnable() { - @Override - public void run() { - try { - ApiFuture updateCallback = transactionCallback.updateCallback(transaction); - ApiFutures.addCallback( - updateCallback, - new ApiFutureCallback() { - @Override - public void onFailure(Throwable t) { - callbackResult.setException(t); - } - - @Override - public void onSuccess(T result) { - callbackResult.set(result); - } - }, - MoreExecutors.directExecutor()); - } catch (Throwable t) { - callbackResult.setException(t); - } - } - }); - return callbackResult; - } - - private void maybeRetry(Throwable throwable) { - if (attemptsRemaining > 0) { - span.addAnnotation("retrying"); - runTransactionAttempt( - transactionCallback, - resultFuture, - new TransactionOptions( - attemptsRemaining, options.getExecutor(), transaction.getTransactionId()), - span); - } else { - span.setStatus(TOO_MANY_RETRIES_STATUS); - rejectTransaction( - FirestoreException.serverRejected( - Status.ABORTED, - throwable, - "Transaction was cancelled because of too many retries.")); - } - } - - private void rejectTransaction(final Throwable throwable) { - if (throwable instanceof ApiException) { - span.setStatus(TraceUtil.statusFromApiException((ApiException) throwable)); - } - span.end(); - if (transaction.isPending()) { - ApiFutures.addCallback( - transaction.rollback(), - new ApiFutureCallback() { - @Override - public void onFailure(Throwable throwable) { - resultFuture.setException(throwable); - } - - @Override - public void onSuccess(Void ignored) { - resultFuture.setException(throwable); - } - }, - MoreExecutors.directExecutor()); - } else { - resultFuture.setException(throwable); - } - } - }, - MoreExecutors.directExecutor()); + transactionOptions.getExecutor() != null + ? transactionOptions.getExecutor() + : firestoreClient.getExecutor()); + + TransactionRunner transactionRunner = + new TransactionRunner<>( + this, updateFunction, userCallbackExecutor, transactionOptions.getNumberOfAttempts()); + return transactionRunner.run(); } /** Returns whether the user has opted into receiving dates as com.google.cloud.Timestamp. */ diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java index e6c03b470..27b0ccd0c 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java @@ -61,25 +61,15 @@ public interface AsyncFunction { ApiFuture updateCallback(Transaction transaction); } - private final ByteString previousTransactionId; private ByteString transactionId; - private boolean pending; + private @Nullable ByteString previousTransactionId; - Transaction(FirestoreImpl firestore, @Nullable ByteString previousTransactionId) { + Transaction(FirestoreImpl firestore, @Nullable Transaction previousTransaction) { super(firestore); - this.previousTransactionId = previousTransactionId; + previousTransactionId = previousTransaction != null ? previousTransaction.transactionId : null; } - @Nullable - ByteString getTransactionId() { - return transactionId; - } - - boolean isPending() { - return pending; - } - - /** Starts a transaction and obtains the transaction id from the server. */ + /** Starts a transaction and obtains the transaction id. */ ApiFuture begin() { BeginTransactionRequest.Builder beginTransaction = BeginTransactionRequest.newBuilder(); beginTransaction.setDatabase(firestore.getDatabaseName()); @@ -101,7 +91,6 @@ ApiFuture begin() { @Override public Void apply(BeginTransactionResponse beginTransactionResponse) { transactionId = beginTransactionResponse.getTransaction(); - pending = true; return null; } }, @@ -110,14 +99,11 @@ public Void apply(BeginTransactionResponse beginTransactionResponse) { /** Commits a transaction. */ ApiFuture> commit() { - pending = false; return super.commit(transactionId); } /** Rolls a transaction back and releases all read locks. */ ApiFuture rollback() { - pending = false; - RollbackRequest.Builder reqBuilder = RollbackRequest.newBuilder(); reqBuilder.setTransaction(transactionId); reqBuilder.setDatabase(firestore.getDatabaseName()); diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionOptions.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionOptions.java index 699a047ea..9b54f7ac8 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionOptions.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionOptions.java @@ -17,7 +17,6 @@ package com.google.cloud.firestore; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; import java.util.concurrent.Executor; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -29,12 +28,10 @@ public final class TransactionOptions { private final int numberOfAttempts; private final Executor executor; - private final ByteString previousTransactionId; - TransactionOptions(int maxAttempts, Executor executor, ByteString previousTransactionId) { + TransactionOptions(int maxAttempts, Executor executor) { this.numberOfAttempts = maxAttempts; this.executor = executor; - this.previousTransactionId = previousTransactionId; } public int getNumberOfAttempts() { @@ -46,11 +43,6 @@ public Executor getExecutor() { return executor; } - @Nullable - ByteString getPreviousTransactionId() { - return previousTransactionId; - } - /** * Create a default set of options suitable for most use cases. Transactions will be attempted 5 * times. @@ -59,7 +51,7 @@ ByteString getPreviousTransactionId() { */ @Nonnull public static TransactionOptions create() { - return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, null, null); + return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, null); } /** @@ -71,7 +63,7 @@ public static TransactionOptions create() { @Nonnull public static TransactionOptions create(int numberOfAttempts) { Preconditions.checkArgument(numberOfAttempts > 0, "You must allow at least one attempt"); - return new TransactionOptions(numberOfAttempts, null, null); + return new TransactionOptions(numberOfAttempts, null); } /** @@ -82,7 +74,7 @@ public static TransactionOptions create(int numberOfAttempts) { */ @Nonnull public static TransactionOptions create(@Nonnull Executor executor) { - return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, executor, null); + return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, executor); } /** @@ -95,6 +87,6 @@ public static TransactionOptions create(@Nonnull Executor executor) { @Nonnull public static TransactionOptions create(@Nonnull Executor executor, int numberOfAttempts) { Preconditions.checkArgument(numberOfAttempts > 0, "You must allow at least one attempt"); - return new TransactionOptions(numberOfAttempts, executor, null); + return new TransactionOptions(numberOfAttempts, executor); } } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java new file mode 100644 index 000000000..a6724c654 --- /dev/null +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java @@ -0,0 +1,284 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore; + +import com.google.api.core.ApiAsyncFunction; +import com.google.api.core.ApiFunction; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.CurrentMillisClock; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.retrying.ExponentialRetryAlgorithm; +import com.google.api.gax.retrying.TimedAttemptSettings; +import com.google.api.gax.rpc.ApiException; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.Span; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Implements backoff and retry semantics for Firestore transactions. + * + *

A TransactionRunner is instantiated with a `userCallback`, a `userCallbackExecutor` and + * `numberOfAttempts`. Upon invoking {@link #run()}, the class invokes the provided callback on the + * specified executor at most `numberOfAttempts` times. {@link #run()} returns an ApiFuture that + * resolves when all retries complete. + * + *

TransactionRunner uses exponential backoff to increase the chance that retries succeed. To + * customize the backoff settings, you can specify custom settings via {@link FirestoreOptions}. + */ +class TransactionRunner { + + private static final Tracer tracer = Tracing.getTracer(); + private static final io.opencensus.trace.Status TOO_MANY_RETRIES_STATUS = + io.opencensus.trace.Status.ABORTED.withDescription("too many retries"); + private static final io.opencensus.trace.Status USER_CALLBACK_FAILED = + io.opencensus.trace.Status.ABORTED.withDescription("user callback failed"); + + private final Transaction.AsyncFunction userCallback; + private final Span span; + private final FirestoreImpl firestoreClient; + private final ScheduledExecutorService firestoreExecutor; + private final Executor userCallbackExecutor; + private final ExponentialRetryAlgorithm backoffAlgorithm; + private TimedAttemptSettings nextBackoffAttempt; + private Transaction transaction; + private int attemptsRemaining; + + /** + * @param firestore The active Firestore instance + * @param userCallback The user provided transaction callback + * @param userCallbackExecutor The executor to run the user callback on + * @param numberOfAttempts The total number of attempts for this transaction + */ + TransactionRunner( + FirestoreImpl firestore, + Transaction.AsyncFunction userCallback, + Executor userCallbackExecutor, + int numberOfAttempts) { + this.span = tracer.spanBuilder("CloudFirestore.Transaction").startSpan(); + this.firestoreClient = firestore; + this.firestoreExecutor = firestore.getClient().getExecutor(); + this.userCallback = userCallback; + this.attemptsRemaining = numberOfAttempts; + this.userCallbackExecutor = userCallbackExecutor; + this.backoffAlgorithm = + new ExponentialRetryAlgorithm( + firestore.getOptions().getRetrySettings(), CurrentMillisClock.getDefaultClock()); + this.nextBackoffAttempt = backoffAlgorithm.createFirstAttempt(); + } + + ApiFuture run() { + this.transaction = new Transaction(firestoreClient, this.transaction); + + --attemptsRemaining; + + span.addAnnotation( + "Start runTransaction", + ImmutableMap.of("attemptsRemaining", AttributeValue.longAttributeValue(attemptsRemaining))); + + final SettableApiFuture backoff = SettableApiFuture.create(); + + // Add a backoff delay. At first, this is 0. + this.firestoreExecutor.schedule( + new Runnable() { + @Override + public void run() { + backoff.set(null); + } + }, + nextBackoffAttempt.getRandomizedRetryDelay().toMillis(), + TimeUnit.MILLISECONDS); + + nextBackoffAttempt = backoffAlgorithm.createNextAttempt(nextBackoffAttempt); + + return ApiFutures.transformAsync( + backoff, new BackoffCallback(), MoreExecutors.directExecutor()); + } + + /** + * Invokes the user callback on the user callback executor and returns the user-provided result. + */ + private SettableApiFuture invokeUserCallback() { + final SettableApiFuture callbackResult = SettableApiFuture.create(); + userCallbackExecutor.execute( + new Runnable() { + @Override + public void run() { + ApiFutures.addCallback( + userCallback.updateCallback(transaction), + new ApiFutureCallback() { + @Override + public void onFailure(Throwable t) { + + callbackResult.setException(t); + } + + @Override + public void onSuccess(T result) { + callbackResult.set(result); + } + }, + MoreExecutors.directExecutor()); + } + }); + return callbackResult; + } + + /** A callback that invokes the BeginTransaction callback. */ + private class BackoffCallback implements ApiAsyncFunction { + @Override + public ApiFuture apply(Void input) { + return ApiFutures.transformAsync( + transaction.begin(), new BeginTransactionCallback(), MoreExecutors.directExecutor()); + } + } + + /** + * The callback for the BeginTransaction RPC, which invokes the user callback and handles all + * errors thereafter. + */ + private class BeginTransactionCallback implements ApiAsyncFunction { + public ApiFuture apply(Void ignored) { + return ApiFutures.catchingAsync( + ApiFutures.transformAsync( + invokeUserCallback(), new UserFunctionCallback(), MoreExecutors.directExecutor()), + Throwable.class, + new RestartTransactionCallback(), + MoreExecutors.directExecutor()); + } + } + + /** + * The callback that is invoked after the user function finishes execution. It invokes the Commit + * RPC. + */ + private class UserFunctionCallback implements ApiAsyncFunction { + @Override + public ApiFuture apply(T userFunctionResult) { + return ApiFutures.transform( + transaction.commit(), + new CommitTransactionCallback(userFunctionResult), + MoreExecutors.directExecutor()); + } + } + + /** The callback that is invoked after the Commit RPC returns. It returns the user result. */ + private class CommitTransactionCallback implements ApiFunction, T> { + private T userFunctionResult; + + CommitTransactionCallback(T userFunctionResult) { + this.userFunctionResult = userFunctionResult; + } + + @Override + public T apply(List input) { + span.setStatus(io.opencensus.trace.Status.OK); + span.end(); + return userFunctionResult; + } + } + + /** A callback that restarts a transaction after an ApiException. It invokes the Rollback RPC. */ + private class RestartTransactionCallback implements ApiAsyncFunction { + public ApiFuture apply(Throwable throwable) { + if (!(throwable instanceof ApiException)) { + // This is likely a failure in the user callback. + span.setStatus(USER_CALLBACK_FAILED); + return rollbackAndReject(throwable); + } + + ApiException apiException = (ApiException) throwable; + if (isRetryableTransactionError(apiException)) { + if (attemptsRemaining > 0) { + span.addAnnotation("retrying"); + return rollbackAndContinue(); + } else { + span.setStatus(TOO_MANY_RETRIES_STATUS); + final FirestoreException firestoreException = + FirestoreException.apiException( + apiException, "Transaction was cancelled because of too many retries."); + return rollbackAndReject(firestoreException); + } + } else { + span.setStatus(TraceUtil.statusFromApiException(apiException)); + final FirestoreException firestoreException = + FirestoreException.apiException( + apiException, "Transaction failed with non-retryable error"); + return rollbackAndReject(firestoreException); + } + } + + /** Determines whether the provided error is considered retryable. */ + private boolean isRetryableTransactionError(ApiException exception) { + switch (exception.getStatusCode().getCode()) { + // This list is based on + // https://github.com/firebase/firebase-js-sdk/blob/c822e78b00dd3420dcc749beb2f09a947aa4a344/packages/firestore/src/core/transaction_runner.ts#L112 + case ABORTED: + case CANCELLED: + case UNKNOWN: + case DEADLINE_EXCEEDED: + case INTERNAL: + case UNAVAILABLE: + case UNAUTHENTICATED: + case RESOURCE_EXHAUSTED: + return true; + default: + return false; + } + } + + /** Rolls the transaction back and attempts it again. */ + private ApiFuture rollbackAndContinue() { + return ApiFutures.transformAsync( + transaction.rollback(), + new ApiAsyncFunction() { + @Override + public ApiFuture apply(Void input) { + return run(); + } + }, + MoreExecutors.directExecutor()); + } + + /** Rolls the transaction back and returns the error. */ + private ApiFuture rollbackAndReject(final Throwable throwable) { + final SettableApiFuture failedTransaction = SettableApiFuture.create(); + // We use `addListener()` since we want to return the original exception regardless of whether + // rollback() succeeds. + transaction + .rollback() + .addListener( + new Runnable() { + @Override + public void run() { + failedTransaction.setException(throwable); + } + }, + MoreExecutors.directExecutor()); + span.end(); + return failedTransaction; + } + } +} diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java index f0783924e..fc07cc585 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java @@ -19,7 +19,6 @@ import com.google.api.core.CurrentMillisClock; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.retrying.ExponentialRetryAlgorithm; -import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiStreamObserver; @@ -49,7 +48,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; -import org.threeten.bp.Duration; /** * Watch provides listen functionality and exposes snapshot listeners. It can be used with any valid @@ -66,18 +64,6 @@ class Watch implements ApiStreamObserver { */ private static final int WATCH_TARGET_ID = 0x1; - private static RetrySettings RETRY_SETTINGS = - RetrySettings.newBuilder() - // The initial backoff time in seconds after an error. - // Set to 1s according to https://cloud.google.com/apis/design/errors. - .setInitialRetryDelay(Duration.ofSeconds(1)) - // The maximum backoff time in minutes. - .setMaxRetryDelay(Duration.ofMinutes(1)) - // The factor to increase the backup by after each failed attempt. - .setRetryDelayMultiplier(1.5) - .setJittered(true) - .build(); - private final FirestoreImpl firestore; private final ScheduledExecutorService firestoreExecutor; private final Query query; @@ -138,7 +124,8 @@ private Watch(FirestoreImpl firestore, Query query, Target target) { this.query = query; this.comparator = query.comparator(); this.backoff = - new ExponentialRetryAlgorithm(RETRY_SETTINGS, CurrentMillisClock.getDefaultClock()); + new ExponentialRetryAlgorithm( + firestore.getOptions().getRetrySettings(), CurrentMillisClock.getDefaultClock()); this.firestoreExecutor = firestore.getClient().getExecutor(); this.isActive = new AtomicBoolean(); this.nextAttempt = backoff.createFirstAttempt(); diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java index 41be685cf..148ce5585 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.cloud.Timestamp; import com.google.common.collect.ImmutableList; @@ -57,7 +58,6 @@ import com.google.type.LatLng; import java.io.IOException; import java.math.BigInteger; -import java.nio.charset.Charset; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -73,14 +73,23 @@ import javax.annotation.Nullable; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.threeten.bp.Duration; public final class LocalFirestoreHelper { + protected static RetrySettings IMMEDIATE_RETRY_SETTINGS = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ZERO) + .setMaxRetryDelay(Duration.ZERO) + .setRetryDelayMultiplier(1) + .setJittered(false) + .build(); + public static final String DATABASE_NAME; public static final String COLLECTION_ID; public static final String DOCUMENT_PATH; public static final String DOCUMENT_NAME; - public static final ByteString TRANSACTION_ID; + public static final String TRANSACTION_ID; public static final Map EMPTY_MAP_PROTO; @@ -285,12 +294,15 @@ public static BeginTransactionRequest begin() { return begin(null); } - public static BeginTransactionRequest begin(@Nullable ByteString previousTransactionId) { + public static BeginTransactionRequest begin(@Nullable String previousTransactionId) { BeginTransactionRequest.Builder begin = BeginTransactionRequest.newBuilder(); begin.setDatabase(DATABASE_NAME); if (previousTransactionId != null) { - begin.getOptionsBuilder().getReadWriteBuilder().setRetryTransaction(previousTransactionId); + begin + .getOptionsBuilder() + .getReadWriteBuilder() + .setRetryTransaction(ByteString.copyFromUtf8(previousTransactionId)); } return begin.build(); @@ -300,16 +312,20 @@ public static ApiFuture beginResponse() { return beginResponse(TRANSACTION_ID); } - public static ApiFuture beginResponse(ByteString transactionId) { + public static ApiFuture beginResponse(String transactionId) { BeginTransactionResponse.Builder beginResponse = BeginTransactionResponse.newBuilder(); - beginResponse.setTransaction(transactionId); + beginResponse.setTransaction(ByteString.copyFromUtf8(transactionId)); return ApiFutures.immediateFuture(beginResponse.build()); } public static RollbackRequest rollback() { + return rollback(TRANSACTION_ID); + } + + public static RollbackRequest rollback(String transactionId) { RollbackRequest.Builder rollback = RollbackRequest.newBuilder(); rollback.setDatabase(DATABASE_NAME); - rollback.setTransaction(TRANSACTION_ID); + rollback.setTransaction(ByteString.copyFromUtf8(transactionId)); return rollback.build(); } @@ -424,13 +440,13 @@ public static Write update( return write.build(); } - public static CommitRequest commit(@Nullable ByteString transactionId, Write... writes) { + public static CommitRequest commit(@Nullable String transactionId, Write... writes) { CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); commitRequest.setDatabase(DATABASE_NAME); commitRequest.addAllWrites(Arrays.asList(writes)); if (transactionId != null) { - commitRequest.setTransaction(transactionId); + commitRequest.setTransaction(ByteString.copyFromUtf8(transactionId)); } return commitRequest.build(); @@ -484,7 +500,7 @@ public static RunQueryRequest query(StructuredQuery... query) { } public static RunQueryRequest query( - @Nullable ByteString transactionId, boolean allDescendants, StructuredQuery... query) { + @Nullable String transactionId, boolean allDescendants, StructuredQuery... query) { RunQueryRequest.Builder request = RunQueryRequest.newBuilder(); request.setParent(LocalFirestoreHelper.DATABASE_NAME + "/documents"); StructuredQuery.Builder structuredQuery = request.getStructuredQueryBuilder(); @@ -512,7 +528,7 @@ public static RunQueryRequest query( } if (transactionId != null) { - request.setTransaction(transactionId); + request.setTransaction(ByteString.copyFromUtf8(transactionId)); } return request.build(); @@ -522,12 +538,12 @@ public static BatchGetDocumentsRequest get() { return getAll(null, DOCUMENT_NAME); } - public static BatchGetDocumentsRequest get(@Nullable ByteString transactionId) { + public static BatchGetDocumentsRequest get(@Nullable String transactionId) { return getAll(transactionId, DOCUMENT_NAME); } public static BatchGetDocumentsRequest getAll( - @Nullable ByteString transactionId, String... documentNames) { + @Nullable String transactionId, String... documentNames) { BatchGetDocumentsRequest.Builder request = BatchGetDocumentsRequest.newBuilder(); request.setDatabase(DATABASE_NAME); @@ -536,7 +552,7 @@ public static BatchGetDocumentsRequest getAll( } if (transactionId != null) { - request.setTransaction(transactionId); + request.setTransaction(ByteString.copyFromUtf8(transactionId)); } return request.build(); @@ -694,7 +710,7 @@ public boolean equals(Object o) { } static { - TRANSACTION_ID = ByteString.copyFrom("foo", Charset.defaultCharset()); + TRANSACTION_ID = "foo"; try { DATE = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S z").parse("1985-03-18 08:20:00.123 CET"); diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/TransactionTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/TransactionTest.java index 989d815e3..52b3150b8 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/TransactionTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/TransactionTest.java @@ -16,6 +16,7 @@ package com.google.cloud.firestore; +import static com.google.cloud.firestore.LocalFirestoreHelper.IMMEDIATE_RETRY_SETTINGS; import static com.google.cloud.firestore.LocalFirestoreHelper.SINGLE_FIELD_PROTO; import static com.google.cloud.firestore.LocalFirestoreHelper.TRANSACTION_ID; import static com.google.cloud.firestore.LocalFirestoreHelper.begin; @@ -42,19 +43,26 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.Timestamp; import com.google.cloud.firestore.spi.v1.FirestoreRpc; import com.google.firestore.v1.BatchGetDocumentsRequest; import com.google.firestore.v1.DocumentMask; import com.google.firestore.v1.Write; -import com.google.protobuf.ByteString; +import com.google.protobuf.GeneratedMessageV3; import com.google.protobuf.Message; +import io.grpc.Status; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -69,16 +77,26 @@ import org.mockito.Mockito; import org.mockito.Spy; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Stubber; @RunWith(MockitoJUnitRunner.class) public class TransactionTest { + private final ApiFuture RETRYABLE_API_EXCEPTION = + ApiFutures.immediateFailedFuture( + new ApiException( + new Exception("Test exception"), GrpcStatusCode.of(Status.Code.UNKNOWN), true)); + @Spy private FirestoreRpc firestoreRpc = Mockito.mock(FirestoreRpc.class); @Spy private FirestoreImpl firestoreMock = new FirestoreImpl( - FirestoreOptions.newBuilder().setProjectId("test-project").build(), firestoreRpc); + FirestoreOptions.newBuilder() + .setProjectId("test-project") + .setRetrySettings(IMMEDIATE_RETRY_SETTINGS) + .build(), + firestoreRpc); @Captor private ArgumentCaptor requestCapture; @Captor private ArgumentCaptor> streamObserverCapture; @@ -163,8 +181,6 @@ public ApiFuture updateCallback(Transaction transaction) { @Test public void canReturnNull() throws Exception { doReturn(beginResponse()) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo2"))) .doReturn(commitResponse(0, 0)) .when(firestoreMock) .sendRequest(requestCapture.capture(), Matchers.>any()); @@ -179,14 +195,12 @@ public String updateCallback(Transaction transaction) { }, options); - assertEquals(null, transaction.get()); + assertNull(transaction.get()); } @Test public void canReturnNullAsync() throws Exception { doReturn(beginResponse()) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo2"))) .doReturn(commitResponse(0, 0)) .when(firestoreMock) .sendRequest(requestCapture.capture(), Matchers.>any()); @@ -324,28 +338,40 @@ public ApiFuture updateCallback(Transaction transaction) { @Test public void limitsRetriesWithFailure() throws Exception { - doReturn(beginResponse(ByteString.copyFromUtf8("foo1"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo2"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo3"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo4"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo5"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .when(firestoreMock) - .sendRequest(requestCapture.capture(), Matchers.>any()); + RequestResponseMap requestResponseMap = + new RequestResponseMap() { + { + put(begin(), beginResponse("foo1")); + put(commit("foo1"), RETRYABLE_API_EXCEPTION); + put(rollback("foo1"), rollbackResponse()); + put(begin("foo1"), beginResponse("foo2")); + put(commit("foo2"), RETRYABLE_API_EXCEPTION); + put(rollback("foo2"), rollbackResponse()); + put(begin("foo2"), beginResponse("foo3")); + put(commit("foo3"), RETRYABLE_API_EXCEPTION); + put(rollback("foo3"), rollbackResponse()); + put(begin("foo3"), beginResponse("foo4")); + put(commit("foo4"), RETRYABLE_API_EXCEPTION); + put(rollback("foo4"), rollbackResponse()); + put(begin("foo4"), beginResponse("foo5")); + put(commit("foo5"), RETRYABLE_API_EXCEPTION); + put(rollback("foo5"), rollbackResponse()); + } + }; + + initializeStub(requestResponseMap); + + final AtomicInteger retryCount = new AtomicInteger(1); ApiFuture transaction = firestoreMock.runTransaction( new Transaction.Function() { @Override public String updateCallback(Transaction transaction) { - return "foo"; + return "foo" + retryCount.getAndIncrement(); } }, - options); + TransactionOptions.create(options.getExecutor(), 5)); try { transaction.get(); @@ -355,36 +381,40 @@ public String updateCallback(Transaction transaction) { } List requests = requestCapture.getAllValues(); - assertEquals(10, requests.size()); + assertEquals(requestResponseMap.size(), requests.size()); - assertEquals(begin(), requests.get(0)); - assertEquals(commit(ByteString.copyFromUtf8("foo1")), requests.get(1)); - assertEquals(begin(ByteString.copyFromUtf8("foo1")), requests.get(2)); - assertEquals(commit(ByteString.copyFromUtf8("foo2")), requests.get(3)); - assertEquals(begin(ByteString.copyFromUtf8("foo2")), requests.get(4)); - assertEquals(commit(ByteString.copyFromUtf8("foo3")), requests.get(5)); - assertEquals(begin(ByteString.copyFromUtf8("foo3")), requests.get(6)); - assertEquals(commit(ByteString.copyFromUtf8("foo4")), requests.get(7)); - assertEquals(begin(ByteString.copyFromUtf8("foo4")), requests.get(8)); - assertEquals(commit(ByteString.copyFromUtf8("foo5")), requests.get(9)); + int index = 0; + for (GeneratedMessageV3 request : requestResponseMap.keySet()) { + assertEquals(request, requests.get(index++)); + } } @Test public void limitsRetriesWithSuccess() throws Exception { - doReturn(beginResponse(ByteString.copyFromUtf8("foo1"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo2"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo3"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo4"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo5"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo6"))) - .doReturn(commitResponse(0, 0)) - .when(firestoreMock) - .sendRequest(requestCapture.capture(), Matchers.>any()); + RequestResponseMap requestResponseMap = + new RequestResponseMap() { + { + put(begin(), beginResponse("foo1")); + put(commit("foo1"), RETRYABLE_API_EXCEPTION); + put(rollback("foo1"), rollbackResponse()); + put(begin("foo1"), beginResponse("foo2")); + put(commit("foo2"), RETRYABLE_API_EXCEPTION); + put(rollback("foo2"), rollbackResponse()); + put(begin("foo2"), beginResponse("foo3")); + put(commit("foo3"), RETRYABLE_API_EXCEPTION); + put(rollback("foo3"), rollbackResponse()); + put(begin("foo3"), beginResponse("foo4")); + put(commit("foo4"), RETRYABLE_API_EXCEPTION); + put(rollback("foo4"), rollbackResponse()); + put(begin("foo4"), beginResponse("foo5")); + put(commit("foo5"), RETRYABLE_API_EXCEPTION); + put(rollback("foo5"), rollbackResponse()); + put(begin("foo5"), beginResponse("foo6")); + put(commit("foo6"), commitResponse(0, 0)); + } + }; + + initializeStub(requestResponseMap); final AtomicInteger retryCount = new AtomicInteger(1); @@ -401,20 +431,109 @@ public String updateCallback(Transaction transaction) { assertEquals("foo6", transaction.get()); List requests = requestCapture.getAllValues(); - assertEquals(12, requests.size()); + assertEquals(requestResponseMap.size(), requests.size()); - assertEquals(begin(), requests.get(0)); - assertEquals(commit(ByteString.copyFromUtf8("foo1")), requests.get(1)); - assertEquals(begin(ByteString.copyFromUtf8("foo1")), requests.get(2)); - assertEquals(commit(ByteString.copyFromUtf8("foo2")), requests.get(3)); - assertEquals(begin(ByteString.copyFromUtf8("foo2")), requests.get(4)); - assertEquals(commit(ByteString.copyFromUtf8("foo3")), requests.get(5)); - assertEquals(begin(ByteString.copyFromUtf8("foo3")), requests.get(6)); - assertEquals(commit(ByteString.copyFromUtf8("foo4")), requests.get(7)); - assertEquals(begin(ByteString.copyFromUtf8("foo4")), requests.get(8)); - assertEquals(commit(ByteString.copyFromUtf8("foo5")), requests.get(9)); - assertEquals(begin(ByteString.copyFromUtf8("foo5")), requests.get(10)); - assertEquals(commit(ByteString.copyFromUtf8("foo6")), requests.get(11)); + int index = 0; + for (GeneratedMessageV3 request : requestResponseMap.keySet()) { + assertEquals(request, requests.get(index++)); + } + } + + @Test + public void retriesBasedOnErrorCode() throws Exception { + Map retryBehavior = + new HashMap() { + { + put(Status.Code.CANCELLED, true); + put(Status.Code.UNKNOWN, true); + put(Status.Code.INVALID_ARGUMENT, false); + put(Status.Code.DEADLINE_EXCEEDED, true); + put(Status.Code.NOT_FOUND, false); + put(Status.Code.ALREADY_EXISTS, false); + put(Status.Code.RESOURCE_EXHAUSTED, true); + put(Status.Code.FAILED_PRECONDITION, false); + put(Status.Code.ABORTED, true); + put(Status.Code.OUT_OF_RANGE, false); + put(Status.Code.UNIMPLEMENTED, false); + put(Status.Code.INTERNAL, true); + put(Status.Code.UNAVAILABLE, true); + put(Status.Code.DATA_LOSS, false); + put(Status.Code.UNAUTHENTICATED, true); + } + }; + + for (Map.Entry entry : retryBehavior.entrySet()) { + StatusCode code = GrpcStatusCode.of(entry.getKey()); + boolean shouldRetry = entry.getValue(); + + final ApiException apiException = + new ApiException(new Exception("Test Exception"), code, shouldRetry); + + if (shouldRetry) { + RequestResponseMap requestResponseMap = + new RequestResponseMap() { + { + put(begin(), beginResponse("foo1")); + put( + commit("foo1"), + ApiFutures.immediateFailedFuture(apiException)); + put(rollback("foo1"), rollbackResponse()); + put(begin("foo1"), beginResponse("foo2")); + put(commit("foo2"), commitResponse(0, 0)); + } + }; + initializeStub(requestResponseMap); + + final int[] attempts = new int[] {0}; + + ApiFuture transaction = + firestoreMock.runTransaction( + new Transaction.Function() { + @Override + public String updateCallback(Transaction transaction) { + ++attempts[0]; + return null; + } + }); + + transaction.get(); + + assertEquals(2, attempts[0]); + } else { + RequestResponseMap requestResponseMap = + new RequestResponseMap() { + { + put(begin(), beginResponse("foo1")); + put( + commit("foo1"), + ApiFutures.immediateFailedFuture(apiException)); + put(rollback("foo1"), rollbackResponse()); + } + }; + + initializeStub(requestResponseMap); + + final int[] attempts = new int[] {0}; + + ApiFuture transaction = + firestoreMock.runTransaction( + new Transaction.Function() { + @Override + public String updateCallback(Transaction transaction) { + ++attempts[0]; + return null; + } + }); + + try { + transaction.get(); + fail("Transaction should have failed"); + } catch (Exception ignored) { + } + + assertEquals(1, attempts[0]); + } + } } @Test @@ -747,4 +866,17 @@ public String updateCallback(Transaction transaction) { assertEquals(begin(), requests.get(0)); assertEquals(commit(TRANSACTION_ID, writes.toArray(new Write[] {})), requests.get(1)); } + + static class RequestResponseMap + extends LinkedHashMap> {} + + private void initializeStub(RequestResponseMap requestResponseMap) { + Stubber stubber = null; + for (ApiFuture response : requestResponseMap.values()) { + stubber = (stubber != null) ? stubber.doReturn(response) : doReturn(response); + } + stubber + .when(firestoreMock) + .sendRequest(requestCapture.capture(), Matchers.>any()); + } } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java index 2b71c28b0..6eeb1dbd8 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java @@ -17,6 +17,7 @@ package com.google.cloud.firestore; import static com.google.cloud.firestore.LocalFirestoreHelper.DATABASE_NAME; +import static com.google.cloud.firestore.LocalFirestoreHelper.IMMEDIATE_RETRY_SETTINGS; import static com.google.cloud.firestore.LocalFirestoreHelper.SINGLE_FIELD_MAP; import static com.google.cloud.firestore.LocalFirestoreHelper.SINGLE_FIELD_PROTO; import static com.google.cloud.firestore.LocalFirestoreHelper.UPDATED_FIELD_MAP; @@ -97,7 +98,11 @@ public class WatchTest { @Spy private FirestoreImpl firestoreMock = new FirestoreImpl( - FirestoreOptions.newBuilder().setProjectId("test-project").build(), firestoreRpc); + FirestoreOptions.newBuilder() + .setProjectId("test-project") + .setRetrySettings(IMMEDIATE_RETRY_SETTINGS) + .build(), + firestoreRpc); @Captor private ArgumentCaptor> streamObserverCapture; diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java index 0565d05e3..a0379f061 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java @@ -544,6 +544,34 @@ public String updateCallback(Transaction transaction) { } } + @Test + public void doesNotRetryTransactionsWithFailedPreconditions() { + final DocumentReference documentReference = randomColl.document(); + + final AtomicInteger attempts = new AtomicInteger(); + + ApiFuture firstTransaction = + firestore.runTransaction( + new Transaction.Function() { + @Override + public Void updateCallback(Transaction transaction) { + attempts.incrementAndGet(); + transaction.update(documentReference, "foo", "bar"); + return null; + } + }); + + try { + firstTransaction.get(); + fail("ApiFuture should fail with ExecutionException"); + } catch (InterruptedException e) { + fail("ApiFuture should fail with ExecutionException"); + } catch (ExecutionException e) { + assertEquals(1, attempts.intValue()); + assertEquals(404, ((FirestoreException) e.getCause()).getCode()); + } + } + @Test public void successfulTransactionWithContention() throws Exception { final DocumentReference documentReference = addDocument("counter", 1);