Skip to content

Commit

Permalink
feat: add buffering layer to BulkWriter (#611)
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Chen committed Apr 29, 2021
1 parent 0a2bc53 commit a7caff2
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 14 deletions.
Expand Up @@ -25,10 +25,13 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.firestore.v1.FirestoreSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -110,6 +113,14 @@ enum OperationType {
*/
private static final int RATE_LIMITER_MULTIPLIER_MILLIS = 5 * 60 * 1000;

/**
* The default maximum number of pending operations that can be enqueued onto a BulkWriter
* instance. An operation is considered pending if BulkWriter has sent it via RPC and is awaiting
* the result. BulkWriter buffers additional writes after this many pending operations in order to
* avoiding going OOM.
*/
private static final int DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT = 500;

/**
* The default jitter to apply to the exponential backoff used in retries. For example, a factor
* of 0.3 means a 30% jitter is applied.
Expand Down Expand Up @@ -158,6 +169,26 @@ public boolean onError(BulkWriterException error) {
@GuardedBy("lock")
private final RateLimiter rateLimiter;

/**
* The number of pending operations enqueued on this BulkWriter instance. An operation is
* considered pending if BulkWriter has sent it via RPC and is awaiting the result.
*/
@GuardedBy("lock")
private int pendingOpsCount = 0;

/**
* An array containing buffered BulkWriter operations after the maximum number of pending
* operations has been enqueued.
*/
@GuardedBy("lock")
private final List<Runnable> bufferedOperations = new ArrayList<>();

/**
* The maximum number of pending operations that can be enqueued onto this BulkWriter instance.
* Once the this number of writes have been enqueued, subsequent writes are buffered.
*/
private int maxPendingOpCount = DEFAULT_MAXIMUM_PENDING_OPERATIONS_COUNT;

/**
* The batch that is currently used to schedule operations. Once this batch reaches maximum
* capacity, a new batch is created.
Expand Down Expand Up @@ -627,7 +658,7 @@ private ApiFuture<WriteResult> executeWrite(
final DocumentReference documentReference,
final OperationType operationType,
final ApiFunction<BulkCommitBatch, ApiFuture<WriteResult>> enqueueOperationOnBatchCallback) {
BulkWriterOperation operation =
final BulkWriterOperation operation =
new BulkWriterOperation(
documentReference,
operationType,
Expand Down Expand Up @@ -660,10 +691,73 @@ public ApiFuture<Boolean> apply(BulkWriterException e) {
synchronized (lock) {
verifyNotClosedLocked();
writesEnqueued = true;
sendOperationLocked(enqueueOperationOnBatchCallback, operation);

// Advance the lastOperation pointer. This ensures that lastOperation only completes when
// both the previous and the current write complete.
lastOperation =
ApiFutures.transformAsync(
lastOperation,
new ApiAsyncFunction<Void, Void>() {
@Override
public ApiFuture<Void> apply(Void aVoid) {
return silenceFuture(operation.getFuture());
}
},
MoreExecutors.directExecutor());

// Schedule the operation if the BulkWriter has fewer than the maximum number of allowed
// pending operations, or add the operation to the buffer.
if (pendingOpsCount < maxPendingOpCount) {
pendingOpsCount++;
sendOperationLocked(enqueueOperationOnBatchCallback, operation);
} else {
bufferedOperations.add(
new Runnable() {
@Override
public void run() {
synchronized (lock) {
pendingOpsCount++;
sendOperationLocked(enqueueOperationOnBatchCallback, operation);
}
}
});
}
}

return operation.getFuture();
ApiFuture<WriteResult> processedOperationFuture =
ApiFutures.transformAsync(
operation.getFuture(),
new ApiAsyncFunction<WriteResult, WriteResult>() {
public ApiFuture<WriteResult> apply(WriteResult result) throws Exception {
pendingOpsCount--;
processBufferedOperations();
return ApiFutures.immediateFuture(result);
}
},
MoreExecutors.directExecutor());

return ApiFutures.catchingAsync(
processedOperationFuture,
ApiException.class,
new ApiAsyncFunction<ApiException, WriteResult>() {
public ApiFuture<WriteResult> apply(ApiException e) throws Exception {
pendingOpsCount--;
processBufferedOperations();
throw e;
}
},
MoreExecutors.directExecutor());
}

/**
* Manages the pending operation counter and schedules the next BulkWriter operation if we're
* under the maximum limit.
*/
private void processBufferedOperations() {
if (pendingOpsCount < maxPendingOpCount && bufferedOperations.size() > 0) {
Runnable nextOp = bufferedOperations.remove(0);
nextOp.run();
}
}

/**
Expand Down Expand Up @@ -927,6 +1021,16 @@ RateLimiter getRateLimiter() {
return rateLimiter;
}

@VisibleForTesting
int getBufferedOperationsCount() {
return bufferedOperations.size();
}

@VisibleForTesting
void setMaxPendingOpCount(int newMax) {
maxPendingOpCount = newMax;
}

/**
* Schedules the provided operations on the current BulkCommitBatch. Sends the BulkCommitBatch if
* it reaches maximum capacity.
Expand All @@ -946,17 +1050,6 @@ private void sendOperationLocked(
bulkCommitBatch.enqueueOperation(op);
enqueueOperationOnBatchCallback.apply(bulkCommitBatch);

lastOperation =
ApiFutures.transformAsync(
lastOperation,
new ApiAsyncFunction<Void, Void>() {
@Override
public ApiFuture<Void> apply(Void aVoid) {
return silenceFuture(op.getFuture());
}
},
MoreExecutors.directExecutor());

if (bulkCommitBatch.getMutationsSize() == maxBatchSize) {
scheduleCurrentBatchLocked(/* flush= */ false);
}
Expand Down
Expand Up @@ -422,6 +422,41 @@ public void sendWritesToDifferentDocsInSameBatch() throws Exception {
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
}

