New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: lower batch size on BulkWriter retry #688
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ | |
import com.google.api.gax.rpc.StatusCode.Code; | ||
import com.google.cloud.firestore.v1.FirestoreSettings; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.util.concurrent.MoreExecutors; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
@@ -82,6 +83,9 @@ enum OperationType { | |
/** The maximum number of writes that can be in a single batch. */ | ||
public static final int MAX_BATCH_SIZE = 20; | ||
|
||
/** The maximum number of writes can be can in a single batch that is being retried. */ | ||
public static final int RETRY_MAX_BATCH_SIZE = 10; | ||
|
||
/** | ||
* The maximum number of retries that will be attempted with backoff before stopping all retry | ||
* attempts. | ||
|
@@ -237,7 +241,7 @@ public boolean onError(BulkWriterException error) { | |
: Executors.newSingleThreadScheduledExecutor(); | ||
this.successExecutor = MoreExecutors.directExecutor(); | ||
this.errorExecutor = MoreExecutors.directExecutor(); | ||
this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor); | ||
this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize); | ||
|
||
if (!options.getThrottlingEnabled()) { | ||
this.rateLimiter = | ||
|
@@ -962,7 +966,7 @@ private void scheduleCurrentBatchLocked(final boolean flush) { | |
if (bulkCommitBatch.getMutationsSize() == 0) return; | ||
|
||
final BulkCommitBatch pendingBatch = bulkCommitBatch; | ||
bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor); | ||
bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize); | ||
|
||
// Use the write with the longest backoff duration when determining backoff. | ||
int highestBackoffDuration = 0; | ||
|
@@ -1025,7 +1029,10 @@ public void run() { | |
|
||
@VisibleForTesting | ||
void setMaxBatchSize(int size) { | ||
Preconditions.checkState( | ||
bulkCommitBatch.getMutationsSize() == 0, "BulkCommitBatch should be empty"); | ||
maxBatchSize = size; | ||
bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, size); | ||
} | ||
|
||
@VisibleForTesting | ||
|
@@ -1050,7 +1057,13 @@ void setMaxPendingOpCount(int newMax) { | |
private void sendOperationLocked( | ||
ApiFunction<BulkCommitBatch, ApiFuture<WriteResult>> enqueueOperationOnBatchCallback, | ||
final BulkWriterOperation op) { | ||
if (bulkCommitBatch.has(op.getDocumentReference())) { | ||
// A backoff duration greater than 0 implies that this batch is a retry. | ||
// Retried writes are sent with a batch size of 10 in order to guarantee | ||
// that the batch is under the 10MiB limit. | ||
boolean retryInSmallerBatch = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This boolean does more than just check whether a smaller batch should be used. It also checks the size of the current batch. I think we need to move some logic or change the name. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed. |
||
op.getBackoffDuration() > 0 && bulkCommitBatch.getMutationsSize() >= RETRY_MAX_BATCH_SIZE; | ||
|
||
if (bulkCommitBatch.has(op.getDocumentReference()) || retryInSmallerBatch) { | ||
// Create a new batch since the backend doesn't support batches with two writes to the same | ||
// document. | ||
scheduleCurrentBatchLocked(/* flush= */ false); | ||
|
@@ -1062,7 +1075,11 @@ private void sendOperationLocked( | |
bulkCommitBatch.enqueueOperation(op); | ||
enqueueOperationOnBatchCallback.apply(bulkCommitBatch); | ||
|
||
if (bulkCommitBatch.getMutationsSize() == maxBatchSize) { | ||
if (op.getBackoffDuration() > 0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am a bit confused. Why do we need this and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The first check involving Consolidated the checks into a single if statement at the start of the function. |
||
bulkCommitBatch.setMaxBatchSize(RETRY_MAX_BATCH_SIZE); | ||
} | ||
|
||
if (bulkCommitBatch.getMutationsSize() == bulkCommitBatch.getMaxBatchSize()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ... especially since we are reading the value right back here. |
||
scheduleCurrentBatchLocked(/* flush= */ false); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is Sebastian-level grammar. Please upgrade to Brian-level grammar.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Attempted.