Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add max/min throttling options to BulkWriterOptions #400

Merged
merged 6 commits into from Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -43,7 +43,7 @@

final class BulkWriter implements AutoCloseable {
/** The maximum number of writes that can be in a single batch. */
public static final int MAX_BATCH_SIZE = 500;
public static final int MAX_BATCH_SIZE = 25;

public static final int MAX_RETRY_ATTEMPTS = 10;

Expand All @@ -53,7 +53,7 @@ final class BulkWriter implements AutoCloseable {
* @see <a href=https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic>Ramping
* up traffic</a>
*/
private static final int STARTING_MAXIMUM_OPS_PER_SECOND = 500;
static final int DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND = 500;

/**
* The rate by which to increase the capacity as specified by the 500/50/5 rule.
Expand Down Expand Up @@ -97,22 +97,50 @@ final class BulkWriter implements AutoCloseable {
private final ExponentialRetryAlgorithm backoff;
private TimedAttemptSettings nextAttempt;

BulkWriter(FirestoreImpl firestore, boolean enableThrottling) {
BulkWriter(FirestoreImpl firestore, BulkWriterOptions options) {
validateBulkWriterOptions(options);
this.firestore = firestore;
this.backoff =
new ExponentialRetryAlgorithm(
firestore.getOptions().getRetrySettings(), CurrentMillisClock.getDefaultClock());
this.nextAttempt = backoff.createFirstAttempt();
this.firestoreExecutor = firestore.getClient().getExecutor();

if (enableThrottling) {
rateLimiter =
if (!options.getThrottlingEnabled()) {
this.rateLimiter =
new RateLimiter(
STARTING_MAXIMUM_OPS_PER_SECOND,
RATE_LIMITER_MULTIPLIER,
RATE_LIMITER_MULTIPLIER_MILLIS);
Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE);
} else {
rateLimiter = new RateLimiter(Integer.MAX_VALUE, Double.MAX_VALUE, Integer.MAX_VALUE);
double startingRate = DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND;
double maxRate = Integer.MAX_VALUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems strange. Should we use Double.POSITIVE_INFINITY?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.


if (!Double.isNaN(options.getInitialOpsPerSecond())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use nullable values (Double instead of double) and use a null-check.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, but had to modify the AutoValue setters to support passing in int arguments.

startingRate = options.getInitialOpsPerSecond();
}

if (!Double.isNaN(options.getMaxOpsPerSecond())) {
maxRate = options.getMaxOpsPerSecond();
}

// The initial validation step ensures that the maxOpsPerSecond is greater than
// initialOpsPerSecond. If this inequality is true, that means initialOpsPerSecond was not
// set and maxOpsPerSecond is less than the default starting rate.
if (maxRate < startingRate) {
startingRate = maxRate;
}

// Ensure that the batch size is not larger than the number of allowed
// operations per second.
if (startingRate < maxBatchSize) {
this.maxBatchSize = (int) startingRate;
}

this.rateLimiter =
new RateLimiter(
(int) startingRate,
RATE_LIMITER_MULTIPLIER,
RATE_LIMITER_MULTIPLIER_MILLIS,
(int) maxRate);
}
}

Expand Down Expand Up @@ -679,4 +707,35 @@ public ApiFuture<Void> apply(List<BatchWriteResult> results) {
void setMaxBatchSize(int size) {
maxBatchSize = size;
}

@VisibleForTesting
RateLimiter getRateLimiter() {
return rateLimiter;
}

private void validateBulkWriterOptions(BulkWriterOptions options) {
double initialRate = options.getInitialOpsPerSecond();
double maxRate = options.getMaxOpsPerSecond();

if (initialRate < 1) {
throw FirestoreException.invalidState(
"Value for argument 'initialOpsPerSecond' must be greater than 1, but was: "
+ (int) initialRate);
}

if (maxRate < 1) {
throw FirestoreException.invalidState(
"Value for argument 'maxOpsPerSecond' must be greater than 1, but was: " + (int) maxRate);
}

if (!Double.isNaN(initialRate) && !Double.isNaN(maxRate) && initialRate > maxRate) {
throw FirestoreException.invalidState(
"'maxOpsPerSecond' cannot be less than 'initialOpsPerSecond'.");
}

if (!options.getThrottlingEnabled() && (!Double.isNaN(initialRate) || !Double.isNaN(maxRate))) {
throw FirestoreException.invalidState(
"Cannot set 'initialRate' or 'maxRate' when 'throttlingEnabled' is set to false.");
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, this validation would happen in the BulkWriterOptions.build() step. This would follow the precedent in other builders.

This might help: https://github.com/google/auto/blob/master/value/userguide/builders-howto.md#validate

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved.

}
Expand Up @@ -16,28 +16,70 @@

package com.google.cloud.firestore;

import javax.annotation.Nonnull;
import com.google.auto.value.AutoValue;

/** Options used to disable request throttling in BulkWriter. */
final class BulkWriterOptions {

private final boolean enableThrottling;

private BulkWriterOptions(boolean enableThrottling) {
this.enableThrottling = enableThrottling;
}
/** Options used to configure request throttling in BulkWriter. */
@AutoValue
abstract class BulkWriterOptions {
/**
* Return whether throttling is enabled.
*
* @return Whether throttling is enabled.
*/
abstract boolean getThrottlingEnabled();

boolean isThrottlingEnabled() {
return enableThrottling;
}
/**
* Returns the initial maximum number of operations per second allowed by the throttler.
*
* @return The initial maximum number of operations per second allowed by the throttler.
*/
abstract double getInitialOpsPerSecond();

/**
* An options object that will disable throttling in the created BulkWriter.
* Returns the maximum number of operations per second allowed by the throttler.
*
* <p>The throttler's allowed operations per second does not ramp up past the specified operations
* per second.
*
* @return The BulkWriterOptions object.
* @return The maximum number of operations per second allowed by the throttler.
*/
@Nonnull
public static BulkWriterOptions withThrottlingDisabled() {
return new BulkWriterOptions(false);
abstract double getMaxOpsPerSecond();

static Builder builder() {
return new AutoValue_BulkWriterOptions.Builder()
.setMaxOpsPerSecond(Double.NaN)
.setInitialOpsPerSecond(Double.NaN)
.setThrottlingEnabled(true);
}

abstract Builder toBuilder();

@AutoValue.Builder
abstract static class Builder {
/**
* Sets whether throttling should be enabled. By default, throttling is enabled.
*
* @param enabled Whether throttling should be enabled.
*/
abstract Builder setThrottlingEnabled(boolean enabled);

/**
* Set the initial maximum number of operations per second allowed by the throttler.
*
* @param initialOpsPerSecond The initial maximum number of operations per second allowed by the
* throttler.
*/
abstract Builder setInitialOpsPerSecond(double initialOpsPerSecond);

/**
* Set the maximum number of operations per second allowed by the throttler.
*
* @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler.
* The throttler's allowed operations per second does not ramp up past the specified
* operations per second.
*/
abstract Builder setMaxOpsPerSecond(double maxOpsPerSecond);

abstract BulkWriterOptions build();
}
}
Expand Up @@ -94,12 +94,12 @@ public WriteBatch batch() {

@Nonnull
BulkWriter bulkWriter() {
return new BulkWriter(this, /* enableThrottling= */ true);
return new BulkWriter(this, BulkWriterOptions.builder().setThrottlingEnabled(true).build());
}

@Nonnull
BulkWriter bulkWriter(BulkWriterOptions options) {
return new BulkWriter(this, options.isThrottlingEnabled());
return new BulkWriter(this, options);
}

@Nonnull
Expand Down
Expand Up @@ -38,31 +38,48 @@ class RateLimiter {
private final double multiplier;
private final int multiplierMillis;
private final long startTimeMillis;
private final int maximumCapacity;

private int availableTokens;
private long lastRefillTimeMillis;

RateLimiter(int initialCapacity, double multiplier, int multiplierMillis) {
this(initialCapacity, multiplier, multiplierMillis, new Date().getTime());
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, int maximumCapacity) {
this(initialCapacity, multiplier, multiplierMillis, maximumCapacity, new Date().getTime());
}

/**
* @param initialCapacity Initial maximum number of operations per second.
* @param multiplier Rate by which to increase the capacity.
* @param multiplierMillis How often the capacity should increase in milliseconds.
* @param maximumCapacity Maximum number of allowed operations per second. The number of tokens
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this different from the capacity? From what I can tell, the capacity grows unbounded and the rate is reduced artificially. The implementation is fine, but the name of the setting is a bit misleading.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the capacity is calculated, it's capped at the maximumCapacity, which I thought is short for "maximum allowed capacity".

When a new request is made, the number of tokens to allocate is done in calculateCapacity, which performs the Math.min() operation. This means that the capacity is bounded by maximumCapacity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in your implementation the total capacity can grow beyond maximumCapacity, but the rate at which tokens get deducted is limited. This implementation is correct, but the value that is passed here is not the maximum capacity, but rather the limit of tokens used at a time. e.g. after 5 seconds without requests the total capacity of the throttler could be 500, even if maximumCapacity is 100.

As stated, this is just a naming nit.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for elaborating! Renamed to maximumRate.

* added per second will never exceed this number.
* @param startTimeMillis The starting time in epoch milliseconds that the rate limit is based on.
* Used for testing the limiter.
*/
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, long startTimeMillis) {
RateLimiter(
int initialCapacity,
double multiplier,
int multiplierMillis,
int maximumCapacity,
long startTimeMillis) {
this.initialCapacity = initialCapacity;
this.multiplier = multiplier;
this.multiplierMillis = multiplierMillis;
this.maximumCapacity = maximumCapacity;
this.startTimeMillis = startTimeMillis;

this.availableTokens = initialCapacity;
this.lastRefillTimeMillis = startTimeMillis;
}

public int getInitialCapacity() {
return initialCapacity;
}

public int getMaximumCapacity() {
return maximumCapacity;
}

public boolean tryMakeRequest(int numOperations) {
return tryMakeRequest(numOperations, new Date().getTime());
}
Expand Down Expand Up @@ -132,7 +149,10 @@ private void refillTokens(long requestTimeMillis) {
public int calculateCapacity(long requestTimeMillis) {
long millisElapsed = requestTimeMillis - startTimeMillis;
int operationsPerSecond =
(int) (Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity);
Math.min(
(int)
(Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity),
maximumCapacity);
return operationsPerSecond;
}
}