diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java index b6801b93c..5a368fd77 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java @@ -39,7 +39,7 @@ /** Used to represent a batch that contains scheduled BulkWriterOperations. */ class BulkCommitBatch extends UpdateBuilder> { - private final List pendingOperations = new ArrayList<>(); + final List pendingOperations = new ArrayList<>(); private final Set documents = new CopyOnWriteArraySet<>(); private final Executor executor; diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java index 9779aa3d2..cae35b276 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java @@ -16,6 +16,8 @@ package com.google.cloud.firestore; +import static com.google.cloud.firestore.BulkWriterOperation.DEFAULT_BACKOFF_MAX_DELAY_MS; + import com.google.api.core.ApiAsyncFunction; import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; @@ -108,6 +110,12 @@ enum OperationType { */ private static final int RATE_LIMITER_MULTIPLIER_MILLIS = 5 * 60 * 1000; + /** + * The default jitter to apply to the exponential backoff used in retries. For example, a factor + * of 0.3 means a 30% jitter is applied. + */ + private static final double DEFAULT_JITTER_FACTOR = 0.3; + private static final WriteResultCallback DEFAULT_SUCCESS_LISTENER = new WriteResultCallback() { public void onResult(DocumentReference documentReference, WriteResult result) {} @@ -681,7 +689,7 @@ public ApiFuture flush() { private ApiFuture flushLocked() { verifyNotClosedLocked(); - sendCurrentBatchLocked(/* flush= */ true); + scheduleCurrentBatchLocked(/* flush= */ true); return lastOperation; } @@ -846,37 +854,62 @@ public void addWriteErrorListener(@Nonnull Executor executor, WriteErrorCallback * This allows retries to resolve as part of a {@link BulkWriter#flush()} or {@link * BulkWriter#close()} call. */ - private void sendCurrentBatchLocked(final boolean flush) { + private void scheduleCurrentBatchLocked(final boolean flush) { if (bulkCommitBatch.getMutationsSize() == 0) return; - BulkCommitBatch pendingBatch = bulkCommitBatch; + + final BulkCommitBatch pendingBatch = bulkCommitBatch; bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor); - // Send the batch if it is under the rate limit, or schedule another attempt after the + // Use the write with the longest backoff duration when determining backoff. + int highestBackoffDuration = 0; + for (BulkWriterOperation op : pendingBatch.pendingOperations) { + if (op.getBackoffDuration() > highestBackoffDuration) { + highestBackoffDuration = op.getBackoffDuration(); + } + } + int backoffMsWithJitter = applyJitter(highestBackoffDuration); + + bulkWriterExecutor.schedule( + new Runnable() { + @Override + public void run() { + synchronized (lock) { + sendBatchLocked(pendingBatch, flush); + } + } + }, + backoffMsWithJitter, + TimeUnit.MILLISECONDS); + } + + /** Sends the provided batch once the rate limiter does not require any delay. */ + private void sendBatchLocked(BulkCommitBatch batch, final boolean flush) { + // Send the batch if it is does not require any delay, or schedule another attempt after the // appropriate timeout. - boolean underRateLimit = rateLimiter.tryMakeRequest(pendingBatch.getMutationsSize()); + boolean underRateLimit = rateLimiter.tryMakeRequest(batch.getMutationsSize()); if (underRateLimit) { - pendingBatch + batch .bulkCommit() .addListener( new Runnable() { @Override public void run() { synchronized (lock) { - sendCurrentBatchLocked(flush); + scheduleCurrentBatchLocked(flush); } } }, bulkWriterExecutor); - } else { - long delayMs = rateLimiter.getNextRequestDelayMs(pendingBatch.getMutationsSize()); + long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize()); + System.out.println("over rate" + delayMs); logger.log(Level.FINE, String.format("Backing off for %d seconds", delayMs / 1000)); bulkWriterExecutor.schedule( new Runnable() { @Override public void run() { synchronized (lock) { - sendCurrentBatchLocked(flush); + sendBatchLocked(bulkCommitBatch, flush); } } }, @@ -905,7 +938,7 @@ private void sendOperationLocked( if (bulkCommitBatch.has(op.getDocumentReference())) { // Create a new batch since the backend doesn't support batches with two writes to the same // document. - sendCurrentBatchLocked(/* flush= */ false); + scheduleCurrentBatchLocked(/* flush= */ false); } // Run the operation on the current batch and advance the `lastOperation` pointer. This @@ -926,7 +959,7 @@ public ApiFuture apply(Void aVoid) { MoreExecutors.directExecutor()); if (bulkCommitBatch.getMutationsSize() == maxBatchSize) { - sendCurrentBatchLocked(/* flush= */ false); + scheduleCurrentBatchLocked(/* flush= */ false); } } @@ -989,4 +1022,11 @@ public void onSuccess(T writeResult) { MoreExecutors.directExecutor()); return flushCallback; } + + private int applyJitter(int backoffMs) { + if (backoffMs == 0) return 0; + // Random value in [-0.3, 0.3]. + double jitter = DEFAULT_JITTER_FACTOR * (Math.random() * 2 - 1); + return (int) Math.min(DEFAULT_BACKOFF_MAX_DELAY_MS, backoffMs + jitter * backoffMs); + } } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOperation.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOperation.java index 3642d8751..7d3a45103 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOperation.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOperation.java @@ -22,6 +22,7 @@ import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Status; /** * Represents a single write for BulkWriter, encapsulating operation dispatch and error handling. @@ -34,7 +35,22 @@ class BulkWriterOperation { private final ApiFunction> successListener; private final ApiFunction> errorListener; + /** + * The default initial backoff time in milliseconds after an error. Set to 1s according to + * https://cloud.google.com/apis/design/errors. + */ + private static final int DEFAULT_BACKOFF_INITIAL_DELAY_MS = 1000; + + /** The default maximum backoff time in milliseconds when retrying an operation. */ + public static final int DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000; + + /** The default factor to increase the backup by after each failed attempt. */ + private static final double DEFAULT_BACKOFF_FACTOR = 1.5; + private int failedAttempts = 0; + private Status lastStatus; + + private int backoffDuration = 0; /** * @param documentReference The document reference being written to. @@ -68,6 +84,10 @@ public DocumentReference getDocumentReference() { return documentReference; } + public int getBackoffDuration() { + return backoffDuration; + } + /** Callback invoked when an operation attempt fails. */ public ApiFuture onException(FirestoreException exception) { ++failedAttempts; @@ -94,6 +114,8 @@ public void onFailure(Throwable throwable) { @Override public void onSuccess(Boolean shouldRetry) { if (shouldRetry) { + lastStatus = bulkWriterException.getStatus(); + updateBackoffDuration(); scheduleWriteCallback.apply(BulkWriterOperation.this); } else { operationFuture.setException(bulkWriterException); @@ -106,6 +128,16 @@ public void onSuccess(Boolean shouldRetry) { return callbackFuture; } + private void updateBackoffDuration() { + if (lastStatus == Status.RESOURCE_EXHAUSTED) { + backoffDuration = DEFAULT_BACKOFF_MAX_DELAY_MS; + } else if (backoffDuration == 0) { + backoffDuration = DEFAULT_BACKOFF_INITIAL_DELAY_MS; + } else { + backoffDuration *= DEFAULT_BACKOFF_FACTOR; + } + } + /** Callback invoked when the operation succeeds. */ public ApiFuture onSuccess(final WriteResult result) { final SettableApiFuture callbackFuture = SettableApiFuture.create(); diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RateLimiter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RateLimiter.java index d8ad3c674..3caca7fbf 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RateLimiter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RateLimiter.java @@ -93,6 +93,7 @@ public boolean tryMakeRequest(int numOperations) { */ public boolean tryMakeRequest(int numOperations, long requestTimeMillis) { refillTokens(requestTimeMillis); + System.out.println("numOperations: " + numOperations + ", availableTokens: " + availableTokens); if (numOperations <= availableTokens) { availableTokens -= numOperations; return true; @@ -143,6 +144,8 @@ private void refillTokens(long requestTimeMillis) { if (tokensToAdd > 0) { availableTokens = Math.min(capacity, availableTokens + tokensToAdd); lastRefillTimeMillis = requestTimeMillis; + } else { + System.out.println("elapsed: " + elapsedTime + ", lastRefillMillis: " + lastRefillTimeMillis); } } @@ -153,6 +156,7 @@ public int calculateCapacity(long requestTimeMillis) { (int) (Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity), maximumRate); + System.out.println("capacity: " + operationsPerSecond); return operationsPerSecond; } } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java index 607c6a902..ed1ebd86b 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java @@ -61,9 +61,7 @@ import javax.annotation.Nonnull; import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; @@ -91,7 +89,7 @@ public class BulkWriterTest { GrpcStatusCode.of(Status.Code.ABORTED), true)); - @Rule public Timeout timeout = new Timeout(1, TimeUnit.SECONDS); + // @Rule public Timeout timeout = new Timeout(1, TimeUnit.SECONDS); @Spy private final FirestoreRpc firestoreRpc = Mockito.mock(FirestoreRpc.class); @@ -151,7 +149,18 @@ private ApiFuture mergeResponses(ApiFuture schedule(Runnable command, long delay, TimeUnit unit) { + return super.schedule(command, 0, TimeUnit.MILLISECONDS); + } + }; + + bulkWriter = + firestoreMock.bulkWriter(BulkWriterOptions.builder().setExecutor(timeoutExecutor).build()); doc1 = firestoreMock.document("coll/doc1"); doc2 = firestoreMock.document("coll/doc2"); }