From 146b21dd6d5772bfd9e023dbf5a1147b29076cdd Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Fri, 2 Jul 2021 17:23:41 -0700 Subject: [PATCH] fix: lower batch size on BulkWriter retry (#688) --- .../cloud/firestore/BulkCommitBatch.java | 15 +++++++- .../google/cloud/firestore/BulkWriter.java | 23 ++++++++++-- .../cloud/firestore/BulkWriterTest.java | 37 +++++++++++++++++++ 3 files changed, 71 insertions(+), 4 deletions(-) diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java index 5a368fd77..7d6dd81c6 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java @@ -42,10 +42,23 @@ class BulkCommitBatch extends UpdateBuilder> { final List pendingOperations = new ArrayList<>(); private final Set documents = new CopyOnWriteArraySet<>(); private final Executor executor; + private int maxBatchSize; - BulkCommitBatch(FirestoreImpl firestore, Executor executor) { + BulkCommitBatch(FirestoreImpl firestore, Executor executor, int maxBatchSize) { super(firestore); this.executor = executor; + this.maxBatchSize = maxBatchSize; + } + + int getMaxBatchSize() { + return maxBatchSize; + } + + void setMaxBatchSize(int size) { + Preconditions.checkState( + getMutationsSize() <= size, + "New batch size cannot be less than the number of enqueued writes"); + this.maxBatchSize = size; } ApiFuture wrapResult(int writeIndex) { diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java index b927dd6a5..365fa6c3e 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java @@ -29,6 +29,7 @@ import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.firestore.v1.FirestoreSettings; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.List; @@ -82,6 +83,9 @@ enum OperationType { /** The maximum number of writes that can be in a single batch. */ public static final int MAX_BATCH_SIZE = 20; + /** The maximum number of writes that can be in a batch containing retries. */ + public static final int RETRY_MAX_BATCH_SIZE = 10; + /** * The maximum number of retries that will be attempted with backoff before stopping all retry * attempts. @@ -237,7 +241,7 @@ public boolean onError(BulkWriterException error) { : Executors.newSingleThreadScheduledExecutor(); this.successExecutor = MoreExecutors.directExecutor(); this.errorExecutor = MoreExecutors.directExecutor(); - this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor); + this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize); if (!options.getThrottlingEnabled()) { this.rateLimiter = @@ -962,7 +966,7 @@ private void scheduleCurrentBatchLocked(final boolean flush) { if (bulkCommitBatch.getMutationsSize() == 0) return; final BulkCommitBatch pendingBatch = bulkCommitBatch; - bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor); + bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize); // Use the write with the longest backoff duration when determining backoff. int highestBackoffDuration = 0; @@ -1025,7 +1029,10 @@ public void run() { @VisibleForTesting void setMaxBatchSize(int size) { + Preconditions.checkState( + bulkCommitBatch.getMutationsSize() == 0, "BulkCommitBatch should be empty"); maxBatchSize = size; + bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, size); } @VisibleForTesting @@ -1050,6 +1057,16 @@ void setMaxPendingOpCount(int newMax) { private void sendOperationLocked( ApiFunction> enqueueOperationOnBatchCallback, final BulkWriterOperation op) { + // A backoff duration greater than 0 implies that this batch is a retry. + // Retried writes are sent with a batch size of 10 in order to guarantee + // that the batch is under the 10MiB limit. + if (op.getBackoffDuration() > 0) { + if (bulkCommitBatch.getMutationsSize() >= RETRY_MAX_BATCH_SIZE) { + scheduleCurrentBatchLocked(/* flush= */ false); + } + bulkCommitBatch.setMaxBatchSize(RETRY_MAX_BATCH_SIZE); + } + if (bulkCommitBatch.has(op.getDocumentReference())) { // Create a new batch since the backend doesn't support batches with two writes to the same // document. @@ -1062,7 +1079,7 @@ private void sendOperationLocked( bulkCommitBatch.enqueueOperation(op); enqueueOperationOnBatchCallback.apply(bulkCommitBatch); - if (bulkCommitBatch.getMutationsSize() == maxBatchSize) { + if (bulkCommitBatch.getMutationsSize() == bulkCommitBatch.getMaxBatchSize()) { scheduleCurrentBatchLocked(/* flush= */ false); } } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java index 5df76ff61..e3ae68635 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java @@ -44,6 +44,7 @@ import com.google.firestore.v1.BatchWriteRequest; import com.google.firestore.v1.BatchWriteResponse; import com.google.firestore.v1.Value; +import com.google.firestore.v1.Write; import com.google.protobuf.GeneratedMessageV3; import com.google.rpc.Code; import io.grpc.Status; @@ -727,6 +728,42 @@ public boolean onError(BulkWriterException error) { assertEquals(Timestamp.ofTimeSecondsAndNanos(1, 0), result1.get().getUpdateTime()); } + @Test + public void retriesWithSmallerBatchSize() throws Exception { + + final List writes = new ArrayList<>(); + final List> successResponses = new ArrayList<>(); + final List> failedResponses = new ArrayList<>(); + for (int i = 0; i < 15; i++) { + writes.add(set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc" + i)); + failedResponses.add(failedResponse(Code.ABORTED_VALUE)); + successResponses.add(successResponse(1)); + } + + ResponseStubber responseStubber = + new ResponseStubber() { + { + put( + batchWrite(writes.toArray(new Write[0])), + mergeResponses(failedResponses.toArray(new ApiFuture[0]))); + put( + batchWrite( + writes.subList(0, BulkWriter.RETRY_MAX_BATCH_SIZE).toArray(new Write[0])), + mergeResponses(successResponses.subList(0, 10).toArray(new ApiFuture[0]))); + put( + batchWrite( + writes.subList(BulkWriter.RETRY_MAX_BATCH_SIZE, 15).toArray(new Write[0])), + mergeResponses(successResponses.subList(10, 15).toArray(new ApiFuture[0]))); + } + }; + responseStubber.initializeStub(batchWriteCapture, firestoreMock); + + for (int i = 0; i < 15; i++) { + bulkWriter.set(firestoreMock.document("coll/doc" + i), LocalFirestoreHelper.SINGLE_FIELD_MAP); + } + bulkWriter.close(); + } + @Test public void retryResolvesBeforeFlush() throws Exception { ResponseStubber responseStubber =