Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add max/min throttling options to BulkWriterOptions #400

Merged
merged 6 commits into from Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -43,7 +43,7 @@

final class BulkWriter implements AutoCloseable {
/** The maximum number of writes that can be in a single batch. */
public static final int MAX_BATCH_SIZE = 500;
public static final int MAX_BATCH_SIZE = 25;

public static final int MAX_RETRY_ATTEMPTS = 10;

Expand All @@ -53,7 +53,7 @@ final class BulkWriter implements AutoCloseable {
* @see <a href=https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic>Ramping
* up traffic</a>
*/
private static final int STARTING_MAXIMUM_OPS_PER_SECOND = 500;
static final int DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND = 500;

/**
* The rate by which to increase the capacity as specified by the 500/50/5 rule.
Expand Down Expand Up @@ -97,22 +97,49 @@ final class BulkWriter implements AutoCloseable {
private final ExponentialRetryAlgorithm backoff;
private TimedAttemptSettings nextAttempt;

BulkWriter(FirestoreImpl firestore, boolean enableThrottling) {
BulkWriter(FirestoreImpl firestore, BulkWriterOptions options) {
this.firestore = firestore;
this.backoff =
new ExponentialRetryAlgorithm(
firestore.getOptions().getRetrySettings(), CurrentMillisClock.getDefaultClock());
this.nextAttempt = backoff.createFirstAttempt();
this.firestoreExecutor = firestore.getClient().getExecutor();

if (enableThrottling) {
rateLimiter =
if (!options.getThrottlingEnabled()) {
this.rateLimiter =
new RateLimiter(
STARTING_MAXIMUM_OPS_PER_SECOND,
RATE_LIMITER_MULTIPLIER,
RATE_LIMITER_MULTIPLIER_MILLIS);
Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE);
} else {
rateLimiter = new RateLimiter(Integer.MAX_VALUE, Double.MAX_VALUE, Integer.MAX_VALUE);
double startingRate = DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND;
double maxRate = Double.POSITIVE_INFINITY;

if (options.getInitialOpsPerSecond() != null) {
startingRate = options.getInitialOpsPerSecond();
}

if (options.getMaxOpsPerSecond() != null) {
maxRate = options.getMaxOpsPerSecond();
}

// The initial validation step ensures that the maxOpsPerSecond is greater than
// initialOpsPerSecond. If this inequality is true, that means initialOpsPerSecond was not
// set and maxOpsPerSecond is less than the default starting rate.
if (maxRate < startingRate) {
startingRate = maxRate;
}

// Ensure that the batch size is not larger than the number of allowed
// operations per second.
if (startingRate < maxBatchSize) {
this.maxBatchSize = (int) startingRate;
}

this.rateLimiter =
new RateLimiter(
(int) startingRate,
RATE_LIMITER_MULTIPLIER,
RATE_LIMITER_MULTIPLIER_MILLIS,
(int) maxRate);
}
}

Expand Down Expand Up @@ -679,4 +706,9 @@ public ApiFuture<Void> apply(List<BatchWriteResult> results) {
void setMaxBatchSize(int size) {
maxBatchSize = size;
}

@VisibleForTesting
RateLimiter getRateLimiter() {
return rateLimiter;
}
}
Expand Up @@ -16,28 +16,123 @@

package com.google.cloud.firestore;

import javax.annotation.Nonnull;
import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;

/** Options used to disable request throttling in BulkWriter. */
final class BulkWriterOptions {

private final boolean enableThrottling;

private BulkWriterOptions(boolean enableThrottling) {
this.enableThrottling = enableThrottling;
}
/** Options used to configure request throttling in BulkWriter. */
@AutoValue
abstract class BulkWriterOptions {
/**
* Return whether throttling is enabled.
*
* @return Whether throttling is enabled.
*/
abstract boolean getThrottlingEnabled();

boolean isThrottlingEnabled() {
return enableThrottling;
}
/**
* Returns the initial maximum number of operations per second allowed by the throttler.
*
* @return The initial maximum number of operations per second allowed by the throttler.
*/
@Nullable
abstract Double getInitialOpsPerSecond();

/**
* An options object that will disable throttling in the created BulkWriter.
* Returns the maximum number of operations per second allowed by the throttler.
*
* @return The BulkWriterOptions object.
* <p>The throttler's allowed operations per second does not ramp up past the specified operations
* per second.
*
* @return The maximum number of operations per second allowed by the throttler.
*/
@Nonnull
public static BulkWriterOptions withThrottlingDisabled() {
return new BulkWriterOptions(false);
@Nullable
abstract Double getMaxOpsPerSecond();

static Builder builder() {
return new AutoValue_BulkWriterOptions.Builder()
.setMaxOpsPerSecond(null)
.setInitialOpsPerSecond(null)
.setThrottlingEnabled(true);
}

abstract Builder toBuilder();

@AutoValue.Builder
abstract static class Builder {
/**
* Sets whether throttling should be enabled. By default, throttling is enabled.
*
* @param enabled Whether throttling should be enabled.
*/
abstract Builder setThrottlingEnabled(boolean enabled);

/**
* Set the initial maximum number of operations per second allowed by the throttler.
*
* @param initialOpsPerSecond The initial maximum number of operations per second allowed by the
* throttler.
*/
abstract Builder setInitialOpsPerSecond(@Nullable Double initialOpsPerSecond);

/**
* Set the initial maximum number of operations per second allowed by the throttler.
*
* @param initialOpsPerSecond The initial maximum number of operations per second allowed by the
* throttler.
*/
Builder setInitialOpsPerSecond(int initialOpsPerSecond) {
return setInitialOpsPerSecond(new Double(initialOpsPerSecond));
}

/**
* Set the maximum number of operations per second allowed by the throttler.
*
* @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler.
* The throttler's allowed operations per second does not ramp up past the specified
* operations per second.
*/
abstract Builder setMaxOpsPerSecond(@Nullable Double maxOpsPerSecond);

/**
* Set the maximum number of operations per second allowed by the throttler.
*
* @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler.
* The throttler's allowed operations per second does not ramp up past the specified
* operations per second.
*/
Builder setMaxOpsPerSecond(int maxOpsPerSecond) {
return setMaxOpsPerSecond(new Double(maxOpsPerSecond));
}

abstract BulkWriterOptions autoBuild();

BulkWriterOptions build() {
BulkWriterOptions options = autoBuild();
Double initialRate = options.getInitialOpsPerSecond();
Double maxRate = options.getMaxOpsPerSecond();

if (initialRate != null && initialRate < 1) {
throw FirestoreException.invalidState(
"Value for argument 'initialOpsPerSecond' must be greater than 1, but was: "
+ initialRate.intValue());
}

if (maxRate != null && maxRate < 1) {
throw FirestoreException.invalidState(
"Value for argument 'maxOpsPerSecond' must be greater than 1, but was: "
+ maxRate.intValue());
}

if (maxRate != null && initialRate != null && initialRate > maxRate) {
throw FirestoreException.invalidState(
"'maxOpsPerSecond' cannot be less than 'initialOpsPerSecond'.");
}

if (!options.getThrottlingEnabled() && (maxRate != null || initialRate != null)) {
throw FirestoreException.invalidState(
"Cannot set 'initialOpsPerSecond' or 'maxOpsPerSecond' when 'throttlingEnabled' is set to false.");
}
return options;
}
}
}
Expand Up @@ -94,12 +94,12 @@ public WriteBatch batch() {

@Nonnull
BulkWriter bulkWriter() {
return new BulkWriter(this, /* enableThrottling= */ true);
return new BulkWriter(this, BulkWriterOptions.builder().setThrottlingEnabled(true).build());
}

@Nonnull
BulkWriter bulkWriter(BulkWriterOptions options) {
return new BulkWriter(this, options.isThrottlingEnabled());
return new BulkWriter(this, options);
}

@Nonnull
Expand Down
Expand Up @@ -38,31 +38,48 @@ class RateLimiter {
private final double multiplier;
private final int multiplierMillis;
private final long startTimeMillis;
private final int maximumRate;

private int availableTokens;
private long lastRefillTimeMillis;

RateLimiter(int initialCapacity, double multiplier, int multiplierMillis) {
this(initialCapacity, multiplier, multiplierMillis, new Date().getTime());
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, int maximumRate) {
this(initialCapacity, multiplier, multiplierMillis, maximumRate, new Date().getTime());
}

/**
* @param initialCapacity Initial maximum number of operations per second.
* @param multiplier Rate by which to increase the capacity.
* @param multiplierMillis How often the capacity should increase in milliseconds.
* @param maximumRate Maximum number of allowed operations per second. The number of tokens added
* per second will never exceed this number.
* @param startTimeMillis The starting time in epoch milliseconds that the rate limit is based on.
* Used for testing the limiter.
*/
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, long startTimeMillis) {
RateLimiter(
int initialCapacity,
double multiplier,
int multiplierMillis,
int maximumRate,
long startTimeMillis) {
this.initialCapacity = initialCapacity;
this.multiplier = multiplier;
this.multiplierMillis = multiplierMillis;
this.maximumRate = maximumRate;
this.startTimeMillis = startTimeMillis;

this.availableTokens = initialCapacity;
this.lastRefillTimeMillis = startTimeMillis;
}

public int getInitialCapacity() {
return initialCapacity;
}

public int getMaximumRate() {
return maximumRate;
}

public boolean tryMakeRequest(int numOperations) {
return tryMakeRequest(numOperations, new Date().getTime());
}
Expand Down Expand Up @@ -132,7 +149,10 @@ private void refillTokens(long requestTimeMillis) {
public int calculateCapacity(long requestTimeMillis) {
long millisElapsed = requestTimeMillis - startTimeMillis;
int operationsPerSecond =
(int) (Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity);
Math.min(
(int)
(Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity),
maximumRate);
return operationsPerSecond;
}
}