From 00b6eb3703c8f4942d6da42b827ecbeeb9a13ef5 Mon Sep 17 00:00:00 2001 From: Sebastian Schmidt Date: Wed, 1 Apr 2020 14:56:53 -0700 Subject: [PATCH] feat: base transaction retries on error codes (#129) Port of googleapis/nodejs-firestore#953 Implements go/transaction-retry-matrix for Java. Transaction retries are now dependent upon error code, and will only be retried on specific error codes. This change takes care of all of the state tracking and cleanup necessary to retry transactions. --- .../cloud/firestore/FirestoreException.java | 23 +- .../google/cloud/firestore/FirestoreImpl.java | 178 +---------- .../google/cloud/firestore/Transaction.java | 22 +- .../cloud/firestore/TransactionOptions.java | 18 +- .../cloud/firestore/TransactionRunner.java | 284 ++++++++++++++++++ .../com/google/cloud/firestore/Watch.java | 17 +- .../cloud/firestore/LocalFirestoreHelper.java | 46 ++- .../cloud/firestore/TransactionTest.java | 250 +++++++++++---- .../com/google/cloud/firestore/WatchTest.java | 7 +- .../cloud/firestore/it/ITSystemTest.java | 28 ++ 10 files changed, 580 insertions(+), 293 deletions(-) create mode 100644 google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java index 2aaefb0c5..5fabd5e8b 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java @@ -37,12 +37,16 @@ private FirestoreException(String reason, Status status, @Nullable Throwable cau this.status = status; } - private FirestoreException(IOException exception, boolean retryable) { - super(exception, retryable); + private FirestoreException(String reason, ApiException exception) { + super( + reason, + exception, + exception.getStatusCode().getCode().getHttpStatusCode(), + exception.isRetryable()); } - private FirestoreException(ApiException exception) { - super(exception); + private FirestoreException(IOException exception, boolean retryable) { + super(exception, retryable); } /** @@ -91,7 +95,16 @@ static FirestoreException networkException(IOException exception, boolean retrya * @return The FirestoreException */ static FirestoreException apiException(ApiException exception) { - return new FirestoreException(exception); + return new FirestoreException(exception.getMessage(), exception); + } + + /** + * Creates a FirestoreException from an ApiException. + * + * @return The FirestoreException + */ + static FirestoreException apiException(ApiException exception, String message) { + return new FirestoreException(message, exception); } @InternalApi diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java index 77472d5f1..7c00b1dbd 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java @@ -17,10 +17,7 @@ package com.google.cloud.firestore; import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutureCallback; -import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; -import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.BidiStreamingCallable; import com.google.api.gax.rpc.ServerStreamingCallable; @@ -29,16 +26,12 @@ import com.google.cloud.firestore.spi.v1.FirestoreRpc; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.MoreExecutors; import com.google.firestore.v1.BatchGetDocumentsRequest; import com.google.firestore.v1.BatchGetDocumentsResponse; import com.google.firestore.v1.DatabaseRootName; import com.google.protobuf.ByteString; import io.grpc.Context; -import io.grpc.Status; -import io.opencensus.common.Scope; import io.opencensus.trace.AttributeValue; -import io.opencensus.trace.Span; import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; import java.security.SecureRandom; @@ -48,7 +41,6 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.Executor; -import java.util.logging.Logger; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -63,10 +55,7 @@ class FirestoreImpl implements Firestore { private static final String AUTO_ID_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; - private static final Logger LOGGER = Logger.getLogger("Firestore"); private static final Tracer tracer = Tracing.getTracer(); - private static final io.opencensus.trace.Status TOO_MANY_RETRIES_STATUS = - io.opencensus.trace.Status.ABORTED.withDescription("too many retries"); private final FirestoreRpc firestoreClient; private final FirestoreOptions firestoreOptions; @@ -291,7 +280,8 @@ public Query collectionGroup(@Nonnull final String collectionId) { @Nonnull @Override public ApiFuture runTransaction(@Nonnull final Transaction.Function updateFunction) { - return runTransaction(updateFunction, TransactionOptions.create()); + return runAsyncTransaction( + new TransactionAsyncAdapter<>(updateFunction), TransactionOptions.create()); } @Nonnull @@ -299,9 +289,7 @@ public ApiFuture runTransaction(@Nonnull final Transaction.Function up public ApiFuture runTransaction( @Nonnull final Transaction.Function updateFunction, @Nonnull TransactionOptions transactionOptions) { - SettableApiFuture resultFuture = SettableApiFuture.create(); - runTransaction(new TransactionAsyncAdapter<>(updateFunction), resultFuture, transactionOptions); - return resultFuture; + return runAsyncTransaction(new TransactionAsyncAdapter<>(updateFunction), transactionOptions); } @Nonnull @@ -316,160 +304,16 @@ public ApiFuture runAsyncTransaction( public ApiFuture runAsyncTransaction( @Nonnull final Transaction.AsyncFunction updateFunction, @Nonnull TransactionOptions transactionOptions) { - SettableApiFuture resultFuture = SettableApiFuture.create(); - runTransaction(updateFunction, resultFuture, transactionOptions); - return resultFuture; - } - - /** Transaction functions that returns its result in the provided SettableFuture. */ - private void runTransaction( - final Transaction.AsyncFunction transactionCallback, - final SettableApiFuture resultFuture, - final TransactionOptions options) { - // span is intentionally not ended here. It will be ended by runTransactionAttempt on success - // or error. - Span span = tracer.spanBuilder("CloudFirestore.Transaction").startSpan(); - try (Scope s = tracer.withSpan(span)) { - runTransactionAttempt(transactionCallback, resultFuture, options, span); - } - } - - private void runTransactionAttempt( - final Transaction.AsyncFunction transactionCallback, - final SettableApiFuture resultFuture, - final TransactionOptions options, - final Span span) { - final Transaction transaction = new Transaction(this, options.getPreviousTransactionId()); final Executor userCallbackExecutor = Context.currentContextExecutor( - options.getExecutor() != null ? options.getExecutor() : firestoreClient.getExecutor()); - - final int attemptsRemaining = options.getNumberOfAttempts() - 1; - span.addAnnotation( - "Start runTransaction", - ImmutableMap.of("attemptsRemaining", AttributeValue.longAttributeValue(attemptsRemaining))); - - ApiFutures.addCallback( - transaction.begin(), - new ApiFutureCallback() { - @Override - public void onFailure(Throwable throwable) { - // Don't retry failed BeginTransaction requests. - rejectTransaction(throwable); - } - - @Override - public void onSuccess(Void ignored) { - ApiFutures.addCallback( - invokeUserCallback(), - new ApiFutureCallback() { - @Override - public void onFailure(Throwable throwable) { - // This was a error in the user callback, forward the throwable. - rejectTransaction(throwable); - } - - @Override - public void onSuccess(final T userResult) { - // Commit the transaction - ApiFutures.addCallback( - transaction.commit(), - new ApiFutureCallback>() { - @Override - public void onFailure(Throwable throwable) { - // Retry failed commits. - maybeRetry(throwable); - } - - @Override - public void onSuccess(List writeResults) { - span.setStatus(io.opencensus.trace.Status.OK); - span.end(); - resultFuture.set(userResult); - } - }, - MoreExecutors.directExecutor()); - } - }, - MoreExecutors.directExecutor()); - } - - private SettableApiFuture invokeUserCallback() { - // Execute the user callback on the provided executor. - final SettableApiFuture callbackResult = SettableApiFuture.create(); - userCallbackExecutor.execute( - new Runnable() { - @Override - public void run() { - try { - ApiFuture updateCallback = transactionCallback.updateCallback(transaction); - ApiFutures.addCallback( - updateCallback, - new ApiFutureCallback() { - @Override - public void onFailure(Throwable t) { - callbackResult.setException(t); - } - - @Override - public void onSuccess(T result) { - callbackResult.set(result); - } - }, - MoreExecutors.directExecutor()); - } catch (Throwable t) { - callbackResult.setException(t); - } - } - }); - return callbackResult; - } - - private void maybeRetry(Throwable throwable) { - if (attemptsRemaining > 0) { - span.addAnnotation("retrying"); - runTransactionAttempt( - transactionCallback, - resultFuture, - new TransactionOptions( - attemptsRemaining, options.getExecutor(), transaction.getTransactionId()), - span); - } else { - span.setStatus(TOO_MANY_RETRIES_STATUS); - rejectTransaction( - FirestoreException.serverRejected( - Status.ABORTED, - throwable, - "Transaction was cancelled because of too many retries.")); - } - } - - private void rejectTransaction(final Throwable throwable) { - if (throwable instanceof ApiException) { - span.setStatus(TraceUtil.statusFromApiException((ApiException) throwable)); - } - span.end(); - if (transaction.isPending()) { - ApiFutures.addCallback( - transaction.rollback(), - new ApiFutureCallback() { - @Override - public void onFailure(Throwable throwable) { - resultFuture.setException(throwable); - } - - @Override - public void onSuccess(Void ignored) { - resultFuture.setException(throwable); - } - }, - MoreExecutors.directExecutor()); - } else { - resultFuture.setException(throwable); - } - } - }, - MoreExecutors.directExecutor()); + transactionOptions.getExecutor() != null + ? transactionOptions.getExecutor() + : firestoreClient.getExecutor()); + + TransactionRunner transactionRunner = + new TransactionRunner<>( + this, updateFunction, userCallbackExecutor, transactionOptions.getNumberOfAttempts()); + return transactionRunner.run(); } /** Returns whether the user has opted into receiving dates as com.google.cloud.Timestamp. */ diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java index e6c03b470..27b0ccd0c 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java @@ -61,25 +61,15 @@ public interface AsyncFunction { ApiFuture updateCallback(Transaction transaction); } - private final ByteString previousTransactionId; private ByteString transactionId; - private boolean pending; + private @Nullable ByteString previousTransactionId; - Transaction(FirestoreImpl firestore, @Nullable ByteString previousTransactionId) { + Transaction(FirestoreImpl firestore, @Nullable Transaction previousTransaction) { super(firestore); - this.previousTransactionId = previousTransactionId; + previousTransactionId = previousTransaction != null ? previousTransaction.transactionId : null; } - @Nullable - ByteString getTransactionId() { - return transactionId; - } - - boolean isPending() { - return pending; - } - - /** Starts a transaction and obtains the transaction id from the server. */ + /** Starts a transaction and obtains the transaction id. */ ApiFuture begin() { BeginTransactionRequest.Builder beginTransaction = BeginTransactionRequest.newBuilder(); beginTransaction.setDatabase(firestore.getDatabaseName()); @@ -101,7 +91,6 @@ ApiFuture begin() { @Override public Void apply(BeginTransactionResponse beginTransactionResponse) { transactionId = beginTransactionResponse.getTransaction(); - pending = true; return null; } }, @@ -110,14 +99,11 @@ public Void apply(BeginTransactionResponse beginTransactionResponse) { /** Commits a transaction. */ ApiFuture> commit() { - pending = false; return super.commit(transactionId); } /** Rolls a transaction back and releases all read locks. */ ApiFuture rollback() { - pending = false; - RollbackRequest.Builder reqBuilder = RollbackRequest.newBuilder(); reqBuilder.setTransaction(transactionId); reqBuilder.setDatabase(firestore.getDatabaseName()); diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionOptions.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionOptions.java index 699a047ea..9b54f7ac8 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionOptions.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionOptions.java @@ -17,7 +17,6 @@ package com.google.cloud.firestore; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; import java.util.concurrent.Executor; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -29,12 +28,10 @@ public final class TransactionOptions { private final int numberOfAttempts; private final Executor executor; - private final ByteString previousTransactionId; - TransactionOptions(int maxAttempts, Executor executor, ByteString previousTransactionId) { + TransactionOptions(int maxAttempts, Executor executor) { this.numberOfAttempts = maxAttempts; this.executor = executor; - this.previousTransactionId = previousTransactionId; } public int getNumberOfAttempts() { @@ -46,11 +43,6 @@ public Executor getExecutor() { return executor; } - @Nullable - ByteString getPreviousTransactionId() { - return previousTransactionId; - } - /** * Create a default set of options suitable for most use cases. Transactions will be attempted 5 * times. @@ -59,7 +51,7 @@ ByteString getPreviousTransactionId() { */ @Nonnull public static TransactionOptions create() { - return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, null, null); + return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, null); } /** @@ -71,7 +63,7 @@ public static TransactionOptions create() { @Nonnull public static TransactionOptions create(int numberOfAttempts) { Preconditions.checkArgument(numberOfAttempts > 0, "You must allow at least one attempt"); - return new TransactionOptions(numberOfAttempts, null, null); + return new TransactionOptions(numberOfAttempts, null); } /** @@ -82,7 +74,7 @@ public static TransactionOptions create(int numberOfAttempts) { */ @Nonnull public static TransactionOptions create(@Nonnull Executor executor) { - return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, executor, null); + return new TransactionOptions(DEFAULT_NUM_ATTEMPTS, executor); } /** @@ -95,6 +87,6 @@ public static TransactionOptions create(@Nonnull Executor executor) { @Nonnull public static TransactionOptions create(@Nonnull Executor executor, int numberOfAttempts) { Preconditions.checkArgument(numberOfAttempts > 0, "You must allow at least one attempt"); - return new TransactionOptions(numberOfAttempts, executor, null); + return new TransactionOptions(numberOfAttempts, executor); } } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java new file mode 100644 index 000000000..a6724c654 --- /dev/null +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java @@ -0,0 +1,284 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore; + +import com.google.api.core.ApiAsyncFunction; +import com.google.api.core.ApiFunction; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.CurrentMillisClock; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.retrying.ExponentialRetryAlgorithm; +import com.google.api.gax.retrying.TimedAttemptSettings; +import com.google.api.gax.rpc.ApiException; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.Span; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Implements backoff and retry semantics for Firestore transactions. + * + *

A TransactionRunner is instantiated with a `userCallback`, a `userCallbackExecutor` and + * `numberOfAttempts`. Upon invoking {@link #run()}, the class invokes the provided callback on the + * specified executor at most `numberOfAttempts` times. {@link #run()} returns an ApiFuture that + * resolves when all retries complete. + * + *

TransactionRunner uses exponential backoff to increase the chance that retries succeed. To + * customize the backoff settings, you can specify custom settings via {@link FirestoreOptions}. + */ +class TransactionRunner { + + private static final Tracer tracer = Tracing.getTracer(); + private static final io.opencensus.trace.Status TOO_MANY_RETRIES_STATUS = + io.opencensus.trace.Status.ABORTED.withDescription("too many retries"); + private static final io.opencensus.trace.Status USER_CALLBACK_FAILED = + io.opencensus.trace.Status.ABORTED.withDescription("user callback failed"); + + private final Transaction.AsyncFunction userCallback; + private final Span span; + private final FirestoreImpl firestoreClient; + private final ScheduledExecutorService firestoreExecutor; + private final Executor userCallbackExecutor; + private final ExponentialRetryAlgorithm backoffAlgorithm; + private TimedAttemptSettings nextBackoffAttempt; + private Transaction transaction; + private int attemptsRemaining; + + /** + * @param firestore The active Firestore instance + * @param userCallback The user provided transaction callback + * @param userCallbackExecutor The executor to run the user callback on + * @param numberOfAttempts The total number of attempts for this transaction + */ + TransactionRunner( + FirestoreImpl firestore, + Transaction.AsyncFunction userCallback, + Executor userCallbackExecutor, + int numberOfAttempts) { + this.span = tracer.spanBuilder("CloudFirestore.Transaction").startSpan(); + this.firestoreClient = firestore; + this.firestoreExecutor = firestore.getClient().getExecutor(); + this.userCallback = userCallback; + this.attemptsRemaining = numberOfAttempts; + this.userCallbackExecutor = userCallbackExecutor; + this.backoffAlgorithm = + new ExponentialRetryAlgorithm( + firestore.getOptions().getRetrySettings(), CurrentMillisClock.getDefaultClock()); + this.nextBackoffAttempt = backoffAlgorithm.createFirstAttempt(); + } + + ApiFuture run() { + this.transaction = new Transaction(firestoreClient, this.transaction); + + --attemptsRemaining; + + span.addAnnotation( + "Start runTransaction", + ImmutableMap.of("attemptsRemaining", AttributeValue.longAttributeValue(attemptsRemaining))); + + final SettableApiFuture backoff = SettableApiFuture.create(); + + // Add a backoff delay. At first, this is 0. + this.firestoreExecutor.schedule( + new Runnable() { + @Override + public void run() { + backoff.set(null); + } + }, + nextBackoffAttempt.getRandomizedRetryDelay().toMillis(), + TimeUnit.MILLISECONDS); + + nextBackoffAttempt = backoffAlgorithm.createNextAttempt(nextBackoffAttempt); + + return ApiFutures.transformAsync( + backoff, new BackoffCallback(), MoreExecutors.directExecutor()); + } + + /** + * Invokes the user callback on the user callback executor and returns the user-provided result. + */ + private SettableApiFuture invokeUserCallback() { + final SettableApiFuture callbackResult = SettableApiFuture.create(); + userCallbackExecutor.execute( + new Runnable() { + @Override + public void run() { + ApiFutures.addCallback( + userCallback.updateCallback(transaction), + new ApiFutureCallback() { + @Override + public void onFailure(Throwable t) { + + callbackResult.setException(t); + } + + @Override + public void onSuccess(T result) { + callbackResult.set(result); + } + }, + MoreExecutors.directExecutor()); + } + }); + return callbackResult; + } + + /** A callback that invokes the BeginTransaction callback. */ + private class BackoffCallback implements ApiAsyncFunction { + @Override + public ApiFuture apply(Void input) { + return ApiFutures.transformAsync( + transaction.begin(), new BeginTransactionCallback(), MoreExecutors.directExecutor()); + } + } + + /** + * The callback for the BeginTransaction RPC, which invokes the user callback and handles all + * errors thereafter. + */ + private class BeginTransactionCallback implements ApiAsyncFunction { + public ApiFuture apply(Void ignored) { + return ApiFutures.catchingAsync( + ApiFutures.transformAsync( + invokeUserCallback(), new UserFunctionCallback(), MoreExecutors.directExecutor()), + Throwable.class, + new RestartTransactionCallback(), + MoreExecutors.directExecutor()); + } + } + + /** + * The callback that is invoked after the user function finishes execution. It invokes the Commit + * RPC. + */ + private class UserFunctionCallback implements ApiAsyncFunction { + @Override + public ApiFuture apply(T userFunctionResult) { + return ApiFutures.transform( + transaction.commit(), + new CommitTransactionCallback(userFunctionResult), + MoreExecutors.directExecutor()); + } + } + + /** The callback that is invoked after the Commit RPC returns. It returns the user result. */ + private class CommitTransactionCallback implements ApiFunction, T> { + private T userFunctionResult; + + CommitTransactionCallback(T userFunctionResult) { + this.userFunctionResult = userFunctionResult; + } + + @Override + public T apply(List input) { + span.setStatus(io.opencensus.trace.Status.OK); + span.end(); + return userFunctionResult; + } + } + + /** A callback that restarts a transaction after an ApiException. It invokes the Rollback RPC. */ + private class RestartTransactionCallback implements ApiAsyncFunction { + public ApiFuture apply(Throwable throwable) { + if (!(throwable instanceof ApiException)) { + // This is likely a failure in the user callback. + span.setStatus(USER_CALLBACK_FAILED); + return rollbackAndReject(throwable); + } + + ApiException apiException = (ApiException) throwable; + if (isRetryableTransactionError(apiException)) { + if (attemptsRemaining > 0) { + span.addAnnotation("retrying"); + return rollbackAndContinue(); + } else { + span.setStatus(TOO_MANY_RETRIES_STATUS); + final FirestoreException firestoreException = + FirestoreException.apiException( + apiException, "Transaction was cancelled because of too many retries."); + return rollbackAndReject(firestoreException); + } + } else { + span.setStatus(TraceUtil.statusFromApiException(apiException)); + final FirestoreException firestoreException = + FirestoreException.apiException( + apiException, "Transaction failed with non-retryable error"); + return rollbackAndReject(firestoreException); + } + } + + /** Determines whether the provided error is considered retryable. */ + private boolean isRetryableTransactionError(ApiException exception) { + switch (exception.getStatusCode().getCode()) { + // This list is based on + // https://github.com/firebase/firebase-js-sdk/blob/c822e78b00dd3420dcc749beb2f09a947aa4a344/packages/firestore/src/core/transaction_runner.ts#L112 + case ABORTED: + case CANCELLED: + case UNKNOWN: + case DEADLINE_EXCEEDED: + case INTERNAL: + case UNAVAILABLE: + case UNAUTHENTICATED: + case RESOURCE_EXHAUSTED: + return true; + default: + return false; + } + } + + /** Rolls the transaction back and attempts it again. */ + private ApiFuture rollbackAndContinue() { + return ApiFutures.transformAsync( + transaction.rollback(), + new ApiAsyncFunction() { + @Override + public ApiFuture apply(Void input) { + return run(); + } + }, + MoreExecutors.directExecutor()); + } + + /** Rolls the transaction back and returns the error. */ + private ApiFuture rollbackAndReject(final Throwable throwable) { + final SettableApiFuture failedTransaction = SettableApiFuture.create(); + // We use `addListener()` since we want to return the original exception regardless of whether + // rollback() succeeds. + transaction + .rollback() + .addListener( + new Runnable() { + @Override + public void run() { + failedTransaction.setException(throwable); + } + }, + MoreExecutors.directExecutor()); + span.end(); + return failedTransaction; + } + } +} diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java index f0783924e..fc07cc585 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java @@ -19,7 +19,6 @@ import com.google.api.core.CurrentMillisClock; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.retrying.ExponentialRetryAlgorithm; -import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiStreamObserver; @@ -49,7 +48,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; -import org.threeten.bp.Duration; /** * Watch provides listen functionality and exposes snapshot listeners. It can be used with any valid @@ -66,18 +64,6 @@ class Watch implements ApiStreamObserver { */ private static final int WATCH_TARGET_ID = 0x1; - private static RetrySettings RETRY_SETTINGS = - RetrySettings.newBuilder() - // The initial backoff time in seconds after an error. - // Set to 1s according to https://cloud.google.com/apis/design/errors. - .setInitialRetryDelay(Duration.ofSeconds(1)) - // The maximum backoff time in minutes. - .setMaxRetryDelay(Duration.ofMinutes(1)) - // The factor to increase the backup by after each failed attempt. - .setRetryDelayMultiplier(1.5) - .setJittered(true) - .build(); - private final FirestoreImpl firestore; private final ScheduledExecutorService firestoreExecutor; private final Query query; @@ -138,7 +124,8 @@ private Watch(FirestoreImpl firestore, Query query, Target target) { this.query = query; this.comparator = query.comparator(); this.backoff = - new ExponentialRetryAlgorithm(RETRY_SETTINGS, CurrentMillisClock.getDefaultClock()); + new ExponentialRetryAlgorithm( + firestore.getOptions().getRetrySettings(), CurrentMillisClock.getDefaultClock()); this.firestoreExecutor = firestore.getClient().getExecutor(); this.isActive = new AtomicBoolean(); this.nextAttempt = backoff.createFirstAttempt(); diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java index 41be685cf..148ce5585 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/LocalFirestoreHelper.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.cloud.Timestamp; import com.google.common.collect.ImmutableList; @@ -57,7 +58,6 @@ import com.google.type.LatLng; import java.io.IOException; import java.math.BigInteger; -import java.nio.charset.Charset; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -73,14 +73,23 @@ import javax.annotation.Nullable; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.threeten.bp.Duration; public final class LocalFirestoreHelper { + protected static RetrySettings IMMEDIATE_RETRY_SETTINGS = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ZERO) + .setMaxRetryDelay(Duration.ZERO) + .setRetryDelayMultiplier(1) + .setJittered(false) + .build(); + public static final String DATABASE_NAME; public static final String COLLECTION_ID; public static final String DOCUMENT_PATH; public static final String DOCUMENT_NAME; - public static final ByteString TRANSACTION_ID; + public static final String TRANSACTION_ID; public static final Map EMPTY_MAP_PROTO; @@ -285,12 +294,15 @@ public static BeginTransactionRequest begin() { return begin(null); } - public static BeginTransactionRequest begin(@Nullable ByteString previousTransactionId) { + public static BeginTransactionRequest begin(@Nullable String previousTransactionId) { BeginTransactionRequest.Builder begin = BeginTransactionRequest.newBuilder(); begin.setDatabase(DATABASE_NAME); if (previousTransactionId != null) { - begin.getOptionsBuilder().getReadWriteBuilder().setRetryTransaction(previousTransactionId); + begin + .getOptionsBuilder() + .getReadWriteBuilder() + .setRetryTransaction(ByteString.copyFromUtf8(previousTransactionId)); } return begin.build(); @@ -300,16 +312,20 @@ public static ApiFuture beginResponse() { return beginResponse(TRANSACTION_ID); } - public static ApiFuture beginResponse(ByteString transactionId) { + public static ApiFuture beginResponse(String transactionId) { BeginTransactionResponse.Builder beginResponse = BeginTransactionResponse.newBuilder(); - beginResponse.setTransaction(transactionId); + beginResponse.setTransaction(ByteString.copyFromUtf8(transactionId)); return ApiFutures.immediateFuture(beginResponse.build()); } public static RollbackRequest rollback() { + return rollback(TRANSACTION_ID); + } + + public static RollbackRequest rollback(String transactionId) { RollbackRequest.Builder rollback = RollbackRequest.newBuilder(); rollback.setDatabase(DATABASE_NAME); - rollback.setTransaction(TRANSACTION_ID); + rollback.setTransaction(ByteString.copyFromUtf8(transactionId)); return rollback.build(); } @@ -424,13 +440,13 @@ public static Write update( return write.build(); } - public static CommitRequest commit(@Nullable ByteString transactionId, Write... writes) { + public static CommitRequest commit(@Nullable String transactionId, Write... writes) { CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); commitRequest.setDatabase(DATABASE_NAME); commitRequest.addAllWrites(Arrays.asList(writes)); if (transactionId != null) { - commitRequest.setTransaction(transactionId); + commitRequest.setTransaction(ByteString.copyFromUtf8(transactionId)); } return commitRequest.build(); @@ -484,7 +500,7 @@ public static RunQueryRequest query(StructuredQuery... query) { } public static RunQueryRequest query( - @Nullable ByteString transactionId, boolean allDescendants, StructuredQuery... query) { + @Nullable String transactionId, boolean allDescendants, StructuredQuery... query) { RunQueryRequest.Builder request = RunQueryRequest.newBuilder(); request.setParent(LocalFirestoreHelper.DATABASE_NAME + "/documents"); StructuredQuery.Builder structuredQuery = request.getStructuredQueryBuilder(); @@ -512,7 +528,7 @@ public static RunQueryRequest query( } if (transactionId != null) { - request.setTransaction(transactionId); + request.setTransaction(ByteString.copyFromUtf8(transactionId)); } return request.build(); @@ -522,12 +538,12 @@ public static BatchGetDocumentsRequest get() { return getAll(null, DOCUMENT_NAME); } - public static BatchGetDocumentsRequest get(@Nullable ByteString transactionId) { + public static BatchGetDocumentsRequest get(@Nullable String transactionId) { return getAll(transactionId, DOCUMENT_NAME); } public static BatchGetDocumentsRequest getAll( - @Nullable ByteString transactionId, String... documentNames) { + @Nullable String transactionId, String... documentNames) { BatchGetDocumentsRequest.Builder request = BatchGetDocumentsRequest.newBuilder(); request.setDatabase(DATABASE_NAME); @@ -536,7 +552,7 @@ public static BatchGetDocumentsRequest getAll( } if (transactionId != null) { - request.setTransaction(transactionId); + request.setTransaction(ByteString.copyFromUtf8(transactionId)); } return request.build(); @@ -694,7 +710,7 @@ public boolean equals(Object o) { } static { - TRANSACTION_ID = ByteString.copyFrom("foo", Charset.defaultCharset()); + TRANSACTION_ID = "foo"; try { DATE = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S z").parse("1985-03-18 08:20:00.123 CET"); diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/TransactionTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/TransactionTest.java index 989d815e3..52b3150b8 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/TransactionTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/TransactionTest.java @@ -16,6 +16,7 @@ package com.google.cloud.firestore; +import static com.google.cloud.firestore.LocalFirestoreHelper.IMMEDIATE_RETRY_SETTINGS; import static com.google.cloud.firestore.LocalFirestoreHelper.SINGLE_FIELD_PROTO; import static com.google.cloud.firestore.LocalFirestoreHelper.TRANSACTION_ID; import static com.google.cloud.firestore.LocalFirestoreHelper.begin; @@ -42,19 +43,26 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.Timestamp; import com.google.cloud.firestore.spi.v1.FirestoreRpc; import com.google.firestore.v1.BatchGetDocumentsRequest; import com.google.firestore.v1.DocumentMask; import com.google.firestore.v1.Write; -import com.google.protobuf.ByteString; +import com.google.protobuf.GeneratedMessageV3; import com.google.protobuf.Message; +import io.grpc.Status; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -69,16 +77,26 @@ import org.mockito.Mockito; import org.mockito.Spy; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Stubber; @RunWith(MockitoJUnitRunner.class) public class TransactionTest { + private final ApiFuture RETRYABLE_API_EXCEPTION = + ApiFutures.immediateFailedFuture( + new ApiException( + new Exception("Test exception"), GrpcStatusCode.of(Status.Code.UNKNOWN), true)); + @Spy private FirestoreRpc firestoreRpc = Mockito.mock(FirestoreRpc.class); @Spy private FirestoreImpl firestoreMock = new FirestoreImpl( - FirestoreOptions.newBuilder().setProjectId("test-project").build(), firestoreRpc); + FirestoreOptions.newBuilder() + .setProjectId("test-project") + .setRetrySettings(IMMEDIATE_RETRY_SETTINGS) + .build(), + firestoreRpc); @Captor private ArgumentCaptor requestCapture; @Captor private ArgumentCaptor> streamObserverCapture; @@ -163,8 +181,6 @@ public ApiFuture updateCallback(Transaction transaction) { @Test public void canReturnNull() throws Exception { doReturn(beginResponse()) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo2"))) .doReturn(commitResponse(0, 0)) .when(firestoreMock) .sendRequest(requestCapture.capture(), Matchers.>any()); @@ -179,14 +195,12 @@ public String updateCallback(Transaction transaction) { }, options); - assertEquals(null, transaction.get()); + assertNull(transaction.get()); } @Test public void canReturnNullAsync() throws Exception { doReturn(beginResponse()) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo2"))) .doReturn(commitResponse(0, 0)) .when(firestoreMock) .sendRequest(requestCapture.capture(), Matchers.>any()); @@ -324,28 +338,40 @@ public ApiFuture updateCallback(Transaction transaction) { @Test public void limitsRetriesWithFailure() throws Exception { - doReturn(beginResponse(ByteString.copyFromUtf8("foo1"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo2"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo3"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo4"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo5"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .when(firestoreMock) - .sendRequest(requestCapture.capture(), Matchers.>any()); + RequestResponseMap requestResponseMap = + new RequestResponseMap() { + { + put(begin(), beginResponse("foo1")); + put(commit("foo1"), RETRYABLE_API_EXCEPTION); + put(rollback("foo1"), rollbackResponse()); + put(begin("foo1"), beginResponse("foo2")); + put(commit("foo2"), RETRYABLE_API_EXCEPTION); + put(rollback("foo2"), rollbackResponse()); + put(begin("foo2"), beginResponse("foo3")); + put(commit("foo3"), RETRYABLE_API_EXCEPTION); + put(rollback("foo3"), rollbackResponse()); + put(begin("foo3"), beginResponse("foo4")); + put(commit("foo4"), RETRYABLE_API_EXCEPTION); + put(rollback("foo4"), rollbackResponse()); + put(begin("foo4"), beginResponse("foo5")); + put(commit("foo5"), RETRYABLE_API_EXCEPTION); + put(rollback("foo5"), rollbackResponse()); + } + }; + + initializeStub(requestResponseMap); + + final AtomicInteger retryCount = new AtomicInteger(1); ApiFuture transaction = firestoreMock.runTransaction( new Transaction.Function() { @Override public String updateCallback(Transaction transaction) { - return "foo"; + return "foo" + retryCount.getAndIncrement(); } }, - options); + TransactionOptions.create(options.getExecutor(), 5)); try { transaction.get(); @@ -355,36 +381,40 @@ public String updateCallback(Transaction transaction) { } List requests = requestCapture.getAllValues(); - assertEquals(10, requests.size()); + assertEquals(requestResponseMap.size(), requests.size()); - assertEquals(begin(), requests.get(0)); - assertEquals(commit(ByteString.copyFromUtf8("foo1")), requests.get(1)); - assertEquals(begin(ByteString.copyFromUtf8("foo1")), requests.get(2)); - assertEquals(commit(ByteString.copyFromUtf8("foo2")), requests.get(3)); - assertEquals(begin(ByteString.copyFromUtf8("foo2")), requests.get(4)); - assertEquals(commit(ByteString.copyFromUtf8("foo3")), requests.get(5)); - assertEquals(begin(ByteString.copyFromUtf8("foo3")), requests.get(6)); - assertEquals(commit(ByteString.copyFromUtf8("foo4")), requests.get(7)); - assertEquals(begin(ByteString.copyFromUtf8("foo4")), requests.get(8)); - assertEquals(commit(ByteString.copyFromUtf8("foo5")), requests.get(9)); + int index = 0; + for (GeneratedMessageV3 request : requestResponseMap.keySet()) { + assertEquals(request, requests.get(index++)); + } } @Test public void limitsRetriesWithSuccess() throws Exception { - doReturn(beginResponse(ByteString.copyFromUtf8("foo1"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo2"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo3"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo4"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo5"))) - .doReturn(ApiFutures.immediateFailedFuture(new Exception())) - .doReturn(beginResponse(ByteString.copyFromUtf8("foo6"))) - .doReturn(commitResponse(0, 0)) - .when(firestoreMock) - .sendRequest(requestCapture.capture(), Matchers.>any()); + RequestResponseMap requestResponseMap = + new RequestResponseMap() { + { + put(begin(), beginResponse("foo1")); + put(commit("foo1"), RETRYABLE_API_EXCEPTION); + put(rollback("foo1"), rollbackResponse()); + put(begin("foo1"), beginResponse("foo2")); + put(commit("foo2"), RETRYABLE_API_EXCEPTION); + put(rollback("foo2"), rollbackResponse()); + put(begin("foo2"), beginResponse("foo3")); + put(commit("foo3"), RETRYABLE_API_EXCEPTION); + put(rollback("foo3"), rollbackResponse()); + put(begin("foo3"), beginResponse("foo4")); + put(commit("foo4"), RETRYABLE_API_EXCEPTION); + put(rollback("foo4"), rollbackResponse()); + put(begin("foo4"), beginResponse("foo5")); + put(commit("foo5"), RETRYABLE_API_EXCEPTION); + put(rollback("foo5"), rollbackResponse()); + put(begin("foo5"), beginResponse("foo6")); + put(commit("foo6"), commitResponse(0, 0)); + } + }; + + initializeStub(requestResponseMap); final AtomicInteger retryCount = new AtomicInteger(1); @@ -401,20 +431,109 @@ public String updateCallback(Transaction transaction) { assertEquals("foo6", transaction.get()); List requests = requestCapture.getAllValues(); - assertEquals(12, requests.size()); + assertEquals(requestResponseMap.size(), requests.size()); - assertEquals(begin(), requests.get(0)); - assertEquals(commit(ByteString.copyFromUtf8("foo1")), requests.get(1)); - assertEquals(begin(ByteString.copyFromUtf8("foo1")), requests.get(2)); - assertEquals(commit(ByteString.copyFromUtf8("foo2")), requests.get(3)); - assertEquals(begin(ByteString.copyFromUtf8("foo2")), requests.get(4)); - assertEquals(commit(ByteString.copyFromUtf8("foo3")), requests.get(5)); - assertEquals(begin(ByteString.copyFromUtf8("foo3")), requests.get(6)); - assertEquals(commit(ByteString.copyFromUtf8("foo4")), requests.get(7)); - assertEquals(begin(ByteString.copyFromUtf8("foo4")), requests.get(8)); - assertEquals(commit(ByteString.copyFromUtf8("foo5")), requests.get(9)); - assertEquals(begin(ByteString.copyFromUtf8("foo5")), requests.get(10)); - assertEquals(commit(ByteString.copyFromUtf8("foo6")), requests.get(11)); + int index = 0; + for (GeneratedMessageV3 request : requestResponseMap.keySet()) { + assertEquals(request, requests.get(index++)); + } + } + + @Test + public void retriesBasedOnErrorCode() throws Exception { + Map retryBehavior = + new HashMap() { + { + put(Status.Code.CANCELLED, true); + put(Status.Code.UNKNOWN, true); + put(Status.Code.INVALID_ARGUMENT, false); + put(Status.Code.DEADLINE_EXCEEDED, true); + put(Status.Code.NOT_FOUND, false); + put(Status.Code.ALREADY_EXISTS, false); + put(Status.Code.RESOURCE_EXHAUSTED, true); + put(Status.Code.FAILED_PRECONDITION, false); + put(Status.Code.ABORTED, true); + put(Status.Code.OUT_OF_RANGE, false); + put(Status.Code.UNIMPLEMENTED, false); + put(Status.Code.INTERNAL, true); + put(Status.Code.UNAVAILABLE, true); + put(Status.Code.DATA_LOSS, false); + put(Status.Code.UNAUTHENTICATED, true); + } + }; + + for (Map.Entry entry : retryBehavior.entrySet()) { + StatusCode code = GrpcStatusCode.of(entry.getKey()); + boolean shouldRetry = entry.getValue(); + + final ApiException apiException = + new ApiException(new Exception("Test Exception"), code, shouldRetry); + + if (shouldRetry) { + RequestResponseMap requestResponseMap = + new RequestResponseMap() { + { + put(begin(), beginResponse("foo1")); + put( + commit("foo1"), + ApiFutures.immediateFailedFuture(apiException)); + put(rollback("foo1"), rollbackResponse()); + put(begin("foo1"), beginResponse("foo2")); + put(commit("foo2"), commitResponse(0, 0)); + } + }; + initializeStub(requestResponseMap); + + final int[] attempts = new int[] {0}; + + ApiFuture transaction = + firestoreMock.runTransaction( + new Transaction.Function() { + @Override + public String updateCallback(Transaction transaction) { + ++attempts[0]; + return null; + } + }); + + transaction.get(); + + assertEquals(2, attempts[0]); + } else { + RequestResponseMap requestResponseMap = + new RequestResponseMap() { + { + put(begin(), beginResponse("foo1")); + put( + commit("foo1"), + ApiFutures.immediateFailedFuture(apiException)); + put(rollback("foo1"), rollbackResponse()); + } + }; + + initializeStub(requestResponseMap); + + final int[] attempts = new int[] {0}; + + ApiFuture transaction = + firestoreMock.runTransaction( + new Transaction.Function() { + @Override + public String updateCallback(Transaction transaction) { + ++attempts[0]; + return null; + } + }); + + try { + transaction.get(); + fail("Transaction should have failed"); + } catch (Exception ignored) { + } + + assertEquals(1, attempts[0]); + } + } } @Test @@ -747,4 +866,17 @@ public String updateCallback(Transaction transaction) { assertEquals(begin(), requests.get(0)); assertEquals(commit(TRANSACTION_ID, writes.toArray(new Write[] {})), requests.get(1)); } + + static class RequestResponseMap + extends LinkedHashMap> {} + + private void initializeStub(RequestResponseMap requestResponseMap) { + Stubber stubber = null; + for (ApiFuture response : requestResponseMap.values()) { + stubber = (stubber != null) ? stubber.doReturn(response) : doReturn(response); + } + stubber + .when(firestoreMock) + .sendRequest(requestCapture.capture(), Matchers.>any()); + } } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java index 2b71c28b0..6eeb1dbd8 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/WatchTest.java @@ -17,6 +17,7 @@ package com.google.cloud.firestore; import static com.google.cloud.firestore.LocalFirestoreHelper.DATABASE_NAME; +import static com.google.cloud.firestore.LocalFirestoreHelper.IMMEDIATE_RETRY_SETTINGS; import static com.google.cloud.firestore.LocalFirestoreHelper.SINGLE_FIELD_MAP; import static com.google.cloud.firestore.LocalFirestoreHelper.SINGLE_FIELD_PROTO; import static com.google.cloud.firestore.LocalFirestoreHelper.UPDATED_FIELD_MAP; @@ -97,7 +98,11 @@ public class WatchTest { @Spy private FirestoreImpl firestoreMock = new FirestoreImpl( - FirestoreOptions.newBuilder().setProjectId("test-project").build(), firestoreRpc); + FirestoreOptions.newBuilder() + .setProjectId("test-project") + .setRetrySettings(IMMEDIATE_RETRY_SETTINGS) + .build(), + firestoreRpc); @Captor private ArgumentCaptor> streamObserverCapture; diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java index 0565d05e3..a0379f061 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java @@ -544,6 +544,34 @@ public String updateCallback(Transaction transaction) { } } + @Test + public void doesNotRetryTransactionsWithFailedPreconditions() { + final DocumentReference documentReference = randomColl.document(); + + final AtomicInteger attempts = new AtomicInteger(); + + ApiFuture firstTransaction = + firestore.runTransaction( + new Transaction.Function() { + @Override + public Void updateCallback(Transaction transaction) { + attempts.incrementAndGet(); + transaction.update(documentReference, "foo", "bar"); + return null; + } + }); + + try { + firstTransaction.get(); + fail("ApiFuture should fail with ExecutionException"); + } catch (InterruptedException e) { + fail("ApiFuture should fail with ExecutionException"); + } catch (ExecutionException e) { + assertEquals(1, attempts.intValue()); + assertEquals(404, ((FirestoreException) e.getCause()).getCode()); + } + } + @Test public void successfulTransactionWithContention() throws Exception { final DocumentReference documentReference = addDocument("counter", 1);