fix: lower batch size on BulkWriter retry #688

Merged · 4 commits · Jul 3, 2021
@@ -42,10 +42,23 @@ class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
   final List<BulkWriterOperation> pendingOperations = new ArrayList<>();
   private final Set<DocumentReference> documents = new CopyOnWriteArraySet<>();
   private final Executor executor;
+  private int maxBatchSize;

-  BulkCommitBatch(FirestoreImpl firestore, Executor executor) {
+  BulkCommitBatch(FirestoreImpl firestore, Executor executor, int maxBatchSize) {
     super(firestore);
     this.executor = executor;
+    this.maxBatchSize = maxBatchSize;
   }

+  int getMaxBatchSize() {
+    return maxBatchSize;
+  }
+
+  void setMaxBatchSize(int size) {
+    Preconditions.checkState(
+        getMutationsSize() <= size,
+        "New batch size cannot be less than the number of enqueued writes");
+    this.maxBatchSize = size;
+  }
+
   ApiFuture<WriteResult> wrapResult(int writeIndex) {
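The new setter rejects shrinking a batch below what it already holds. A minimal sketch of the guard, assuming only Guava's Preconditions (the values are hypothetical):

import com.google.common.base.Preconditions;

class BatchSizeGuardSketch {
  public static void main(String[] args) {
    int enqueuedWrites = 12;  // hypothetical: writes already in the batch
    int newMaxBatchSize = 10; // hypothetical: the requested smaller cap
    // checkState throws IllegalStateException when its condition is false,
    // so shrinking below the enqueued count fails fast:
    Preconditions.checkState(
        enqueuedWrites <= newMaxBatchSize,
        "New batch size cannot be less than the number of enqueued writes");
  }
}

Callers therefore have to flush the batch before lowering the cap, which is exactly what the retry path in sendOperationLocked below does.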
Expand Up @@ -29,6 +29,7 @@
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.firestore.v1.FirestoreSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
@@ -82,6 +83,9 @@ enum OperationType {
   /** The maximum number of writes that can be in a single batch. */
   public static final int MAX_BATCH_SIZE = 20;

+  /** The maximum number of writes that can be in a batch containing retries. */
+  public static final int RETRY_MAX_BATCH_SIZE = 10;
+
   /**
    * The maximum number of retries that will be attempted with backoff before stopping all retry
    * attempts.
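Why 10: the comment added in sendOperationLocked below ties this constant to the 10MiB request limit, and a single Firestore document can be up to 1 MiB, so ten worst-case retried writes sit right at that ceiling. Illustrative arithmetic only, not library code:

// Worst case: every retried write carries a maximum-size document.
final int MAX_DOC_SIZE_BYTES = 1 << 20;  // 1 MiB per document (service limit)
final int RETRY_MAX_BATCH_SIZE = 10;
final long worstCaseBatchBytes =
    (long) MAX_DOC_SIZE_BYTES * RETRY_MAX_BATCH_SIZE; // 10,485,760 bytes ≈ 10 MiB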
@@ -237,7 +241,7 @@ public boolean onError(BulkWriterException error) {
         : Executors.newSingleThreadScheduledExecutor();
     this.successExecutor = MoreExecutors.directExecutor();
     this.errorExecutor = MoreExecutors.directExecutor();
-    this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor);
+    this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize);

     if (!options.getThrottlingEnabled()) {
       this.rateLimiter =
@@ -962,7 +966,7 @@ private void scheduleCurrentBatchLocked(final boolean flush) {
     if (bulkCommitBatch.getMutationsSize() == 0) return;

     final BulkCommitBatch pendingBatch = bulkCommitBatch;
-    bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor);
+    bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize);

     // Use the write with the longest backoff duration when determining backoff.
     int highestBackoffDuration = 0;
@@ -1025,7 +1029,10 @@ public void run() {

   @VisibleForTesting
   void setMaxBatchSize(int size) {
+    Preconditions.checkState(
+        bulkCommitBatch.getMutationsSize() == 0, "BulkCommitBatch should be empty");
     maxBatchSize = size;
+    bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, size);
   }

   @VisibleForTesting
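Unlike the batch-level setter, this test hook requires an empty batch and swaps in a fresh one, since the live batch was constructed with the old cap. A hedged usage sketch (setMaxBatchSize is test-only per @VisibleForTesting; the document references and data below are hypothetical):

bulkWriter.setMaxBatchSize(2); // OK: no writes are pending yet
bulkWriter.set(doc1, data);    // hypothetical DocumentReference and field map
bulkWriter.set(doc2, data);    // the rebuilt batch reaches its new cap of 2
// The second write fills the batch, so it is scheduled for commit.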
@@ -1050,6 +1057,16 @@ void setMaxPendingOpCount(int newMax) {
   private void sendOperationLocked(
       ApiFunction<BulkCommitBatch, ApiFuture<WriteResult>> enqueueOperationOnBatchCallback,
       final BulkWriterOperation op) {
+    // A backoff duration greater than 0 implies that this batch is a retry.
+    // Retried writes are sent with a batch size of 10 in order to guarantee
+    // that the batch is under the 10MiB limit.
+    if (op.getBackoffDuration() > 0) {
+      if (bulkCommitBatch.getMutationsSize() >= RETRY_MAX_BATCH_SIZE) {
+        scheduleCurrentBatchLocked(/* flush= */ false);
+      }
+      bulkCommitBatch.setMaxBatchSize(RETRY_MAX_BATCH_SIZE);
+    }
+
     if (bulkCommitBatch.has(op.getDocumentReference())) {
       // Create a new batch since the backend doesn't support batches with two writes to the same
       // document.
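The ordering above matters: an over-cap batch is flushed before the cap is lowered, which keeps the batch-level setMaxBatchSize precondition from firing. A self-contained model of that control flow, where every name is a hypothetical stand-in rather than the client API:

import java.util.ArrayList;
import java.util.List;

// Model of the retry path above; this is not Firestore client code.
class RetryBatchingModel {
  static final int MAX_BATCH_SIZE = 20;
  static final int RETRY_MAX_BATCH_SIZE = 10;

  static class Batch {
    final List<String> writes = new ArrayList<>();
    int maxSize = MAX_BATCH_SIZE;
  }

  private Batch current = new Batch();

  void enqueue(String write, long backoffMillis) {
    if (backoffMillis > 0) { // a positive backoff marks the write as a retry
      if (current.writes.size() >= RETRY_MAX_BATCH_SIZE) {
        flush(); // send the batch before shrinking the cap below its size
      }
      current.maxSize = RETRY_MAX_BATCH_SIZE; // safe now: size <= 10
    }
    current.writes.add(write);
    if (current.writes.size() == current.maxSize) {
      flush(); // batch is full
    }
  }

  private void flush() {
    // The real client commits the batch here; the model just starts a new one.
    current = new Batch();
  }
}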
@@ -1062,7 +1079,7 @@ private void sendOperationLocked(
     bulkCommitBatch.enqueueOperation(op);
     enqueueOperationOnBatchCallback.apply(bulkCommitBatch);

-    if (bulkCommitBatch.getMutationsSize() == maxBatchSize) {
+    if (bulkCommitBatch.getMutationsSize() == bulkCommitBatch.getMaxBatchSize()) {
       scheduleCurrentBatchLocked(/* flush= */ false);
     }
   }

Reviewer comment (Contributor), on the changed line above: "... especially since we are reading the value right back here."
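The reviewer's point ("reading the value right back") is about locality: the fullness check now reads the cap carried by the batch itself, which the retry path may have just lowered, rather than BulkWriter's own maxBatchSize field, which only describes freshly created batches. Condensed from the diff above:

// before: compares against BulkWriter's field, which ignores the retry cap
if (bulkCommitBatch.getMutationsSize() == maxBatchSize) { /* schedule */ }
// after: compares against the cap the batch itself carries
if (bulkCommitBatch.getMutationsSize() == bulkCommitBatch.getMaxBatchSize()) { /* schedule */ }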
@@ -44,6 +44,7 @@
 import com.google.firestore.v1.BatchWriteRequest;
 import com.google.firestore.v1.BatchWriteResponse;
 import com.google.firestore.v1.Value;
+import com.google.firestore.v1.Write;
 import com.google.protobuf.GeneratedMessageV3;
 import com.google.rpc.Code;
 import io.grpc.Status;
@@ -727,6 +728,42 @@ public boolean onError(BulkWriterException error) {
     assertEquals(Timestamp.ofTimeSecondsAndNanos(1, 0), result1.get().getUpdateTime());
   }

+  @Test
+  public void retriesWithSmallerBatchSize() throws Exception {
+
+    final List<Write> writes = new ArrayList<>();
+    final List<ApiFuture<BatchWriteResponse>> successResponses = new ArrayList<>();
+    final List<ApiFuture<BatchWriteResponse>> failedResponses = new ArrayList<>();
+    for (int i = 0; i < 15; i++) {
+      writes.add(set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc" + i));
+      failedResponses.add(failedResponse(Code.ABORTED_VALUE));
+      successResponses.add(successResponse(1));
+    }
+
+    ResponseStubber responseStubber =
+        new ResponseStubber() {
+          {
+            put(
+                batchWrite(writes.toArray(new Write[0])),
+                mergeResponses(failedResponses.toArray(new ApiFuture[0])));
+            put(
+                batchWrite(
+                    writes.subList(0, BulkWriter.RETRY_MAX_BATCH_SIZE).toArray(new Write[0])),
+                mergeResponses(successResponses.subList(0, 10).toArray(new ApiFuture[0])));
+            put(
+                batchWrite(
+                    writes.subList(BulkWriter.RETRY_MAX_BATCH_SIZE, 15).toArray(new Write[0])),
+                mergeResponses(successResponses.subList(10, 15).toArray(new ApiFuture[0])));
+          }
+        };
+    responseStubber.initializeStub(batchWriteCapture, firestoreMock);
+
+    for (int i = 0; i < 15; i++) {
+      bulkWriter.set(firestoreMock.document("coll/doc" + i), LocalFirestoreHelper.SINGLE_FIELD_MAP);
+    }
+    bulkWriter.close();
+  }
+
   @Test
   public void retryResolvesBeforeFlush() throws Exception {
     ResponseStubber responseStubber =
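The stub in retriesWithSmallerBatchSize encodes the expected retry split: all 15 writes go out in one initial batch (under the default cap of 20) and fail with ABORTED, then are retried as batches of 10 and 5 because the retry cap is RETRY_MAX_BATCH_SIZE. A sketch of that arithmetic, mirroring the subList bounds in the stub:

int failedWrites = 15;
int firstRetryBatch = Math.min(failedWrites, BulkWriter.RETRY_MAX_BATCH_SIZE); // 10
int secondRetryBatch = failedWrites - firstRetryBatch;                         // 5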