Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add max/min throttling options to BulkWriterOptions #400

Merged
merged 6 commits into from Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -43,7 +43,7 @@

final class BulkWriter implements AutoCloseable {
/** The maximum number of writes that can be in a single batch. */
public static final int MAX_BATCH_SIZE = 500;
public static final int MAX_BATCH_SIZE = 20;

public static final int MAX_RETRY_ATTEMPTS = 10;

Expand All @@ -53,7 +53,7 @@ final class BulkWriter implements AutoCloseable {
* @see <a href=https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic>Ramping
* up traffic</a>
*/
private static final int STARTING_MAXIMUM_OPS_PER_SECOND = 500;
static final int DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND = 500;

/**
* The rate by which to increase the capacity as specified by the 500/50/5 rule.
Expand Down Expand Up @@ -97,22 +97,51 @@ final class BulkWriter implements AutoCloseable {
private final ExponentialRetryAlgorithm backoff;
private TimedAttemptSettings nextAttempt;

BulkWriter(FirestoreImpl firestore, boolean enableThrottling) {
BulkWriter(FirestoreImpl firestore, BulkWriterOptions options) {
validateBulkWriterOptions(options);
this.firestore = firestore;
this.backoff =
new ExponentialRetryAlgorithm(
firestore.getOptions().getRetrySettings(), CurrentMillisClock.getDefaultClock());
this.nextAttempt = backoff.createFirstAttempt();
this.firestoreExecutor = firestore.getClient().getExecutor();

if (enableThrottling) {
rateLimiter =
if (!options.isThrottlingEnabled()) {
this.rateLimiter =
new RateLimiter(
STARTING_MAXIMUM_OPS_PER_SECOND,
RATE_LIMITER_MULTIPLIER,
RATE_LIMITER_MULTIPLIER_MILLIS);
Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE);
} else {
rateLimiter = new RateLimiter(Integer.MAX_VALUE, Double.MAX_VALUE, Integer.MAX_VALUE);
double startingRate = DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND;
double maxRate = Integer.MAX_VALUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems strange. Should we use Double.POSITIVE_INFINITY?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.


if (options.getInitialOpsPerSecond() != BulkWriterOptions.DEFAULT_UNSET_VALUE) {
startingRate = options.getInitialOpsPerSecond();
}

if (options.getMaxOpsPerSecond() != BulkWriterOptions.DEFAULT_UNSET_VALUE) {
maxRate = options.getMaxOpsPerSecond();
}

// The initial validation step ensures that the maxOpsPerSecond is
// greater than initialOpsPerSecond. If this inequality is true, that
// means initialOpsPerSecond was not set and maxOpsPerSecond is less
// than the default starting rate.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These comments need some reflowing to match the Java line length.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

if (maxRate < startingRate) {
startingRate = maxRate;
}

// Ensure that the batch size is not larger than the number of allowed
// operations per second.
if (startingRate < maxBatchSize) {
this.maxBatchSize = (int) startingRate;
}

this.rateLimiter =
new RateLimiter(
(int) startingRate,
RATE_LIMITER_MULTIPLIER,
RATE_LIMITER_MULTIPLIER_MILLIS,
(int) maxRate);
}
}

Expand Down Expand Up @@ -679,4 +708,33 @@ public ApiFuture<Void> apply(List<BatchWriteResult> results) {
void setMaxBatchSize(int size) {
maxBatchSize = size;
}

@VisibleForTesting
RateLimiter getRateLimiter() {
return rateLimiter;
}

private void validateBulkWriterOptions(BulkWriterOptions options) {
double initialRate = options.getInitialOpsPerSecond();
double maxRate = options.getMaxOpsPerSecond();

if (initialRate < 1) {
throw FirestoreException.invalidState(
"Value for argument 'initialOpsPerSecond' must be an integer within [1, Infinity], but was: "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Value for argument 'initialOpsPerSecond' must be an integer within [1, Infinity], but was: "
"Value for argument 'initialOpsPerSecond' must be greater than 1, but was: "

Easier to read, me thinks.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

+ (int) initialRate);
}

if (maxRate < 1) {
throw FirestoreException.invalidState(
"Value for argument 'maxOpsPerSecond' must be an integer within [1, Infinity], but was: "
+ (int) maxRate);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Samesies.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

donesies.

}

if (initialRate != BulkWriterOptions.DEFAULT_UNSET_VALUE
&& maxRate != BulkWriterOptions.DEFAULT_UNSET_VALUE
&& initialRate > maxRate) {
throw FirestoreException.invalidState(
"'maxOpsPerSecond' cannot be less than 'initialOpsPerSecond'.");
}
}
}
Expand Up @@ -20,15 +20,31 @@

/** Options used to disable request throttling in BulkWriter. */
final class BulkWriterOptions {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be time to make this an actual Builder. You can use AutoValue to reduce the amount of code you have to write. See

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attempted.

static final double DEFAULT_UNSET_VALUE = 1.1;
private final boolean throttling;
private double initialOpsPerSecond = DEFAULT_UNSET_VALUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make these Double and initialize to null?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java doesn't allow setting Double to null, but I used Double.NaN.

private double maxOpsPerSecond = DEFAULT_UNSET_VALUE;

private final boolean enableThrottling;
BulkWriterOptions(boolean enableThrottling) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be package-private now?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

N/A w/ AutoValue.

this.throttling = enableThrottling;
}

private BulkWriterOptions(boolean enableThrottling) {
this.enableThrottling = enableThrottling;
private BulkWriterOptions(double initialOpsPerSecond, double maxOpsPerSecond) {
this.throttling = true;
this.initialOpsPerSecond = initialOpsPerSecond;
this.maxOpsPerSecond = maxOpsPerSecond;
}

boolean isThrottlingEnabled() {
return enableThrottling;
return throttling;
}

double getInitialOpsPerSecond() {
return initialOpsPerSecond;
}

double getMaxOpsPerSecond() {
return maxOpsPerSecond;
}

/**
Expand All @@ -40,4 +56,48 @@ boolean isThrottlingEnabled() {
public static BulkWriterOptions withThrottlingDisabled() {
return new BulkWriterOptions(false);
}

/**
* An options object that will enable throttling in the created BulkWriter with the provided
* starting rate.
*
* @param initialOpsPerSecond The initial maximum number of operations per second allowed by the
* throttler.
* @return The BulkWriterOptions object.
*/
@Nonnull
public static BulkWriterOptions withInitialOpsPerSecondThrottling(int initialOpsPerSecond) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static BulkWriterOptions withInitialOpsPerSecondThrottling(int initialOpsPerSecond) {
public static BulkWriterOptions withInitialOpsPerSecond(double initialOpsPerSecond) {

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

return new BulkWriterOptions(initialOpsPerSecond, DEFAULT_UNSET_VALUE);
}

/**
* An options object that will enable throttling in the created BulkWriter with the provided
* maximum rate.
*
* @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler.
* The throttler's allowed operations per second does not ramp up past the specified
* operations per second.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to follow from line above.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

* @return The BulkWriterOptions object.
*/
@Nonnull
public static BulkWriterOptions withMaxOpsPerSecondThrottling(int maxOpsPerSecond) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static BulkWriterOptions withMaxOpsPerSecondThrottling(int maxOpsPerSecond) {
public static BulkWriterOptions withMaxOpsPerSecond(double maxOpsPerSecond) {

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to double.

return new BulkWriterOptions(DEFAULT_UNSET_VALUE, maxOpsPerSecond);
}

/**
* An options object that will enable throttling in the created BulkWriter with the provided
* starting and maximum rate.
*
* @param initialOpsPerSecond The initial maximum number of operations per second allowed by the
* throttler.
* @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler.
* The throttler's allowed operations per second does not ramp up past the specified
* operations per second.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this is already implied in the line above.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

N/A w/ AutoValue.

* @return The BulkWriterOptions object.
*/
@Nonnull
public static BulkWriterOptions withCustomThrottling(
int initialOpsPerSecond, int maxOpsPerSecond) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int initialOpsPerSecond, int maxOpsPerSecond) {
double initialOpsPerSecond, double maxOpsPerSecond) {

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

return new BulkWriterOptions(initialOpsPerSecond, maxOpsPerSecond);
}
}
Expand Up @@ -94,12 +94,12 @@ public WriteBatch batch() {

@Nonnull
BulkWriter bulkWriter() {
return new BulkWriter(this, /* enableThrottling= */ true);
return new BulkWriter(this, new BulkWriterOptions(true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add back /* enableThrottling= */ ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}

@Nonnull
BulkWriter bulkWriter(BulkWriterOptions options) {
return new BulkWriter(this, options.isThrottlingEnabled());
return new BulkWriter(this, options);
}

@Nonnull
Expand Down
Expand Up @@ -38,31 +38,48 @@ class RateLimiter {
private final double multiplier;
private final int multiplierMillis;
private final long startTimeMillis;
private final int maximumCapacity;

private int availableTokens;
private long lastRefillTimeMillis;

RateLimiter(int initialCapacity, double multiplier, int multiplierMillis) {
this(initialCapacity, multiplier, multiplierMillis, new Date().getTime());
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, int maximumCapacity) {
this(initialCapacity, multiplier, multiplierMillis, maximumCapacity, new Date().getTime());
}

/**
* @param initialCapacity Initial maximum number of operations per second.
* @param multiplier Rate by which to increase the capacity.
* @param multiplierMillis How often the capacity should increase in milliseconds.
* @param maximumCapacity Maximum number of allowed operations per second. The number of tokens
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this different from the capacity? From what I can tell, the capacity grows unbounded and the rate is reduced artificially. The implementation is fine, but the name of the setting is a bit misleading.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the capacity is calculated, it's capped at the maximumCapacity, which I thought is short for "maximum allowed capacity".

When a new request is made, the number of tokens to allocate is done in calculateCapacity, which performs the Math.min() operation. This means that the capacity is bounded by maximumCapacity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in your implementation the total capacity can grow beyond maximumCapacity, but the rate at which tokens get deducted is limited. This implementation is correct, but the value that is passed here is not the maximum capacity, but rather the limit of tokens used at a time. e.g. after 5 seconds without requests the total capacity of the throttler could be 500, even if maximumCapacity is 100.

As stated, this is just a naming nit.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for elaborating! Renamed to maximumRate.

* added per second will never exceed this number.
* @param startTimeMillis The starting time in epoch milliseconds that the rate limit is based on.
* Used for testing the limiter.
*/
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, long startTimeMillis) {
RateLimiter(
int initialCapacity,
double multiplier,
int multiplierMillis,
int maximumCapacity,
long startTimeMillis) {
this.initialCapacity = initialCapacity;
this.multiplier = multiplier;
this.multiplierMillis = multiplierMillis;
this.maximumCapacity = maximumCapacity;
this.startTimeMillis = startTimeMillis;

this.availableTokens = initialCapacity;
this.lastRefillTimeMillis = startTimeMillis;
}

public int getInitialCapacity() {
return initialCapacity;
}

public int getMaximumCapacity() {
return maximumCapacity;
}

public boolean tryMakeRequest(int numOperations) {
return tryMakeRequest(numOperations, new Date().getTime());
}
Expand Down Expand Up @@ -132,7 +149,10 @@ private void refillTokens(long requestTimeMillis) {
public int calculateCapacity(long requestTimeMillis) {
long millisElapsed = requestTimeMillis - startTimeMillis;
int operationsPerSecond =
(int) (Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity);
Math.min(
(int)
(Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity),
maximumCapacity);
return operationsPerSecond;
}
}
Expand Up @@ -50,7 +50,6 @@
import javax.annotation.Nonnull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
Expand Down Expand Up @@ -549,8 +548,6 @@ public void flushCompletesWhenAllWritesComplete() throws Exception {
}

@Test
@Ignore
// TODO(chenbrian): Fix this test after throttling options are added.
public void doesNotSendBatchesIfDoingSoExceedsRateLimit() {
final boolean[] timeoutCalled = {false};
final ScheduledExecutorService timeoutExecutor =
Expand All @@ -565,7 +562,8 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
}
};
doReturn(timeoutExecutor).when(firestoreRpc).getExecutor();
BulkWriter bulkWriter = firestoreMock.bulkWriter();
BulkWriter bulkWriter =
firestoreMock.bulkWriter(BulkWriterOptions.withInitialOpsPerSecondThrottling(5));

for (int i = 0; i < 600; ++i) {
bulkWriter.set(firestoreMock.document("coll/doc" + i), LocalFirestoreHelper.SINGLE_FIELD_MAP);
Expand Down Expand Up @@ -706,4 +704,65 @@ public ApiFuture<Object> answer(InvocationOnMock mock) {
assertEquals(retryAttempts[0], BulkWriter.MAX_RETRY_ATTEMPTS + 1);
}
}

@Test
public void optionsRequiresPositiveInteger() throws Exception {
try {
firestoreMock.bulkWriter(BulkWriterOptions.withInitialOpsPerSecondThrottling(-1));
fail("bulkWriter() call should have failed");
} catch (Exception e) {
assertEquals(
e.getMessage(),
"Value for argument 'initialOpsPerSecond' must be an integer within [1, Infinity], but was: -1");
}

try {
firestoreMock.bulkWriter(BulkWriterOptions.withMaxOpsPerSecondThrottling(-1));
fail("bulkWriter() call should have failed");
} catch (Exception e) {
assertEquals(
e.getMessage(),
"Value for argument 'maxOpsPerSecond' must be an integer within [1, Infinity], but was: -1");
}
}

@Test
public void optionsRequiresMaxGreaterThanInitial() throws Exception {
try {
firestoreMock.bulkWriter(BulkWriterOptions.withCustomThrottling(550, 500));
fail("bulkWriter() call should have failed");
} catch (Exception e) {
assertEquals(e.getMessage(), "'maxOpsPerSecond' cannot be less than 'initialOpsPerSecond'.");
}
}

@Test
public void optionsInitialAndMaxRatesAreProperlySet() throws Exception {
BulkWriter bulkWriter =
firestoreMock.bulkWriter(BulkWriterOptions.withCustomThrottling(500, 550));
assertEquals(bulkWriter.getRateLimiter().getInitialCapacity(), 500);
assertEquals(bulkWriter.getRateLimiter().getMaximumCapacity(), 550);

bulkWriter = firestoreMock.bulkWriter(BulkWriterOptions.withMaxOpsPerSecondThrottling(1000));
assertEquals(bulkWriter.getRateLimiter().getInitialCapacity(), 500);
assertEquals(bulkWriter.getRateLimiter().getMaximumCapacity(), 1000);

bulkWriter = firestoreMock.bulkWriter(BulkWriterOptions.withInitialOpsPerSecondThrottling(100));
assertEquals(bulkWriter.getRateLimiter().getInitialCapacity(), 100);
assertEquals(bulkWriter.getRateLimiter().getMaximumCapacity(), Integer.MAX_VALUE);

bulkWriter = firestoreMock.bulkWriter(BulkWriterOptions.withMaxOpsPerSecondThrottling(100));
assertEquals(bulkWriter.getRateLimiter().getInitialCapacity(), 100);
assertEquals(bulkWriter.getRateLimiter().getMaximumCapacity(), 100);

bulkWriter = firestoreMock.bulkWriter();
assertEquals(
bulkWriter.getRateLimiter().getInitialCapacity(),
BulkWriter.DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND);
assertEquals(bulkWriter.getRateLimiter().getMaximumCapacity(), Integer.MAX_VALUE);

bulkWriter = firestoreMock.bulkWriter(BulkWriterOptions.withThrottlingDisabled());
assertEquals(bulkWriter.getRateLimiter().getInitialCapacity(), Integer.MAX_VALUE);
assertEquals(bulkWriter.getRateLimiter().getMaximumCapacity(), Integer.MAX_VALUE);
}
}
Expand Up @@ -38,6 +38,7 @@ public void before() {
/* initialCapacity= */ 500,
/* multiplier= */ 1.5,
/* multiplierMillis= */ 5 * 60 * 1000,
/* maximumCapacity= */ 1000000,
/* startTime= */ new Date(0).getTime());
}

Expand Down Expand Up @@ -108,5 +109,8 @@ public void calculatesMaxOperations() {
assertEquals(1125, limiter.calculateCapacity(new Date(10 * 60 * 1000).getTime()));
assertEquals(1687, limiter.calculateCapacity(new Date(15 * 60 * 1000).getTime()));
assertEquals(738945, limiter.calculateCapacity(new Date(90 * 60 * 1000).getTime()));

// Check that maximum rate limit is enforced.
assertEquals(1000000, limiter.calculateCapacity(new Date(1000 * 60 * 1000).getTime()));
}
}