The throttler's allowed operations per second does not ramp up past the specified operations + * per second. + * + * @return The maximum number of operations per second allowed by the throttler. */ - @Nonnull - public static BulkWriterOptions withThrottlingDisabled() { - return new BulkWriterOptions(false); + @Nullable + abstract Double getMaxOpsPerSecond(); + + static Builder builder() { + return new AutoValue_BulkWriterOptions.Builder() + .setMaxOpsPerSecond(null) + .setInitialOpsPerSecond(null) + .setThrottlingEnabled(true); + } + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + /** + * Sets whether throttling should be enabled. By default, throttling is enabled. + * + * @param enabled Whether throttling should be enabled. + */ + abstract Builder setThrottlingEnabled(boolean enabled); + + /** + * Set the initial maximum number of operations per second allowed by the throttler. + * + * @param initialOpsPerSecond The initial maximum number of operations per second allowed by the + * throttler. + */ + abstract Builder setInitialOpsPerSecond(@Nullable Double initialOpsPerSecond); + + /** + * Set the initial maximum number of operations per second allowed by the throttler. + * + * @param initialOpsPerSecond The initial maximum number of operations per second allowed by the + * throttler. + */ + Builder setInitialOpsPerSecond(int initialOpsPerSecond) { + return setInitialOpsPerSecond(new Double(initialOpsPerSecond)); + } + + /** + * Set the maximum number of operations per second allowed by the throttler. + * + * @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler. + * The throttler's allowed operations per second does not ramp up past the specified + * operations per second. + */ + abstract Builder setMaxOpsPerSecond(@Nullable Double maxOpsPerSecond); + + /** + * Set the maximum number of operations per second allowed by the throttler. + * + * @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler. + * The throttler's allowed operations per second does not ramp up past the specified + * operations per second. + */ + Builder setMaxOpsPerSecond(int maxOpsPerSecond) { + return setMaxOpsPerSecond(new Double(maxOpsPerSecond)); + } + + abstract BulkWriterOptions autoBuild(); + + BulkWriterOptions build() { + BulkWriterOptions options = autoBuild(); + Double initialRate = options.getInitialOpsPerSecond(); + Double maxRate = options.getMaxOpsPerSecond(); + + if (initialRate != null && initialRate < 1) { + throw FirestoreException.invalidState( + "Value for argument 'initialOpsPerSecond' must be greater than 1, but was: " + + initialRate.intValue()); + } + + if (maxRate != null && maxRate < 1) { + throw FirestoreException.invalidState( + "Value for argument 'maxOpsPerSecond' must be greater than 1, but was: " + + maxRate.intValue()); + } + + if (maxRate != null && initialRate != null && initialRate > maxRate) { + throw FirestoreException.invalidState( + "'maxOpsPerSecond' cannot be less than 'initialOpsPerSecond'."); + } + + if (!options.getThrottlingEnabled() && (maxRate != null || initialRate != null)) { + throw FirestoreException.invalidState( + "Cannot set 'initialOpsPerSecond' or 'maxOpsPerSecond' when 'throttlingEnabled' is set to false."); + } + return options; + } } } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java index 57afa9938..d56bec0c2 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java @@ -94,12 +94,12 @@ public WriteBatch batch() { @Nonnull BulkWriter bulkWriter() { - return new BulkWriter(this, /* enableThrottling= */ true); + return new BulkWriter(this, BulkWriterOptions.builder().setThrottlingEnabled(true).build()); } @Nonnull BulkWriter bulkWriter(BulkWriterOptions options) { - return new BulkWriter(this, options.isThrottlingEnabled()); + return new BulkWriter(this, options); } @Nonnull diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RateLimiter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RateLimiter.java index 8bd6e5ce3..d8ad3c674 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RateLimiter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RateLimiter.java @@ -38,31 +38,48 @@ class RateLimiter { private final double multiplier; private final int multiplierMillis; private final long startTimeMillis; + private final int maximumRate; private int availableTokens; private long lastRefillTimeMillis; - RateLimiter(int initialCapacity, double multiplier, int multiplierMillis) { - this(initialCapacity, multiplier, multiplierMillis, new Date().getTime()); + RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, int maximumRate) { + this(initialCapacity, multiplier, multiplierMillis, maximumRate, new Date().getTime()); } /** * @param initialCapacity Initial maximum number of operations per second. * @param multiplier Rate by which to increase the capacity. * @param multiplierMillis How often the capacity should increase in milliseconds. + * @param maximumRate Maximum number of allowed operations per second. The number of tokens added + * per second will never exceed this number. * @param startTimeMillis The starting time in epoch milliseconds that the rate limit is based on. * Used for testing the limiter. */ - RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, long startTimeMillis) { + RateLimiter( + int initialCapacity, + double multiplier, + int multiplierMillis, + int maximumRate, + long startTimeMillis) { this.initialCapacity = initialCapacity; this.multiplier = multiplier; this.multiplierMillis = multiplierMillis; + this.maximumRate = maximumRate; this.startTimeMillis = startTimeMillis; this.availableTokens = initialCapacity; this.lastRefillTimeMillis = startTimeMillis; } + public int getInitialCapacity() { + return initialCapacity; + } + + public int getMaximumRate() { + return maximumRate; + } + public boolean tryMakeRequest(int numOperations) { return tryMakeRequest(numOperations, new Date().getTime()); } @@ -132,7 +149,10 @@ private void refillTokens(long requestTimeMillis) { public int calculateCapacity(long requestTimeMillis) { long millisElapsed = requestTimeMillis - startTimeMillis; int operationsPerSecond = - (int) (Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity); + Math.min( + (int) + (Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity), + maximumRate); return operationsPerSecond; } } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java index f51471009..e4e4b0230 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java @@ -50,7 +50,6 @@ import javax.annotation.Nonnull; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -549,8 +548,6 @@ public void flushCompletesWhenAllWritesComplete() throws Exception { } @Test - @Ignore - // TODO(chenbrian): Fix this test after throttling options are added. public void doesNotSendBatchesIfDoingSoExceedsRateLimit() { final boolean[] timeoutCalled = {false}; final ScheduledExecutorService timeoutExecutor = @@ -565,7 +562,8 @@ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) } }; doReturn(timeoutExecutor).when(firestoreRpc).getExecutor(); - BulkWriter bulkWriter = firestoreMock.bulkWriter(); + BulkWriter bulkWriter = + firestoreMock.bulkWriter(BulkWriterOptions.builder().setInitialOpsPerSecond(5).build()); for (int i = 0; i < 600; ++i) { bulkWriter.set(firestoreMock.document("coll/doc" + i), LocalFirestoreHelper.SINGLE_FIELD_MAP); @@ -706,4 +704,100 @@ public ApiFuture