Skip to content

Commit

Permalink
fix: lower batch size on BulkWriter retry (#688)
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Chen committed Jul 3, 2021
1 parent 6532099 commit 146b21d
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 4 deletions.
Expand Up @@ -42,10 +42,23 @@ class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
final List<BulkWriterOperation> pendingOperations = new ArrayList<>();
private final Set<DocumentReference> documents = new CopyOnWriteArraySet<>();
private final Executor executor;
private int maxBatchSize;

BulkCommitBatch(FirestoreImpl firestore, Executor executor) {
BulkCommitBatch(FirestoreImpl firestore, Executor executor, int maxBatchSize) {
super(firestore);
this.executor = executor;
this.maxBatchSize = maxBatchSize;
}

int getMaxBatchSize() {
return maxBatchSize;
}

void setMaxBatchSize(int size) {
Preconditions.checkState(
getMutationsSize() <= size,
"New batch size cannot be less than the number of enqueued writes");
this.maxBatchSize = size;
}

ApiFuture<WriteResult> wrapResult(int writeIndex) {
Expand Down
Expand Up @@ -29,6 +29,7 @@
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.firestore.v1.FirestoreSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -82,6 +83,9 @@ enum OperationType {
/** The maximum number of writes that can be in a single batch. */
public static final int MAX_BATCH_SIZE = 20;

/** The maximum number of writes that can be in a batch containing retries. */
public static final int RETRY_MAX_BATCH_SIZE = 10;

/**
* The maximum number of retries that will be attempted with backoff before stopping all retry
* attempts.
Expand Down Expand Up @@ -237,7 +241,7 @@ public boolean onError(BulkWriterException error) {
: Executors.newSingleThreadScheduledExecutor();
this.successExecutor = MoreExecutors.directExecutor();
this.errorExecutor = MoreExecutors.directExecutor();
this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor);
this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize);

if (!options.getThrottlingEnabled()) {
this.rateLimiter =
Expand Down Expand Up @@ -962,7 +966,7 @@ private void scheduleCurrentBatchLocked(final boolean flush) {
if (bulkCommitBatch.getMutationsSize() == 0) return;

final BulkCommitBatch pendingBatch = bulkCommitBatch;
bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor);
bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize);

// Use the write with the longest backoff duration when determining backoff.
int highestBackoffDuration = 0;
Expand Down Expand Up @@ -1025,7 +1029,10 @@ public void run() {

@VisibleForTesting
void setMaxBatchSize(int size) {
Preconditions.checkState(
bulkCommitBatch.getMutationsSize() == 0, "BulkCommitBatch should be empty");
maxBatchSize = size;
bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, size);
}

@VisibleForTesting
Expand All @@ -1050,6 +1057,16 @@ void setMaxPendingOpCount(int newMax) {
private void sendOperationLocked(
ApiFunction<BulkCommitBatch, ApiFuture<WriteResult>> enqueueOperationOnBatchCallback,
final BulkWriterOperation op) {
// A backoff duration greater than 0 implies that this batch is a retry.
// Retried writes are sent with a batch size of 10 in order to guarantee
// that the batch is under the 10MiB limit.
if (op.getBackoffDuration() > 0) {
if (bulkCommitBatch.getMutationsSize() >= RETRY_MAX_BATCH_SIZE) {
scheduleCurrentBatchLocked(/* flush= */ false);
}
bulkCommitBatch.setMaxBatchSize(RETRY_MAX_BATCH_SIZE);
}

if (bulkCommitBatch.has(op.getDocumentReference())) {
// Create a new batch since the backend doesn't support batches with two writes to the same
// document.
Expand All @@ -1062,7 +1079,7 @@ private void sendOperationLocked(
bulkCommitBatch.enqueueOperation(op);
enqueueOperationOnBatchCallback.apply(bulkCommitBatch);

if (bulkCommitBatch.getMutationsSize() == maxBatchSize) {
if (bulkCommitBatch.getMutationsSize() == bulkCommitBatch.getMaxBatchSize()) {
scheduleCurrentBatchLocked(/* flush= */ false);
}
}
Expand Down
Expand Up @@ -44,6 +44,7 @@
import com.google.firestore.v1.BatchWriteRequest;
import com.google.firestore.v1.BatchWriteResponse;
import com.google.firestore.v1.Value;
import com.google.firestore.v1.Write;
import com.google.protobuf.GeneratedMessageV3;
import com.google.rpc.Code;
import io.grpc.Status;
Expand Down Expand Up @@ -727,6 +728,42 @@ public boolean onError(BulkWriterException error) {
assertEquals(Timestamp.ofTimeSecondsAndNanos(1, 0), result1.get().getUpdateTime());
}

@Test
public void retriesWithSmallerBatchSize() throws Exception {

final List<Write> writes = new ArrayList<>();
final List<ApiFuture<BatchWriteResponse>> successResponses = new ArrayList<>();
final List<ApiFuture<BatchWriteResponse>> failedResponses = new ArrayList<>();
for (int i = 0; i < 15; i++) {
writes.add(set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc" + i));
failedResponses.add(failedResponse(Code.ABORTED_VALUE));
successResponses.add(successResponse(1));
}

ResponseStubber responseStubber =
new ResponseStubber() {
{
put(
batchWrite(writes.toArray(new Write[0])),
mergeResponses(failedResponses.toArray(new ApiFuture[0])));
put(
batchWrite(
writes.subList(0, BulkWriter.RETRY_MAX_BATCH_SIZE).toArray(new Write[0])),
mergeResponses(successResponses.subList(0, 10).toArray(new ApiFuture[0])));
put(
batchWrite(
writes.subList(BulkWriter.RETRY_MAX_BATCH_SIZE, 15).toArray(new Write[0])),
mergeResponses(successResponses.subList(10, 15).toArray(new ApiFuture[0])));
}
};
responseStubber.initializeStub(batchWriteCapture, firestoreMock);

for (int i = 0; i < 15; i++) {
bulkWriter.set(firestoreMock.document("coll/doc" + i), LocalFirestoreHelper.SINGLE_FIELD_MAP);
}
bulkWriter.close();
}

@Test
public void retryResolvesBeforeFlush() throws Exception {
ResponseStubber responseStubber =
Expand Down

0 comments on commit 146b21d

Please sign in to comment.