Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add backoff to BulkWriter #600

Merged
merged 3 commits into from Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -39,7 +39,7 @@
/** Used to represent a batch that contains scheduled BulkWriterOperations. */
class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {

private final List<BulkWriterOperation> pendingOperations = new ArrayList<>();
final List<BulkWriterOperation> pendingOperations = new ArrayList<>();
private final Set<DocumentReference> documents = new CopyOnWriteArraySet<>();
private final Executor executor;

Expand Down
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.firestore;

import static com.google.cloud.firestore.BulkWriterOperation.DEFAULT_BACKOFF_MAX_DELAY_MS;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
Expand Down Expand Up @@ -108,6 +110,12 @@ enum OperationType {
*/
private static final int RATE_LIMITER_MULTIPLIER_MILLIS = 5 * 60 * 1000;

/**
* The default jitter to apply to the exponential backoff used in retries. For example, a factor
* of 0.3 means a 30% jitter is applied.
*/
static final double DEFAULT_JITTER_FACTOR = 0.3;

private static final WriteResultCallback DEFAULT_SUCCESS_LISTENER =
new WriteResultCallback() {
public void onResult(DocumentReference documentReference, WriteResult result) {}
Expand Down Expand Up @@ -681,7 +689,7 @@ public ApiFuture<Void> flush() {

private ApiFuture<Void> flushLocked() {
verifyNotClosedLocked();
sendCurrentBatchLocked(/* flush= */ true);
scheduleCurrentBatchLocked(/* flush= */ true);
return lastOperation;
}

Expand Down Expand Up @@ -846,37 +854,61 @@ public void addWriteErrorListener(@Nonnull Executor executor, WriteErrorCallback
* This allows retries to resolve as part of a {@link BulkWriter#flush()} or {@link
* BulkWriter#close()} call.
*/
private void sendCurrentBatchLocked(final boolean flush) {
private void scheduleCurrentBatchLocked(final boolean flush) {
if (bulkCommitBatch.getMutationsSize() == 0) return;
BulkCommitBatch pendingBatch = bulkCommitBatch;

final BulkCommitBatch pendingBatch = bulkCommitBatch;
bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor);

// Send the batch if it is under the rate limit, or schedule another attempt after the
// Use the write with the longest backoff duration when determining backoff.
int highestBackoffDuration = 0;
for (BulkWriterOperation op : pendingBatch.pendingOperations) {
if (op.getBackoffDuration() > highestBackoffDuration) {
highestBackoffDuration = op.getBackoffDuration();
}
}
int backoffMsWithJitter = applyJitter(highestBackoffDuration);

bulkWriterExecutor.schedule(
new Runnable() {
@Override
public void run() {
synchronized (lock) {
sendBatchLocked(pendingBatch, flush);
}
}
},
backoffMsWithJitter,
TimeUnit.MILLISECONDS);
}

/** Sends the provided batch once the rate limiter does not require any delay. */
private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) {
// Send the batch if it is does not require any delay, or schedule another attempt after the
// appropriate timeout.
boolean underRateLimit = rateLimiter.tryMakeRequest(pendingBatch.getMutationsSize());
boolean underRateLimit = rateLimiter.tryMakeRequest(batch.getMutationsSize());
if (underRateLimit) {
pendingBatch
batch
.bulkCommit()
.addListener(
new Runnable() {
@Override
public void run() {
synchronized (lock) {
sendCurrentBatchLocked(flush);
scheduleCurrentBatchLocked(flush);
}
}
},
bulkWriterExecutor);

} else {
long delayMs = rateLimiter.getNextRequestDelayMs(pendingBatch.getMutationsSize());
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize());
logger.log(Level.FINE, String.format("Backing off for %d seconds", delayMs / 1000));
bulkWriterExecutor.schedule(
new Runnable() {
@Override
public void run() {
synchronized (lock) {
sendCurrentBatchLocked(flush);
sendBatchLocked(batch, flush);
}
}
},
Expand Down Expand Up @@ -905,7 +937,7 @@ private void sendOperationLocked(
if (bulkCommitBatch.has(op.getDocumentReference())) {
// Create a new batch since the backend doesn't support batches with two writes to the same
// document.
sendCurrentBatchLocked(/* flush= */ false);
scheduleCurrentBatchLocked(/* flush= */ false);
}

// Run the operation on the current batch and advance the `lastOperation` pointer. This
Expand All @@ -926,7 +958,7 @@ public ApiFuture<Void> apply(Void aVoid) {
MoreExecutors.directExecutor());

if (bulkCommitBatch.getMutationsSize() == maxBatchSize) {
sendCurrentBatchLocked(/* flush= */ false);
scheduleCurrentBatchLocked(/* flush= */ false);
}
}

Expand Down Expand Up @@ -989,4 +1021,11 @@ public void onSuccess(T writeResult) {
MoreExecutors.directExecutor());
return flushCallback;
}

private int applyJitter(int backoffMs) {
if (backoffMs == 0) return 0;
// Random value in [-0.3, 0.3].
double jitter = DEFAULT_JITTER_FACTOR * (Math.random() * 2 - 1);
return (int) Math.min(DEFAULT_BACKOFF_MAX_DELAY_MS, backoffMs + jitter * backoffMs);
}
}
Expand Up @@ -22,6 +22,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Status;

/**
* Represents a single write for BulkWriter, encapsulating operation dispatch and error handling.
Expand All @@ -34,7 +35,22 @@ class BulkWriterOperation {
private final ApiFunction<WriteResult, ApiFuture<Void>> successListener;
private final ApiFunction<BulkWriterException, ApiFuture<Boolean>> errorListener;

/**
* The default initial backoff time in milliseconds after an error. Set to 1s according to
* https://cloud.google.com/apis/design/errors.
*/
public static final int DEFAULT_BACKOFF_INITIAL_DELAY_MS = 1000;

/** The default maximum backoff time in milliseconds when retrying an operation. */
public static final int DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000;

/** The default factor to increase the backup by after each failed attempt. */
public static final double DEFAULT_BACKOFF_FACTOR = 1.5;

private int failedAttempts = 0;
private Status lastStatus;

private int backoffDuration = 0;

/**
* @param documentReference The document reference being written to.
Expand Down Expand Up @@ -68,6 +84,10 @@ public DocumentReference getDocumentReference() {
return documentReference;
}

public int getBackoffDuration() {
return backoffDuration;
}

/** Callback invoked when an operation attempt fails. */
public ApiFuture<Void> onException(FirestoreException exception) {
++failedAttempts;
Expand All @@ -94,6 +114,8 @@ public void onFailure(Throwable throwable) {
@Override
public void onSuccess(Boolean shouldRetry) {
if (shouldRetry) {
lastStatus = bulkWriterException.getStatus();
updateBackoffDuration();
scheduleWriteCallback.apply(BulkWriterOperation.this);
} else {
operationFuture.setException(bulkWriterException);
Expand All @@ -106,6 +128,16 @@ public void onSuccess(Boolean shouldRetry) {
return callbackFuture;
}

private void updateBackoffDuration() {
schmidt-sebastian marked this conversation as resolved.
Show resolved Hide resolved
if (lastStatus == Status.RESOURCE_EXHAUSTED) {
backoffDuration = DEFAULT_BACKOFF_MAX_DELAY_MS;
} else if (backoffDuration == 0) {
backoffDuration = DEFAULT_BACKOFF_INITIAL_DELAY_MS;
} else {
backoffDuration *= DEFAULT_BACKOFF_FACTOR;
}
}

/** Callback invoked when the operation succeeds. */
public ApiFuture<Void> onSuccess(final WriteResult result) {
final SettableApiFuture<Void> callbackFuture = SettableApiFuture.create();
Expand Down