Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: add backoff to BulkWriter (#600)
  • Loading branch information
Brian Chen committed Apr 23, 2021
1 parent fd565a1 commit e295aa5
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 15 deletions.
Expand Up @@ -39,7 +39,7 @@
/** Used to represent a batch that contains scheduled BulkWriterOperations. */
class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {

private final List<BulkWriterOperation> pendingOperations = new ArrayList<>();
final List<BulkWriterOperation> pendingOperations = new ArrayList<>();
private final Set<DocumentReference> documents = new CopyOnWriteArraySet<>();
private final Executor executor;

Expand Down
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.firestore;

import static com.google.cloud.firestore.BulkWriterOperation.DEFAULT_BACKOFF_MAX_DELAY_MS;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
Expand Down Expand Up @@ -108,6 +110,12 @@ enum OperationType {
*/
private static final int RATE_LIMITER_MULTIPLIER_MILLIS = 5 * 60 * 1000;

/**
* The default jitter to apply to the exponential backoff used in retries. For example, a factor
* of 0.3 means a 30% jitter is applied.
*/
static final double DEFAULT_JITTER_FACTOR = 0.3;

private static final WriteResultCallback DEFAULT_SUCCESS_LISTENER =
new WriteResultCallback() {
public void onResult(DocumentReference documentReference, WriteResult result) {}
Expand Down Expand Up @@ -681,7 +689,7 @@ public ApiFuture<Void> flush() {

private ApiFuture<Void> flushLocked() {
verifyNotClosedLocked();
sendCurrentBatchLocked(/* flush= */ true);
scheduleCurrentBatchLocked(/* flush= */ true);
return lastOperation;
}

Expand Down Expand Up @@ -846,37 +854,61 @@ public void addWriteErrorListener(@Nonnull Executor executor, WriteErrorCallback
* This allows retries to resolve as part of a {@link BulkWriter#flush()} or {@link
* BulkWriter#close()} call.
*/
private void sendCurrentBatchLocked(final boolean flush) {
private void scheduleCurrentBatchLocked(final boolean flush) {
if (bulkCommitBatch.getMutationsSize() == 0) return;
BulkCommitBatch pendingBatch = bulkCommitBatch;

final BulkCommitBatch pendingBatch = bulkCommitBatch;
bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor);

// Send the batch if it is under the rate limit, or schedule another attempt after the
// Use the write with the longest backoff duration when determining backoff.
int highestBackoffDuration = 0;
for (BulkWriterOperation op : pendingBatch.pendingOperations) {
if (op.getBackoffDuration() > highestBackoffDuration) {
highestBackoffDuration = op.getBackoffDuration();
}
}
final int backoffMsWithJitter = applyJitter(highestBackoffDuration);

bulkWriterExecutor.schedule(
new Runnable() {
@Override
public void run() {
synchronized (lock) {
sendBatchLocked(pendingBatch, flush);
}
}
},
backoffMsWithJitter,
TimeUnit.MILLISECONDS);
}

/** Sends the provided batch once the rate limiter does not require any delay. */
private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) {
// Send the batch if it is does not require any delay, or schedule another attempt after the
// appropriate timeout.
boolean underRateLimit = rateLimiter.tryMakeRequest(pendingBatch.getMutationsSize());
boolean underRateLimit = rateLimiter.tryMakeRequest(batch.getMutationsSize());
if (underRateLimit) {
pendingBatch
batch
.bulkCommit()
.addListener(
new Runnable() {
@Override
public void run() {
synchronized (lock) {
sendCurrentBatchLocked(flush);
scheduleCurrentBatchLocked(flush);
}
}
},
bulkWriterExecutor);

} else {
long delayMs = rateLimiter.getNextRequestDelayMs(pendingBatch.getMutationsSize());
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize());
logger.log(Level.FINE, String.format("Backing off for %d seconds", delayMs / 1000));
bulkWriterExecutor.schedule(
new Runnable() {
@Override
public void run() {
synchronized (lock) {
sendCurrentBatchLocked(flush);
sendBatchLocked(batch, flush);
}
}
},
Expand Down Expand Up @@ -905,7 +937,7 @@ private void sendOperationLocked(
if (bulkCommitBatch.has(op.getDocumentReference())) {
// Create a new batch since the backend doesn't support batches with two writes to the same
// document.
sendCurrentBatchLocked(/* flush= */ false);
scheduleCurrentBatchLocked(/* flush= */ false);
}

// Run the operation on the current batch and advance the `lastOperation` pointer. This
Expand All @@ -926,7 +958,7 @@ public ApiFuture<Void> apply(Void aVoid) {
MoreExecutors.directExecutor());

if (bulkCommitBatch.getMutationsSize() == maxBatchSize) {
sendCurrentBatchLocked(/* flush= */ false);
scheduleCurrentBatchLocked(/* flush= */ false);
}
}

Expand Down Expand Up @@ -989,4 +1021,11 @@ public void onSuccess(T writeResult) {
MoreExecutors.directExecutor());
return flushCallback;
}

private int applyJitter(int backoffMs) {
if (backoffMs == 0) return 0;
// Random value in [-0.3, 0.3].
double jitter = DEFAULT_JITTER_FACTOR * (Math.random() * 2 - 1);
return (int) Math.min(DEFAULT_BACKOFF_MAX_DELAY_MS, backoffMs + jitter * backoffMs);
}
}
Expand Up @@ -22,6 +22,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Status;

/**
* Represents a single write for BulkWriter, encapsulating operation dispatch and error handling.
Expand All @@ -34,7 +35,22 @@ class BulkWriterOperation {
private final ApiFunction<WriteResult, ApiFuture<Void>> successListener;
private final ApiFunction<BulkWriterException, ApiFuture<Boolean>> errorListener;

/**
* The default initial backoff time in milliseconds after an error. Set to 1s according to
* https://cloud.google.com/apis/design/errors.
*/
public static final int DEFAULT_BACKOFF_INITIAL_DELAY_MS = 1000;

/** The default maximum backoff time in milliseconds when retrying an operation. */
public static final int DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000;

/** The default factor to increase the backup by after each failed attempt. */
public static final double DEFAULT_BACKOFF_FACTOR = 1.5;

private int failedAttempts = 0;
private Status lastStatus;

private int backoffDuration = 0;

/**
* @param documentReference The document reference being written to.
Expand Down Expand Up @@ -68,6 +84,10 @@ public DocumentReference getDocumentReference() {
return documentReference;
}

public int getBackoffDuration() {
return backoffDuration;
}

/** Callback invoked when an operation attempt fails. */
public ApiFuture<Void> onException(FirestoreException exception) {
++failedAttempts;
Expand All @@ -94,6 +114,8 @@ public void onFailure(Throwable throwable) {
@Override
public void onSuccess(Boolean shouldRetry) {
if (shouldRetry) {
lastStatus = bulkWriterException.getStatus();
updateBackoffDuration();
scheduleWriteCallback.apply(BulkWriterOperation.this);
} else {
operationFuture.setException(bulkWriterException);
Expand All @@ -106,6 +128,16 @@ public void onSuccess(Boolean shouldRetry) {
return callbackFuture;
}

private void updateBackoffDuration() {
if (lastStatus == Status.RESOURCE_EXHAUSTED) {
backoffDuration = DEFAULT_BACKOFF_MAX_DELAY_MS;
} else if (backoffDuration == 0) {
backoffDuration = DEFAULT_BACKOFF_INITIAL_DELAY_MS;
} else {
backoffDuration *= DEFAULT_BACKOFF_FACTOR;
}
}

/** Callback invoked when the operation succeeds. */
public ApiFuture<Void> onSuccess(final WriteResult result) {
final SettableApiFuture<Void> callbackFuture = SettableApiFuture.create();
Expand Down

0 comments on commit e295aa5

Please sign in to comment.