New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add max/min throttling options to BulkWriterOptions #400
Changes from 2 commits
f5b8e0c
79e247f
201594d
3658104
64d0689
e9b48f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -43,7 +43,7 @@ | |||||
|
||||||
final class BulkWriter implements AutoCloseable { | ||||||
/** The maximum number of writes that can be in a single batch. */ | ||||||
public static final int MAX_BATCH_SIZE = 500; | ||||||
public static final int MAX_BATCH_SIZE = 20; | ||||||
|
||||||
public static final int MAX_RETRY_ATTEMPTS = 10; | ||||||
|
||||||
|
@@ -53,7 +53,7 @@ final class BulkWriter implements AutoCloseable { | |||||
* @see <a href=https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic>Ramping | ||||||
* up traffic</a> | ||||||
*/ | ||||||
private static final int STARTING_MAXIMUM_OPS_PER_SECOND = 500; | ||||||
static final int DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND = 500; | ||||||
|
||||||
/** | ||||||
* The rate by which to increase the capacity as specified by the 500/50/5 rule. | ||||||
|
@@ -97,22 +97,51 @@ final class BulkWriter implements AutoCloseable { | |||||
private final ExponentialRetryAlgorithm backoff; | ||||||
private TimedAttemptSettings nextAttempt; | ||||||
|
||||||
BulkWriter(FirestoreImpl firestore, boolean enableThrottling) { | ||||||
BulkWriter(FirestoreImpl firestore, BulkWriterOptions options) { | ||||||
validateBulkWriterOptions(options); | ||||||
this.firestore = firestore; | ||||||
this.backoff = | ||||||
new ExponentialRetryAlgorithm( | ||||||
firestore.getOptions().getRetrySettings(), CurrentMillisClock.getDefaultClock()); | ||||||
this.nextAttempt = backoff.createFirstAttempt(); | ||||||
this.firestoreExecutor = firestore.getClient().getExecutor(); | ||||||
|
||||||
if (enableThrottling) { | ||||||
rateLimiter = | ||||||
if (!options.isThrottlingEnabled()) { | ||||||
this.rateLimiter = | ||||||
new RateLimiter( | ||||||
STARTING_MAXIMUM_OPS_PER_SECOND, | ||||||
RATE_LIMITER_MULTIPLIER, | ||||||
RATE_LIMITER_MULTIPLIER_MILLIS); | ||||||
Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE); | ||||||
} else { | ||||||
rateLimiter = new RateLimiter(Integer.MAX_VALUE, Double.MAX_VALUE, Integer.MAX_VALUE); | ||||||
double startingRate = DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND; | ||||||
double maxRate = Integer.MAX_VALUE; | ||||||
|
||||||
if (options.getInitialOpsPerSecond() != BulkWriterOptions.DEFAULT_UNSET_VALUE) { | ||||||
startingRate = options.getInitialOpsPerSecond(); | ||||||
} | ||||||
|
||||||
if (options.getMaxOpsPerSecond() != BulkWriterOptions.DEFAULT_UNSET_VALUE) { | ||||||
maxRate = options.getMaxOpsPerSecond(); | ||||||
} | ||||||
|
||||||
// The initial validation step ensures that the maxOpsPerSecond is | ||||||
// greater than initialOpsPerSecond. If this inequality is true, that | ||||||
// means initialOpsPerSecond was not set and maxOpsPerSecond is less | ||||||
// than the default starting rate. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These comments need some reflowing to match the Java line length. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||||||
if (maxRate < startingRate) { | ||||||
startingRate = maxRate; | ||||||
} | ||||||
|
||||||
// Ensure that the batch size is not larger than the number of allowed | ||||||
// operations per second. | ||||||
if (startingRate < maxBatchSize) { | ||||||
this.maxBatchSize = (int) startingRate; | ||||||
} | ||||||
|
||||||
this.rateLimiter = | ||||||
new RateLimiter( | ||||||
(int) startingRate, | ||||||
RATE_LIMITER_MULTIPLIER, | ||||||
RATE_LIMITER_MULTIPLIER_MILLIS, | ||||||
(int) maxRate); | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -679,4 +708,33 @@ public ApiFuture<Void> apply(List<BatchWriteResult> results) { | |||||
void setMaxBatchSize(int size) { | ||||||
maxBatchSize = size; | ||||||
} | ||||||
|
||||||
@VisibleForTesting | ||||||
RateLimiter getRateLimiter() { | ||||||
return rateLimiter; | ||||||
} | ||||||
|
||||||
private void validateBulkWriterOptions(BulkWriterOptions options) { | ||||||
double initialRate = options.getInitialOpsPerSecond(); | ||||||
double maxRate = options.getMaxOpsPerSecond(); | ||||||
|
||||||
if (initialRate < 1) { | ||||||
throw FirestoreException.invalidState( | ||||||
"Value for argument 'initialOpsPerSecond' must be an integer within [1, Infinity], but was: " | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Easier to read, me thinks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||||||
+ (int) initialRate); | ||||||
} | ||||||
|
||||||
if (maxRate < 1) { | ||||||
throw FirestoreException.invalidState( | ||||||
"Value for argument 'maxOpsPerSecond' must be an integer within [1, Infinity], but was: " | ||||||
+ (int) maxRate); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Samesies. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. donesies. |
||||||
} | ||||||
|
||||||
if (initialRate != BulkWriterOptions.DEFAULT_UNSET_VALUE | ||||||
&& maxRate != BulkWriterOptions.DEFAULT_UNSET_VALUE | ||||||
&& initialRate > maxRate) { | ||||||
throw FirestoreException.invalidState( | ||||||
"'maxOpsPerSecond' cannot be less than 'initialOpsPerSecond'."); | ||||||
} | ||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -20,15 +20,31 @@ | |||||
|
||||||
/** Options used to disable request throttling in BulkWriter. */ | ||||||
final class BulkWriterOptions { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it might be time to make this an actual Builder. You can use AutoValue to reduce the amount of code you have to write. See java-firestore/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java Line 228 in edaa539
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. attempted. |
||||||
static final double DEFAULT_UNSET_VALUE = 1.1; | ||||||
private final boolean throttling; | ||||||
private double initialOpsPerSecond = DEFAULT_UNSET_VALUE; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we make these There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Java doesn't allow setting |
||||||
private double maxOpsPerSecond = DEFAULT_UNSET_VALUE; | ||||||
|
||||||
private final boolean enableThrottling; | ||||||
BulkWriterOptions(boolean enableThrottling) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this need to be package-private now? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. N/A w/ AutoValue. |
||||||
this.throttling = enableThrottling; | ||||||
} | ||||||
|
||||||
private BulkWriterOptions(boolean enableThrottling) { | ||||||
this.enableThrottling = enableThrottling; | ||||||
private BulkWriterOptions(double initialOpsPerSecond, double maxOpsPerSecond) { | ||||||
this.throttling = true; | ||||||
this.initialOpsPerSecond = initialOpsPerSecond; | ||||||
this.maxOpsPerSecond = maxOpsPerSecond; | ||||||
} | ||||||
|
||||||
boolean isThrottlingEnabled() { | ||||||
return enableThrottling; | ||||||
return throttling; | ||||||
} | ||||||
|
||||||
double getInitialOpsPerSecond() { | ||||||
return initialOpsPerSecond; | ||||||
} | ||||||
|
||||||
double getMaxOpsPerSecond() { | ||||||
return maxOpsPerSecond; | ||||||
} | ||||||
|
||||||
/** | ||||||
|
@@ -40,4 +56,48 @@ boolean isThrottlingEnabled() { | |||||
public static BulkWriterOptions withThrottlingDisabled() { | ||||||
return new BulkWriterOptions(false); | ||||||
} | ||||||
|
||||||
/** | ||||||
* An options object that will enable throttling in the created BulkWriter with the provided | ||||||
* starting rate. | ||||||
* | ||||||
* @param initialOpsPerSecond The initial maximum number of operations per second allowed by the | ||||||
* throttler. | ||||||
* @return The BulkWriterOptions object. | ||||||
*/ | ||||||
@Nonnull | ||||||
public static BulkWriterOptions withInitialOpsPerSecondThrottling(int initialOpsPerSecond) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||||||
return new BulkWriterOptions(initialOpsPerSecond, DEFAULT_UNSET_VALUE); | ||||||
} | ||||||
|
||||||
/** | ||||||
* An options object that will enable throttling in the created BulkWriter with the provided | ||||||
* maximum rate. | ||||||
* | ||||||
* @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler. | ||||||
* The throttler's allowed operations per second does not ramp up past the specified | ||||||
* operations per second. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to follow from line above. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||||||
* @return The BulkWriterOptions object. | ||||||
*/ | ||||||
@Nonnull | ||||||
public static BulkWriterOptions withMaxOpsPerSecondThrottling(int maxOpsPerSecond) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed to double. |
||||||
return new BulkWriterOptions(DEFAULT_UNSET_VALUE, maxOpsPerSecond); | ||||||
} | ||||||
|
||||||
/** | ||||||
* An options object that will enable throttling in the created BulkWriter with the provided | ||||||
* starting and maximum rate. | ||||||
* | ||||||
* @param initialOpsPerSecond The initial maximum number of operations per second allowed by the | ||||||
* throttler. | ||||||
* @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler. | ||||||
* The throttler's allowed operations per second does not ramp up past the specified | ||||||
* operations per second. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like this is already implied in the line above. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. N/A w/ AutoValue. |
||||||
* @return The BulkWriterOptions object. | ||||||
*/ | ||||||
@Nonnull | ||||||
public static BulkWriterOptions withCustomThrottling( | ||||||
int initialOpsPerSecond, int maxOpsPerSecond) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above. |
||||||
return new BulkWriterOptions(initialOpsPerSecond, maxOpsPerSecond); | ||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -94,12 +94,12 @@ public WriteBatch batch() { | |
|
||
@Nonnull | ||
BulkWriter bulkWriter() { | ||
return new BulkWriter(this, /* enableThrottling= */ true); | ||
return new BulkWriter(this, new BulkWriterOptions(true)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add back There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||
} | ||
|
||
@Nonnull | ||
BulkWriter bulkWriter(BulkWriterOptions options) { | ||
return new BulkWriter(this, options.isThrottlingEnabled()); | ||
return new BulkWriter(this, options); | ||
} | ||
|
||
@Nonnull | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,31 +38,48 @@ class RateLimiter { | |
private final double multiplier; | ||
private final int multiplierMillis; | ||
private final long startTimeMillis; | ||
private final int maximumCapacity; | ||
|
||
private int availableTokens; | ||
private long lastRefillTimeMillis; | ||
|
||
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis) { | ||
this(initialCapacity, multiplier, multiplierMillis, new Date().getTime()); | ||
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, int maximumCapacity) { | ||
this(initialCapacity, multiplier, multiplierMillis, maximumCapacity, new Date().getTime()); | ||
} | ||
|
||
/** | ||
* @param initialCapacity Initial maximum number of operations per second. | ||
* @param multiplier Rate by which to increase the capacity. | ||
* @param multiplierMillis How often the capacity should increase in milliseconds. | ||
* @param maximumCapacity Maximum number of allowed operations per second. The number of tokens | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this different from the capacity? From what I can tell, the capacity grows unbounded and the rate is reduced artificially. The implementation is fine, but the name of the setting is a bit misleading. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When the capacity is calculated, it's capped at the When a new request is made, the number of tokens to allocate is done in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think in your implementation the total capacity can grow beyond As stated, this is just a naming nit. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for elaborating! Renamed to |
||
* added per second will never exceed this number. | ||
* @param startTimeMillis The starting time in epoch milliseconds that the rate limit is based on. | ||
* Used for testing the limiter. | ||
*/ | ||
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, long startTimeMillis) { | ||
RateLimiter( | ||
int initialCapacity, | ||
double multiplier, | ||
int multiplierMillis, | ||
int maximumCapacity, | ||
long startTimeMillis) { | ||
this.initialCapacity = initialCapacity; | ||
this.multiplier = multiplier; | ||
this.multiplierMillis = multiplierMillis; | ||
this.maximumCapacity = maximumCapacity; | ||
this.startTimeMillis = startTimeMillis; | ||
|
||
this.availableTokens = initialCapacity; | ||
this.lastRefillTimeMillis = startTimeMillis; | ||
} | ||
|
||
public int getInitialCapacity() { | ||
return initialCapacity; | ||
} | ||
|
||
public int getMaximumCapacity() { | ||
return maximumCapacity; | ||
} | ||
|
||
public boolean tryMakeRequest(int numOperations) { | ||
return tryMakeRequest(numOperations, new Date().getTime()); | ||
} | ||
|
@@ -132,7 +149,10 @@ private void refillTokens(long requestTimeMillis) { | |
public int calculateCapacity(long requestTimeMillis) { | ||
long millisElapsed = requestTimeMillis - startTimeMillis; | ||
int operationsPerSecond = | ||
(int) (Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity); | ||
Math.min( | ||
(int) | ||
(Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity), | ||
maximumCapacity); | ||
return operationsPerSecond; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems strange. Should we use Double.POSITIVE_INFINITY?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.