diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java index b6801b93c..5a368fd77 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java @@ -39,7 +39,7 @@ /** Used to represent a batch that contains scheduled BulkWriterOperations. */ class BulkCommitBatch extends UpdateBuilder> { - private final List pendingOperations = new ArrayList<>(); + final List pendingOperations = new ArrayList<>(); private final Set documents = new CopyOnWriteArraySet<>(); private final Executor executor; diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java index 9779aa3d2..6f6bf755b 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java @@ -16,6 +16,8 @@ package com.google.cloud.firestore; +import static com.google.cloud.firestore.BulkWriterOperation.DEFAULT_BACKOFF_MAX_DELAY_MS; + import com.google.api.core.ApiAsyncFunction; import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; @@ -108,6 +110,12 @@ enum OperationType { */ private static final int RATE_LIMITER_MULTIPLIER_MILLIS = 5 * 60 * 1000; + /** + * The default jitter to apply to the exponential backoff used in retries. For example, a factor + * of 0.3 means a 30% jitter is applied. + */ + static final double DEFAULT_JITTER_FACTOR = 0.3; + private static final WriteResultCallback DEFAULT_SUCCESS_LISTENER = new WriteResultCallback() { public void onResult(DocumentReference documentReference, WriteResult result) {} @@ -681,7 +689,7 @@ public ApiFuture flush() { private ApiFuture flushLocked() { verifyNotClosedLocked(); - sendCurrentBatchLocked(/* flush= */ true); + scheduleCurrentBatchLocked(/* flush= */ true); return lastOperation; } @@ -846,37 +854,61 @@ public void addWriteErrorListener(@Nonnull Executor executor, WriteErrorCallback * This allows retries to resolve as part of a {@link BulkWriter#flush()} or {@link * BulkWriter#close()} call. */ - private void sendCurrentBatchLocked(final boolean flush) { + private void scheduleCurrentBatchLocked(final boolean flush) { if (bulkCommitBatch.getMutationsSize() == 0) return; - BulkCommitBatch pendingBatch = bulkCommitBatch; + + final BulkCommitBatch pendingBatch = bulkCommitBatch; bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor); - // Send the batch if it is under the rate limit, or schedule another attempt after the + // Use the write with the longest backoff duration when determining backoff. + int highestBackoffDuration = 0; + for (BulkWriterOperation op : pendingBatch.pendingOperations) { + if (op.getBackoffDuration() > highestBackoffDuration) { + highestBackoffDuration = op.getBackoffDuration(); + } + } + final int backoffMsWithJitter = applyJitter(highestBackoffDuration); + + bulkWriterExecutor.schedule( + new Runnable() { + @Override + public void run() { + synchronized (lock) { + sendBatchLocked(pendingBatch, flush); + } + } + }, + backoffMsWithJitter, + TimeUnit.MILLISECONDS); + } + + /** Sends the provided batch once the rate limiter does not require any delay. */ + private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) { + // Send the batch if it is does not require any delay, or schedule another attempt after the // appropriate timeout. - boolean underRateLimit = rateLimiter.tryMakeRequest(pendingBatch.getMutationsSize()); + boolean underRateLimit = rateLimiter.tryMakeRequest(batch.getMutationsSize()); if (underRateLimit) { - pendingBatch + batch .bulkCommit() .addListener( new Runnable() { @Override public void run() { synchronized (lock) { - sendCurrentBatchLocked(flush); + scheduleCurrentBatchLocked(flush); } } }, bulkWriterExecutor); - } else { - long delayMs = rateLimiter.getNextRequestDelayMs(pendingBatch.getMutationsSize()); + long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize()); logger.log(Level.FINE, String.format("Backing off for %d seconds", delayMs / 1000)); bulkWriterExecutor.schedule( new Runnable() { @Override public void run() { synchronized (lock) { - sendCurrentBatchLocked(flush); + sendBatchLocked(batch, flush); } } }, @@ -905,7 +937,7 @@ private void sendOperationLocked( if (bulkCommitBatch.has(op.getDocumentReference())) { // Create a new batch since the backend doesn't support batches with two writes to the same // document. - sendCurrentBatchLocked(/* flush= */ false); + scheduleCurrentBatchLocked(/* flush= */ false); } // Run the operation on the current batch and advance the `lastOperation` pointer. This @@ -926,7 +958,7 @@ public ApiFuture apply(Void aVoid) { MoreExecutors.directExecutor()); if (bulkCommitBatch.getMutationsSize() == maxBatchSize) { - sendCurrentBatchLocked(/* flush= */ false); + scheduleCurrentBatchLocked(/* flush= */ false); } } @@ -989,4 +1021,11 @@ public void onSuccess(T writeResult) { MoreExecutors.directExecutor()); return flushCallback; } + + private int applyJitter(int backoffMs) { + if (backoffMs == 0) return 0; + // Random value in [-0.3, 0.3]. + double jitter = DEFAULT_JITTER_FACTOR * (Math.random() * 2 - 1); + return (int) Math.min(DEFAULT_BACKOFF_MAX_DELAY_MS, backoffMs + jitter * backoffMs); + } } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOperation.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOperation.java index 3642d8751..c9b0b872f 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOperation.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOperation.java @@ -22,6 +22,7 @@ import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Status; /** * Represents a single write for BulkWriter, encapsulating operation dispatch and error handling. @@ -34,7 +35,22 @@ class BulkWriterOperation { private final ApiFunction> successListener; private final ApiFunction> errorListener; + /** + * The default initial backoff time in milliseconds after an error. Set to 1s according to + * https://cloud.google.com/apis/design/errors. + */ + public static final int DEFAULT_BACKOFF_INITIAL_DELAY_MS = 1000; + + /** The default maximum backoff time in milliseconds when retrying an operation. */ + public static final int DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000; + + /** The default factor to increase the backup by after each failed attempt. */ + public static final double DEFAULT_BACKOFF_FACTOR = 1.5; + private int failedAttempts = 0; + private Status lastStatus; + + private int backoffDuration = 0; /** * @param documentReference The document reference being written to. @@ -68,6 +84,10 @@ public DocumentReference getDocumentReference() { return documentReference; } + public int getBackoffDuration() { + return backoffDuration; + } + /** Callback invoked when an operation attempt fails. */ public ApiFuture onException(FirestoreException exception) { ++failedAttempts; @@ -94,6 +114,8 @@ public void onFailure(Throwable throwable) { @Override public void onSuccess(Boolean shouldRetry) { if (shouldRetry) { + lastStatus = bulkWriterException.getStatus(); + updateBackoffDuration(); scheduleWriteCallback.apply(BulkWriterOperation.this); } else { operationFuture.setException(bulkWriterException); @@ -106,6 +128,16 @@ public void onSuccess(Boolean shouldRetry) { return callbackFuture; } + private void updateBackoffDuration() { + if (lastStatus == Status.RESOURCE_EXHAUSTED) { + backoffDuration = DEFAULT_BACKOFF_MAX_DELAY_MS; + } else if (backoffDuration == 0) { + backoffDuration = DEFAULT_BACKOFF_INITIAL_DELAY_MS; + } else { + backoffDuration *= DEFAULT_BACKOFF_FACTOR; + } + } + /** Callback invoked when the operation succeeds. */ public ApiFuture onSuccess(final WriteResult result) { final SettableApiFuture callbackFuture = SettableApiFuture.create(); diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java index 607c6a902..289952238 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java @@ -91,6 +91,13 @@ public class BulkWriterTest { GrpcStatusCode.of(Status.Code.ABORTED), true)); + private static final ApiFuture RESOURCE_EXHAUSTED_FAILED_FUTURE = + ApiFutures.immediateFailedFuture( + new ApiException( + new IllegalStateException("Mock batchWrite failed in test"), + GrpcStatusCode.of(Status.Code.RESOURCE_EXHAUSTED), + true)); + @Rule public Timeout timeout = new Timeout(1, TimeUnit.SECONDS); @Spy private final FirestoreRpc firestoreRpc = Mockito.mock(FirestoreRpc.class); @@ -151,7 +158,18 @@ private ApiFuture mergeResponses(ApiFuture schedule(Runnable command, long delay, TimeUnit unit) { + return super.schedule(command, 0, TimeUnit.MILLISECONDS); + } + }; + + bulkWriter = + firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build()); doc1 = firestoreMock.document("coll/doc1"); doc2 = firestoreMock.document("coll/doc2"); } @@ -1074,6 +1092,26 @@ public void retriesWritesWhenBatchWriteFailsWithRetryableError() throws Exceptio @Test public void failsWritesAfterAllRetryAttemptsFail() throws Exception { final int[] retryAttempts = {0}; + final int[] scheduleWithDelayCount = {0}; + final ScheduledExecutorService timeoutExecutor = + new ScheduledThreadPoolExecutor(1) { + @Override + @Nonnull + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + if (delay > 0) { + int expected = + (int) + (BulkWriterOperation.DEFAULT_BACKOFF_INITIAL_DELAY_MS + * Math.pow(1.5, retryAttempts[0] - 1)); + + assertTrue(delay >= (1 - BulkWriter.DEFAULT_JITTER_FACTOR) * expected); + assertTrue(delay <= (1 + BulkWriter.DEFAULT_JITTER_FACTOR) * expected); + scheduleWithDelayCount[0]++; + } + return super.schedule(command, 0, TimeUnit.MILLISECONDS); + } + }; + doAnswer( new Answer>() { public ApiFuture answer(InvocationOnMock mock) { @@ -1085,6 +1123,9 @@ public ApiFuture answer(InvocationOnMock mock) { .sendRequest( batchWriteCapture.capture(), Matchers.>any()); + + bulkWriter = + firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build()); ApiFuture result = bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP); bulkWriter.flush().get(); @@ -1093,10 +1134,165 @@ public ApiFuture answer(InvocationOnMock mock) { Assert.fail("Expected set() operation to fail"); } catch (Exception e) { assertTrue(e.getMessage().contains("Mock batchWrite failed in test")); - assertEquals(retryAttempts[0], BulkWriter.MAX_RETRY_ATTEMPTS + 1); + assertEquals(BulkWriter.MAX_RETRY_ATTEMPTS + 1, retryAttempts[0]); + // The first attempt should not have a delay. + assertEquals(BulkWriter.MAX_RETRY_ATTEMPTS, scheduleWithDelayCount[0]); + } + } + + @Test + public void appliesMaxBackoffOnRetriesForResourceExhausted() throws Exception { + final int[] retryAttempts = {0}; + final int[] scheduleWithDelayCount = {0}; + final ScheduledExecutorService timeoutExecutor = + new ScheduledThreadPoolExecutor(1) { + @Override + @Nonnull + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + if (delay > 0) { + assertTrue( + delay + >= (1 - BulkWriter.DEFAULT_JITTER_FACTOR) + * BulkWriterOperation.DEFAULT_BACKOFF_MAX_DELAY_MS); + assertTrue( + delay + <= (1 + BulkWriter.DEFAULT_JITTER_FACTOR) + * BulkWriterOperation.DEFAULT_BACKOFF_MAX_DELAY_MS); + scheduleWithDelayCount[0]++; + } + return super.schedule(command, 0, TimeUnit.MILLISECONDS); + } + }; + + doAnswer( + new Answer>() { + public ApiFuture answer(InvocationOnMock mock) { + retryAttempts[0]++; + return RESOURCE_EXHAUSTED_FAILED_FUTURE; + } + }) + .when(firestoreMock) + .sendRequest( + batchWriteCapture.capture(), + Matchers.>any()); + + bulkWriter = + firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build()); + bulkWriter.addWriteErrorListener( + new WriteErrorCallback() { + public boolean onError(BulkWriterException error) { + return error.getFailedAttempts() < 5; + } + }); + + ApiFuture result = bulkWriter.create(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP); + bulkWriter.flush().get(); + + try { + result.get(); + Assert.fail("Expected create() operation to fail"); + } catch (Exception e) { + assertTrue(e.getMessage().contains("Mock batchWrite failed in test")); + assertEquals(5, retryAttempts[0]); + // The first attempt should not have a delay. + assertEquals(4, scheduleWithDelayCount[0]); } } + @Test + public void usesHighestBackoffFoundInBatch() throws Exception { + final int[] expected = { + BulkWriterOperation.DEFAULT_BACKOFF_MAX_DELAY_MS, + (int) + (BulkWriterOperation.DEFAULT_BACKOFF_INITIAL_DELAY_MS + * BulkWriterOperation.DEFAULT_BACKOFF_FACTOR) + }; + final int[] retryAttempts = {0}; + final ScheduledExecutorService timeoutExecutor = + new ScheduledThreadPoolExecutor(1) { + @Override + @Nonnull + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + if (delay > 0) { + // The 1st batch should have max backoff. 2nd batch should have 1 round of backoff + // applied. + assertTrue( + delay >= (1 - BulkWriter.DEFAULT_JITTER_FACTOR) * expected[retryAttempts[0]]); + assertTrue( + delay <= (1 + BulkWriter.DEFAULT_JITTER_FACTOR) * expected[retryAttempts[0]]); + retryAttempts[0]++; + } + return super.schedule(command, 0, TimeUnit.MILLISECONDS); + } + }; + + ResponseStubber responseStubber = + new ResponseStubber() { + { + put( + batchWrite( + create(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc1"), + set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc2")), + mergeResponses( + failedResponse(Code.RESOURCE_EXHAUSTED_VALUE), + failedResponse(Code.UNAVAILABLE_VALUE))); + put( + batchWrite( + create(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc1"), + set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc2")), + mergeResponses(successResponse(1), failedResponse(Code.UNAVAILABLE_VALUE))); + put( + batchWrite(set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc2")), + successResponse(2)); + } + }; + responseStubber.initializeStub(batchWriteCapture, firestoreMock); + + bulkWriter = + firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build()); + bulkWriter.addWriteErrorListener( + new WriteErrorCallback() { + public boolean onError(BulkWriterException error) { + return error.getFailedAttempts() < 5; + } + }); + + bulkWriter.create(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP); + bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP); + bulkWriter.close(); + assertEquals(2, retryAttempts[0]); + } + + @Test + public void sendsBackoffBatchAfterOtherEnqueuedBatches() throws Exception { + ResponseStubber responseStubber = + new ResponseStubber() { + { + put( + batchWrite(create(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc1")), + failedResponse(Code.RESOURCE_EXHAUSTED_VALUE)); + put( + batchWrite(set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc2")), + successResponse(0)); + put( + batchWrite(create(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc1")), + successResponse(0)); + } + }; + responseStubber.initializeStub(batchWriteCapture, firestoreMock); + + bulkWriter.addWriteErrorListener( + new WriteErrorCallback() { + public boolean onError(BulkWriterException error) { + return error.getFailedAttempts() < 5; + } + }); + bulkWriter.create(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP); + bulkWriter.flush(); + bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP); + bulkWriter.close(); + } + @Test public void optionsRequiresPositiveInteger() throws Exception { try {