New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add max/min throttling options to BulkWriterOptions #400
Changes from 4 commits
f5b8e0c
79e247f
201594d
3658104
64d0689
e9b48f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,7 +43,7 @@ | |
|
||
final class BulkWriter implements AutoCloseable { | ||
/** The maximum number of writes that can be in a single batch. */ | ||
public static final int MAX_BATCH_SIZE = 500; | ||
public static final int MAX_BATCH_SIZE = 25; | ||
|
||
public static final int MAX_RETRY_ATTEMPTS = 10; | ||
|
||
|
@@ -53,7 +53,7 @@ final class BulkWriter implements AutoCloseable { | |
* @see <a href=https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic>Ramping | ||
* up traffic</a> | ||
*/ | ||
private static final int STARTING_MAXIMUM_OPS_PER_SECOND = 500; | ||
static final int DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND = 500; | ||
|
||
/** | ||
* The rate by which to increase the capacity as specified by the 500/50/5 rule. | ||
|
@@ -97,22 +97,50 @@ final class BulkWriter implements AutoCloseable { | |
private final ExponentialRetryAlgorithm backoff; | ||
private TimedAttemptSettings nextAttempt; | ||
|
||
BulkWriter(FirestoreImpl firestore, boolean enableThrottling) { | ||
BulkWriter(FirestoreImpl firestore, BulkWriterOptions options) { | ||
validateBulkWriterOptions(options); | ||
this.firestore = firestore; | ||
this.backoff = | ||
new ExponentialRetryAlgorithm( | ||
firestore.getOptions().getRetrySettings(), CurrentMillisClock.getDefaultClock()); | ||
this.nextAttempt = backoff.createFirstAttempt(); | ||
this.firestoreExecutor = firestore.getClient().getExecutor(); | ||
|
||
if (enableThrottling) { | ||
rateLimiter = | ||
if (!options.getThrottlingEnabled()) { | ||
this.rateLimiter = | ||
new RateLimiter( | ||
STARTING_MAXIMUM_OPS_PER_SECOND, | ||
RATE_LIMITER_MULTIPLIER, | ||
RATE_LIMITER_MULTIPLIER_MILLIS); | ||
Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE); | ||
} else { | ||
rateLimiter = new RateLimiter(Integer.MAX_VALUE, Double.MAX_VALUE, Integer.MAX_VALUE); | ||
double startingRate = DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND; | ||
double maxRate = Integer.MAX_VALUE; | ||
|
||
if (!Double.isNaN(options.getInitialOpsPerSecond())) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use nullable values ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done, but had to modify the AutoValue setters to support passing in |
||
startingRate = options.getInitialOpsPerSecond(); | ||
} | ||
|
||
if (!Double.isNaN(options.getMaxOpsPerSecond())) { | ||
maxRate = options.getMaxOpsPerSecond(); | ||
} | ||
|
||
// The initial validation step ensures that the maxOpsPerSecond is greater than | ||
// initialOpsPerSecond. If this inequality is true, that means initialOpsPerSecond was not | ||
// set and maxOpsPerSecond is less than the default starting rate. | ||
if (maxRate < startingRate) { | ||
startingRate = maxRate; | ||
} | ||
|
||
// Ensure that the batch size is not larger than the number of allowed | ||
// operations per second. | ||
if (startingRate < maxBatchSize) { | ||
this.maxBatchSize = (int) startingRate; | ||
} | ||
|
||
this.rateLimiter = | ||
new RateLimiter( | ||
(int) startingRate, | ||
RATE_LIMITER_MULTIPLIER, | ||
RATE_LIMITER_MULTIPLIER_MILLIS, | ||
(int) maxRate); | ||
} | ||
} | ||
|
||
|
@@ -679,4 +707,35 @@ public ApiFuture<Void> apply(List<BatchWriteResult> results) { | |
void setMaxBatchSize(int size) { | ||
maxBatchSize = size; | ||
} | ||
|
||
@VisibleForTesting | ||
RateLimiter getRateLimiter() { | ||
return rateLimiter; | ||
} | ||
|
||
private void validateBulkWriterOptions(BulkWriterOptions options) { | ||
double initialRate = options.getInitialOpsPerSecond(); | ||
double maxRate = options.getMaxOpsPerSecond(); | ||
|
||
if (initialRate < 1) { | ||
throw FirestoreException.invalidState( | ||
"Value for argument 'initialOpsPerSecond' must be greater than 1, but was: " | ||
+ (int) initialRate); | ||
} | ||
|
||
if (maxRate < 1) { | ||
throw FirestoreException.invalidState( | ||
"Value for argument 'maxOpsPerSecond' must be greater than 1, but was: " + (int) maxRate); | ||
} | ||
|
||
if (!Double.isNaN(initialRate) && !Double.isNaN(maxRate) && initialRate > maxRate) { | ||
throw FirestoreException.invalidState( | ||
"'maxOpsPerSecond' cannot be less than 'initialOpsPerSecond'."); | ||
} | ||
|
||
if (!options.getThrottlingEnabled() && (!Double.isNaN(initialRate) || !Double.isNaN(maxRate))) { | ||
throw FirestoreException.invalidState( | ||
"Cannot set 'initialRate' or 'maxRate' when 'throttlingEnabled' is set to false."); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally, this validation would happen in the BulkWriterOptions.build() step. This would follow the precedent in other builders. This might help: https://github.com/google/auto/blob/master/value/userguide/builders-howto.md#validate There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moved. |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,31 +38,48 @@ class RateLimiter { | |
private final double multiplier; | ||
private final int multiplierMillis; | ||
private final long startTimeMillis; | ||
private final int maximumCapacity; | ||
|
||
private int availableTokens; | ||
private long lastRefillTimeMillis; | ||
|
||
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis) { | ||
this(initialCapacity, multiplier, multiplierMillis, new Date().getTime()); | ||
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, int maximumCapacity) { | ||
this(initialCapacity, multiplier, multiplierMillis, maximumCapacity, new Date().getTime()); | ||
} | ||
|
||
/** | ||
* @param initialCapacity Initial maximum number of operations per second. | ||
* @param multiplier Rate by which to increase the capacity. | ||
* @param multiplierMillis How often the capacity should increase in milliseconds. | ||
* @param maximumCapacity Maximum number of allowed operations per second. The number of tokens | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this different from the capacity? From what I can tell, the capacity grows unbounded and the rate is reduced artificially. The implementation is fine, but the name of the setting is a bit misleading. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When the capacity is calculated, it's capped at the When a new request is made, the number of tokens to allocate is done in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think in your implementation the total capacity can grow beyond As stated, this is just a naming nit. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for elaborating! Renamed to |
||
* added per second will never exceed this number. | ||
* @param startTimeMillis The starting time in epoch milliseconds that the rate limit is based on. | ||
* Used for testing the limiter. | ||
*/ | ||
RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, long startTimeMillis) { | ||
RateLimiter( | ||
int initialCapacity, | ||
double multiplier, | ||
int multiplierMillis, | ||
int maximumCapacity, | ||
long startTimeMillis) { | ||
this.initialCapacity = initialCapacity; | ||
this.multiplier = multiplier; | ||
this.multiplierMillis = multiplierMillis; | ||
this.maximumCapacity = maximumCapacity; | ||
this.startTimeMillis = startTimeMillis; | ||
|
||
this.availableTokens = initialCapacity; | ||
this.lastRefillTimeMillis = startTimeMillis; | ||
} | ||
|
||
public int getInitialCapacity() { | ||
return initialCapacity; | ||
} | ||
|
||
public int getMaximumCapacity() { | ||
return maximumCapacity; | ||
} | ||
|
||
public boolean tryMakeRequest(int numOperations) { | ||
return tryMakeRequest(numOperations, new Date().getTime()); | ||
} | ||
|
@@ -132,7 +149,10 @@ private void refillTokens(long requestTimeMillis) { | |
public int calculateCapacity(long requestTimeMillis) { | ||
long millisElapsed = requestTimeMillis - startTimeMillis; | ||
int operationsPerSecond = | ||
(int) (Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity); | ||
Math.min( | ||
(int) | ||
(Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity), | ||
maximumCapacity); | ||
return operationsPerSecond; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems strange. Should we use Double.POSITIVE_INFINITY?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.