Skip to content

Commit

Permalink
feat: add max/min throttling options to BulkWriterOptions (#400)
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Chen committed Oct 9, 2020
1 parent 5444fed commit 27a9397
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 35 deletions.
Expand Up @@ -53,7 +53,7 @@ final class BulkWriter implements AutoCloseable {
* @see <a href=https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic>Ramping
* up traffic</a>
*/
private static final int STARTING_MAXIMUM_OPS_PER_SECOND = 500;
static final int DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND = 500;

/**
* The rate by which to increase the capacity as specified by the 500/50/5 rule.
Expand Down Expand Up @@ -97,22 +97,49 @@ final class BulkWriter implements AutoCloseable {
private final ExponentialRetryAlgorithm backoff;
private TimedAttemptSettings nextAttempt;

BulkWriter(FirestoreImpl firestore, boolean enableThrottling) {
BulkWriter(FirestoreImpl firestore, BulkWriterOptions options) {
this.firestore = firestore;
this.backoff =
new ExponentialRetryAlgorithm(
firestore.getOptions().getRetrySettings(), CurrentMillisClock.getDefaultClock());
this.nextAttempt = backoff.createFirstAttempt();
this.firestoreExecutor = firestore.getClient().getExecutor();

if (enableThrottling) {
rateLimiter =
if (!options.getThrottlingEnabled()) {
this.rateLimiter =
new RateLimiter(
STARTING_MAXIMUM_OPS_PER_SECOND,
RATE_LIMITER_MULTIPLIER,
RATE_LIMITER_MULTIPLIER_MILLIS);
Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE);
} else {
rateLimiter = new RateLimiter(Integer.MAX_VALUE, Double.MAX_VALUE, Integer.MAX_VALUE);
double startingRate = DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND;
double maxRate = Double.POSITIVE_INFINITY;

if (options.getInitialOpsPerSecond() != null) {
startingRate = options.getInitialOpsPerSecond();
}

if (options.getMaxOpsPerSecond() != null) {
maxRate = options.getMaxOpsPerSecond();
}

// The initial validation step ensures that the maxOpsPerSecond is greater than
// initialOpsPerSecond. If this inequality is true, that means initialOpsPerSecond was not
// set and maxOpsPerSecond is less than the default starting rate.
if (maxRate < startingRate) {
startingRate = maxRate;
}

// Ensure that the batch size is not larger than the number of allowed
// operations per second.
if (startingRate < maxBatchSize) {
this.maxBatchSize = (int) startingRate;
}

this.rateLimiter =
new RateLimiter(
(int) startingRate,
RATE_LIMITER_MULTIPLIER,
RATE_LIMITER_MULTIPLIER_MILLIS,
(int) maxRate);
}
}

Expand Down Expand Up @@ -679,4 +706,9 @@ public ApiFuture<Void> apply(List<BatchWriteResult> results) {
void setMaxBatchSize(int size) {
maxBatchSize = size;
}

@VisibleForTesting
RateLimiter getRateLimiter() {
return rateLimiter;
}
}
Expand Up @@ -16,28 +16,123 @@

package com.google.cloud.firestore;

import javax.annotation.Nonnull;
import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;

/** Options used to disable request throttling in BulkWriter. */
final class BulkWriterOptions {

private final boolean enableThrottling;

private BulkWriterOptions(boolean enableThrottling) {
this.enableThrottling = enableThrottling;
}
/** Options used to configure request throttling in BulkWriter. */
@AutoValue
abstract class BulkWriterOptions {
/**
* Return whether throttling is enabled.
*
* @return Whether throttling is enabled.
*/
abstract boolean getThrottlingEnabled();

boolean isThrottlingEnabled() {
return enableThrottling;
}
/**
* Returns the initial maximum number of operations per second allowed by the throttler.
*
* @return The initial maximum number of operations per second allowed by the throttler.
*/
@Nullable
abstract Double getInitialOpsPerSecond();

/**
* An options object that will disable throttling in the created BulkWriter.
* Returns the maximum number of operations per second allowed by the throttler.
*
* @return The BulkWriterOptions object.
* <p>The throttler's allowed operations per second does not ramp up past the specified operations
* per second.
*
* @return The maximum number of operations per second allowed by the throttler.
*/
@Nonnull
public static BulkWriterOptions withThrottlingDisabled() {
return new BulkWriterOptions(false);
@Nullable
abstract Double getMaxOpsPerSecond();

static Builder builder() {
return new AutoValue_BulkWriterOptions.Builder()
.setMaxOpsPerSecond(null)
.setInitialOpsPerSecond(null)
.setThrottlingEnabled(true);
}

abstract Builder toBuilder();

@AutoValue.Builder
abstract static class Builder {
/**
* Sets whether throttling should be enabled. By default, throttling is enabled.
*
* @param enabled Whether throttling should be enabled.
*/
abstract Builder setThrottlingEnabled(boolean enabled);

/**
* Set the initial maximum number of operations per second allowed by the throttler.
*
* @param initialOpsPerSecond The initial maximum number of operations per second allowed by the
* throttler.
*/
abstract Builder setInitialOpsPerSecond(@Nullable Double initialOpsPerSecond);

/**
* Set the initial maximum number of operations per second allowed by the throttler.
*
* @param initialOpsPerSecond The initial maximum number of operations per second allowed by the
* throttler.
*/
Builder setInitialOpsPerSecond(int initialOpsPerSecond) {
return setInitialOpsPerSecond(new Double(initialOpsPerSecond));
}

/**
* Set the maximum number of operations per second allowed by the throttler.
*
* @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler.
* The throttler's allowed operations per second does not ramp up past the specified
* operations per second.
*/
abstract Builder setMaxOpsPerSecond(@Nullable Double maxOpsPerSecond);

/**
* Set the maximum number of operations per second allowed by the throttler.
*
* @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler.
* The throttler's allowed operations per second does not ramp up past the specified
* operations per second.
*/
Builder setMaxOpsPerSecond(int maxOpsPerSecond) {
return setMaxOpsPerSecond(new Double(maxOpsPerSecond));
}

abstract BulkWriterOptions autoBuild();

BulkWriterOptions build() {
BulkWriterOptions options = autoBuild();
Double initialRate = options.getInitialOpsPerSecond();
Double maxRate = options.getMaxOpsPerSecond();

if (initialRate != null && initialRate < 1) {
throw FirestoreException.invalidState(
"Value for argument 'initialOpsPerSecond' must be greater than 1, but was: "
+ initialRate.intValue());
}

if (maxRate != null && maxRate < 1) {
throw FirestoreException.invalidState(
"Value for argument 'maxOpsPerSecond' must be greater than 1, but was: "
+ maxRate.intValue());
}

if (maxRate != null && initialRate != null && initialRate > maxRate) {
throw FirestoreException.invalidState(
"'maxOpsPerSecond' cannot be less than 'initialOpsPerSecond'.");
}

if (!options.getThrottlingEnabled() && (maxRate != null || initialRate != null)) {
throw FirestoreException.invalidState(
"Cannot set 'initialOpsPerSecond' or 'maxOpsPerSecond' when 'throttlingEnabled' is set to false.");
}
return options;
}
}
}
Expand Up @@ -94,12 +94,12 @@ public WriteBatch batch() {

@Nonnull
BulkWriter bulkWriter() {
return new BulkWriter(this, /* enableThrottling= */ true);
return new BulkWriter(this, BulkWriterOptions.builder().setThrottlingEnabled(true).build());
}

@Nonnull
BulkWriter bulkWriter(BulkWriterOptions options) {
return new BulkWriter(this, options.isThrottlingEnabled());
return new BulkWriter(this, options);
}

@Nonnull
Expand Down
Expand Up @@ -38,31 +38,48 @@ class RateLimiter {
private final double multiplier;
private final int multiplierMillis;
private final long startTimeMillis;
private final int maximumRate;

private int availableTokens;
private long lastRefillTimeMillis;

RateLimiter(int initialCapacity, double multiplier, int multiplierMillis) {
this(initialCapacity, multiplier, multiplierMillis, new Date().getTime());
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, int maximumRate) {
this(initialCapacity, multiplier, multiplierMillis, maximumRate, new Date().getTime());
}

/**
* @param initialCapacity Initial maximum number of operations per second.
* @param multiplier Rate by which to increase the capacity.
* @param multiplierMillis How often the capacity should increase in milliseconds.
* @param maximumRate Maximum number of allowed operations per second. The number of tokens added
* per second will never exceed this number.
* @param startTimeMillis The starting time in epoch milliseconds that the rate limit is based on.
* Used for testing the limiter.
*/
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, long startTimeMillis) {
RateLimiter(
int initialCapacity,
double multiplier,
int multiplierMillis,
int maximumRate,
long startTimeMillis) {
this.initialCapacity = initialCapacity;
this.multiplier = multiplier;
this.multiplierMillis = multiplierMillis;
this.maximumRate = maximumRate;
this.startTimeMillis = startTimeMillis;

this.availableTokens = initialCapacity;
this.lastRefillTimeMillis = startTimeMillis;
}

public int getInitialCapacity() {
return initialCapacity;
}

public int getMaximumRate() {
return maximumRate;
}

public boolean tryMakeRequest(int numOperations) {
return tryMakeRequest(numOperations, new Date().getTime());
}
Expand Down Expand Up @@ -132,7 +149,10 @@ private void refillTokens(long requestTimeMillis) {
public int calculateCapacity(long requestTimeMillis) {
long millisElapsed = requestTimeMillis - startTimeMillis;
int operationsPerSecond =
(int) (Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity);
Math.min(
(int)
(Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity),
maximumRate);
return operationsPerSecond;
}
}

0 comments on commit 27a9397

Please sign in to comment.