From a7caff2c15ad8ad4e98165bf4029d9615c079637 Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Thu, 29 Apr 2021 10:19:45 -0500 Subject: [PATCH] feat: add buffering layer to BulkWriter (#611) --- .../google/cloud/firestore/BulkWriter.java | 121 ++++++++++++++++-- .../cloud/firestore/BulkWriterTest.java | 37 ++++++ 2 files changed, 144 insertions(+), 14 deletions(-) diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java index 6f6bf755b..8e104ed7a 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java @@ -25,10 +25,13 @@ import com.google.api.core.ApiFutures; import com.google.api.core.BetaApi; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.firestore.v1.FirestoreSettings; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.MoreExecutors; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -110,6 +113,14 @@ enum OperationType { */ private static final int RATE_LIMITER_MULTIPLIER_MILLIS = 5 * 60 * 1000; + /** + * The default maximum number of pending operations that can be enqueued onto a BulkWriter + * instance. An operation is considered pending if BulkWriter has sent it via RPC and is awaiting + * the result. BulkWriter buffers additional writes after this many pending operations in order to + * avoiding going OOM. + */ + private static final int DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT = 500; + /** * The default jitter to apply to the exponential backoff used in retries. For example, a factor * of 0.3 means a 30% jitter is applied. @@ -158,6 +169,26 @@ public boolean onError(BulkWriterException error) { @GuardedBy("lock") private final RateLimiter rateLimiter; + /** + * The number of pending operations enqueued on this BulkWriter instance. An operation is + * considered pending if BulkWriter has sent it via RPC and is awaiting the result. + */ + @GuardedBy("lock") + private int pendingOpsCount = 0; + + /** + * An array containing buffered BulkWriter operations after the maximum number of pending + * operations has been enqueued. + */ + @GuardedBy("lock") + private final List bufferedOperations = new ArrayList<>(); + + /** + * The maximum number of pending operations that can be enqueued onto this BulkWriter instance. + * Once the this number of writes have been enqueued, subsequent writes are buffered. + */ + private int maxPendingOpCount = DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT; + /** * The batch that is currently used to schedule operations. Once this batch reaches maximum * capacity, a new batch is created. @@ -627,7 +658,7 @@ private ApiFuture executeWrite( final DocumentReference documentReference, final OperationType operationType, final ApiFunction> enqueueOperationOnBatchCallback) { - BulkWriterOperation operation = + final BulkWriterOperation operation = new BulkWriterOperation( documentReference, operationType, @@ -660,10 +691,73 @@ public ApiFuture apply(BulkWriterException e) { synchronized (lock) { verifyNotClosedLocked(); writesEnqueued = true; - sendOperationLocked(enqueueOperationOnBatchCallback, operation); + + // Advance the lastOperation pointer. This ensures that lastOperation only completes when + // both the previous and the current write complete. + lastOperation = + ApiFutures.transformAsync( + lastOperation, + new ApiAsyncFunction() { + @Override + public ApiFuture apply(Void aVoid) { + return silenceFuture(operation.getFuture()); + } + }, + MoreExecutors.directExecutor()); + + // Schedule the operation if the BulkWriter has fewer than the maximum number of allowed + // pending operations, or add the operation to the buffer. + if (pendingOpsCount < maxPendingOpCount) { + pendingOpsCount++; + sendOperationLocked(enqueueOperationOnBatchCallback, operation); + } else { + bufferedOperations.add( + new Runnable() { + @Override + public void run() { + synchronized (lock) { + pendingOpsCount++; + sendOperationLocked(enqueueOperationOnBatchCallback, operation); + } + } + }); + } } - return operation.getFuture(); + ApiFuture processedOperationFuture = + ApiFutures.transformAsync( + operation.getFuture(), + new ApiAsyncFunction() { + public ApiFuture apply(WriteResult result) throws Exception { + pendingOpsCount--; + processBufferedOperations(); + return ApiFutures.immediateFuture(result); + } + }, + MoreExecutors.directExecutor()); + + return ApiFutures.catchingAsync( + processedOperationFuture, + ApiException.class, + new ApiAsyncFunction() { + public ApiFuture apply(ApiException e) throws Exception { + pendingOpsCount--; + processBufferedOperations(); + throw e; + } + }, + MoreExecutors.directExecutor()); + } + + /** + * Manages the pending operation counter and schedules the next BulkWriter operation if we're + * under the maximum limit. + */ + private void processBufferedOperations() { + if (pendingOpsCount < maxPendingOpCount && bufferedOperations.size() > 0) { + Runnable nextOp = bufferedOperations.remove(0); + nextOp.run(); + } } /** @@ -927,6 +1021,16 @@ RateLimiter getRateLimiter() { return rateLimiter; } + @VisibleForTesting + int getBufferedOperationsCount() { + return bufferedOperations.size(); + } + + @VisibleForTesting + void setMaxPendingOpCount(int newMax) { + maxPendingOpCount = newMax; + } + /** * Schedules the provided operations on the current BulkCommitBatch. Sends the BulkCommitBatch if * it reaches maximum capacity. @@ -946,17 +1050,6 @@ private void sendOperationLocked( bulkCommitBatch.enqueueOperation(op); enqueueOperationOnBatchCallback.apply(bulkCommitBatch); - lastOperation = - ApiFutures.transformAsync( - lastOperation, - new ApiAsyncFunction() { - @Override - public ApiFuture apply(Void aVoid) { - return silenceFuture(op.getFuture()); - } - }, - MoreExecutors.directExecutor()); - if (bulkCommitBatch.getMutationsSize() == maxBatchSize) { scheduleCurrentBatchLocked(/* flush= */ false); } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java index 289952238..daf90171c 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java @@ -422,6 +422,41 @@ public void sendWritesToDifferentDocsInSameBatch() throws Exception { assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime()); } + @Test + public void buffersSubsequentOpsAfterReachingMaxPendingOpCount() throws Exception { + ResponseStubber responseStubber = + new ResponseStubber() { + { + put( + batchWrite( + set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc1"), + set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc2"), + set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc3")), + mergeResponses( + successResponse(1), + successResponse(2), + failedResponse(Code.FAILED_PRECONDITION_VALUE))); + put( + batchWrite( + set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc4"), + set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc5")), + mergeResponses(successResponse(4), successResponse(5))); + } + }; + responseStubber.initializeStub(batchWriteCapture, firestoreMock); + + bulkWriter.setMaxPendingOpCount(3); + bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP); + bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP); + bulkWriter.set(firestoreMock.document("coll/doc3"), LocalFirestoreHelper.SINGLE_FIELD_MAP); + bulkWriter.set(firestoreMock.document("coll/doc4"), LocalFirestoreHelper.SINGLE_FIELD_MAP); + assertEquals(1, bulkWriter.getBufferedOperationsCount()); + bulkWriter.set(firestoreMock.document("coll/doc5"), LocalFirestoreHelper.SINGLE_FIELD_MAP); + assertEquals(2, bulkWriter.getBufferedOperationsCount()); + bulkWriter.close(); + responseStubber.verifyAllRequestsSent(); + } + @Test public void runsSuccessHandler() throws Exception { ResponseStubber responseStubber = @@ -1260,6 +1295,7 @@ public boolean onError(BulkWriterException error) { bulkWriter.create(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP); bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP); bulkWriter.close(); + responseStubber.verifyAllRequestsSent(); assertEquals(2, retryAttempts[0]); } @@ -1291,6 +1327,7 @@ public boolean onError(BulkWriterException error) { bulkWriter.flush(); bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP); bulkWriter.close(); + responseStubber.verifyAllRequestsSent(); } @Test