@Test
public void buffersSubsequentOpsAfterReachingMaxPendingOpCount() throws Exception {
ResponseStubber responseStubber =
new ResponseStubber() {
{
put(
batchWrite(
set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc1"),
set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc2"),
set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc3")),
mergeResponses(
successResponse(1),
successResponse(2),
failedResponse(Code.FAILED_PRECONDITION_VALUE)));
put(
batchWrite(
set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc4"),
set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc5")),
mergeResponses(successResponse(4), successResponse(5)));
}
};
responseStubber.initializeStub(batchWriteCapture, firestoreMock);

bulkWriter.setMaxPendingOpCount(3);
bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.set(firestoreMock.document("coll/doc3"), LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.set(firestoreMock.document("coll/doc4"), LocalFirestoreHelper.SINGLE_FIELD_MAP);
assertEquals(1, bulkWriter.getBufferedOperationsCount());
bulkWriter.set(firestoreMock.document("coll/doc5"), LocalFirestoreHelper.SINGLE_FIELD_MAP);
assertEquals(2, bulkWriter.getBufferedOperationsCount());
bulkWriter.close();
responseStubber.verifyAllRequestsSent();
}

@Test
public void runsSuccessHandler() throws Exception {
ResponseStubber responseStubber =
Expand Down Expand Up @@ -1260,6 +1295,7 @@ public boolean onError(BulkWriterException error) {
bulkWriter.create(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();
responseStubber.verifyAllRequestsSent();
assertEquals(2, retryAttempts[0]);
}

Expand Down Expand Up @@ -1291,6 +1327,7 @@ public boolean onError(BulkWriterException error) {
bulkWriter.flush();
bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP);
bulkWriter.close();
responseStubber.verifyAllRequestsSent();
}

@Test
Expand Down

0 comments on commit a7caff2

Please sign in to comment.