From 27a9397f67e151d723241c80ccb2ec9f1bfbba1c Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Fri, 9 Oct 2020 13:49:08 -0500 Subject: [PATCH] feat: add max/min throttling options to BulkWriterOptions (#400) --- .../google/cloud/firestore/BulkWriter.java | 48 +++++-- .../cloud/firestore/BulkWriterOptions.java | 129 +++++++++++++++--- .../google/cloud/firestore/FirestoreImpl.java | 4 +- .../google/cloud/firestore/RateLimiter.java | 28 +++- .../cloud/firestore/BulkWriterTest.java | 102 +++++++++++++- .../cloud/firestore/RateLimiterTest.java | 4 + 6 files changed, 280 insertions(+), 35 deletions(-) diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java index 1fcfada8e..8fad7dea2 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java @@ -53,7 +53,7 @@ final class BulkWriter implements AutoCloseable { * @see Ramping * up traffic */ - private static final int STARTING_MAXIMUM_OPS_PER_SECOND = 500; + static final int DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND = 500; /** * The rate by which to increase the capacity as specified by the 500/50/5 rule. @@ -97,7 +97,7 @@ final class BulkWriter implements AutoCloseable { private final ExponentialRetryAlgorithm backoff; private TimedAttemptSettings nextAttempt; - BulkWriter(FirestoreImpl firestore, boolean enableThrottling) { + BulkWriter(FirestoreImpl firestore, BulkWriterOptions options) { this.firestore = firestore; this.backoff = new ExponentialRetryAlgorithm( @@ -105,14 +105,41 @@ final class BulkWriter implements AutoCloseable { this.nextAttempt = backoff.createFirstAttempt(); this.firestoreExecutor = firestore.getClient().getExecutor(); - if (enableThrottling) { - rateLimiter = + if (!options.getThrottlingEnabled()) { + this.rateLimiter = new RateLimiter( - STARTING_MAXIMUM_OPS_PER_SECOND, - RATE_LIMITER_MULTIPLIER, - RATE_LIMITER_MULTIPLIER_MILLIS); + Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE); } else { - rateLimiter = new RateLimiter(Integer.MAX_VALUE, Double.MAX_VALUE, Integer.MAX_VALUE); + double startingRate = DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND; + double maxRate = Double.POSITIVE_INFINITY; + + if (options.getInitialOpsPerSecond() != null) { + startingRate = options.getInitialOpsPerSecond(); + } + + if (options.getMaxOpsPerSecond() != null) { + maxRate = options.getMaxOpsPerSecond(); + } + + // The initial validation step ensures that the maxOpsPerSecond is greater than + // initialOpsPerSecond. If this inequality is true, that means initialOpsPerSecond was not + // set and maxOpsPerSecond is less than the default starting rate. + if (maxRate < startingRate) { + startingRate = maxRate; + } + + // Ensure that the batch size is not larger than the number of allowed + // operations per second. + if (startingRate < maxBatchSize) { + this.maxBatchSize = (int) startingRate; + } + + this.rateLimiter = + new RateLimiter( + (int) startingRate, + RATE_LIMITER_MULTIPLIER, + RATE_LIMITER_MULTIPLIER_MILLIS, + (int) maxRate); } } @@ -679,4 +706,9 @@ public ApiFuture apply(List results) { void setMaxBatchSize(int size) { maxBatchSize = size; } + + @VisibleForTesting + RateLimiter getRateLimiter() { + return rateLimiter; + } } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java index 8e114e08c..aa04f0bff 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOptions.java @@ -16,28 +16,123 @@ package com.google.cloud.firestore; -import javax.annotation.Nonnull; +import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; -/** Options used to disable request throttling in BulkWriter. */ -final class BulkWriterOptions { - - private final boolean enableThrottling; - - private BulkWriterOptions(boolean enableThrottling) { - this.enableThrottling = enableThrottling; - } +/** Options used to configure request throttling in BulkWriter. */ +@AutoValue +abstract class BulkWriterOptions { + /** + * Return whether throttling is enabled. + * + * @return Whether throttling is enabled. + */ + abstract boolean getThrottlingEnabled(); - boolean isThrottlingEnabled() { - return enableThrottling; - } + /** + * Returns the initial maximum number of operations per second allowed by the throttler. + * + * @return The initial maximum number of operations per second allowed by the throttler. + */ + @Nullable + abstract Double getInitialOpsPerSecond(); /** - * An options object that will disable throttling in the created BulkWriter. + * Returns the maximum number of operations per second allowed by the throttler. * - * @return The BulkWriterOptions object. + *

The throttler's allowed operations per second does not ramp up past the specified operations + * per second. + * + * @return The maximum number of operations per second allowed by the throttler. */ - @Nonnull - public static BulkWriterOptions withThrottlingDisabled() { - return new BulkWriterOptions(false); + @Nullable + abstract Double getMaxOpsPerSecond(); + + static Builder builder() { + return new AutoValue_BulkWriterOptions.Builder() + .setMaxOpsPerSecond(null) + .setInitialOpsPerSecond(null) + .setThrottlingEnabled(true); + } + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + /** + * Sets whether throttling should be enabled. By default, throttling is enabled. + * + * @param enabled Whether throttling should be enabled. + */ + abstract Builder setThrottlingEnabled(boolean enabled); + + /** + * Set the initial maximum number of operations per second allowed by the throttler. + * + * @param initialOpsPerSecond The initial maximum number of operations per second allowed by the + * throttler. + */ + abstract Builder setInitialOpsPerSecond(@Nullable Double initialOpsPerSecond); + + /** + * Set the initial maximum number of operations per second allowed by the throttler. + * + * @param initialOpsPerSecond The initial maximum number of operations per second allowed by the + * throttler. + */ + Builder setInitialOpsPerSecond(int initialOpsPerSecond) { + return setInitialOpsPerSecond(new Double(initialOpsPerSecond)); + } + + /** + * Set the maximum number of operations per second allowed by the throttler. + * + * @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler. + * The throttler's allowed operations per second does not ramp up past the specified + * operations per second. + */ + abstract Builder setMaxOpsPerSecond(@Nullable Double maxOpsPerSecond); + + /** + * Set the maximum number of operations per second allowed by the throttler. + * + * @param maxOpsPerSecond The maximum number of operations per second allowed by the throttler. + * The throttler's allowed operations per second does not ramp up past the specified + * operations per second. + */ + Builder setMaxOpsPerSecond(int maxOpsPerSecond) { + return setMaxOpsPerSecond(new Double(maxOpsPerSecond)); + } + + abstract BulkWriterOptions autoBuild(); + + BulkWriterOptions build() { + BulkWriterOptions options = autoBuild(); + Double initialRate = options.getInitialOpsPerSecond(); + Double maxRate = options.getMaxOpsPerSecond(); + + if (initialRate != null && initialRate < 1) { + throw FirestoreException.invalidState( + "Value for argument 'initialOpsPerSecond' must be greater than 1, but was: " + + initialRate.intValue()); + } + + if (maxRate != null && maxRate < 1) { + throw FirestoreException.invalidState( + "Value for argument 'maxOpsPerSecond' must be greater than 1, but was: " + + maxRate.intValue()); + } + + if (maxRate != null && initialRate != null && initialRate > maxRate) { + throw FirestoreException.invalidState( + "'maxOpsPerSecond' cannot be less than 'initialOpsPerSecond'."); + } + + if (!options.getThrottlingEnabled() && (maxRate != null || initialRate != null)) { + throw FirestoreException.invalidState( + "Cannot set 'initialOpsPerSecond' or 'maxOpsPerSecond' when 'throttlingEnabled' is set to false."); + } + return options; + } } } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java index 57afa9938..d56bec0c2 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java @@ -94,12 +94,12 @@ public WriteBatch batch() { @Nonnull BulkWriter bulkWriter() { - return new BulkWriter(this, /* enableThrottling= */ true); + return new BulkWriter(this, BulkWriterOptions.builder().setThrottlingEnabled(true).build()); } @Nonnull BulkWriter bulkWriter(BulkWriterOptions options) { - return new BulkWriter(this, options.isThrottlingEnabled()); + return new BulkWriter(this, options); } @Nonnull diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RateLimiter.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RateLimiter.java index 8bd6e5ce3..d8ad3c674 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RateLimiter.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/RateLimiter.java @@ -38,31 +38,48 @@ class RateLimiter { private final double multiplier; private final int multiplierMillis; private final long startTimeMillis; + private final int maximumRate; private int availableTokens; private long lastRefillTimeMillis; - RateLimiter(int initialCapacity, double multiplier, int multiplierMillis) { - this(initialCapacity, multiplier, multiplierMillis, new Date().getTime()); + RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, int maximumRate) { + this(initialCapacity, multiplier, multiplierMillis, maximumRate, new Date().getTime()); } /** * @param initialCapacity Initial maximum number of operations per second. * @param multiplier Rate by which to increase the capacity. * @param multiplierMillis How often the capacity should increase in milliseconds. + * @param maximumRate Maximum number of allowed operations per second. The number of tokens added + * per second will never exceed this number. * @param startTimeMillis The starting time in epoch milliseconds that the rate limit is based on. * Used for testing the limiter. */ - RateLimiter(int initialCapacity, double multiplier, int multiplierMillis, long startTimeMillis) { + RateLimiter( + int initialCapacity, + double multiplier, + int multiplierMillis, + int maximumRate, + long startTimeMillis) { this.initialCapacity = initialCapacity; this.multiplier = multiplier; this.multiplierMillis = multiplierMillis; + this.maximumRate = maximumRate; this.startTimeMillis = startTimeMillis; this.availableTokens = initialCapacity; this.lastRefillTimeMillis = startTimeMillis; } + public int getInitialCapacity() { + return initialCapacity; + } + + public int getMaximumRate() { + return maximumRate; + } + public boolean tryMakeRequest(int numOperations) { return tryMakeRequest(numOperations, new Date().getTime()); } @@ -132,7 +149,10 @@ private void refillTokens(long requestTimeMillis) { public int calculateCapacity(long requestTimeMillis) { long millisElapsed = requestTimeMillis - startTimeMillis; int operationsPerSecond = - (int) (Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity); + Math.min( + (int) + (Math.pow(multiplier, (int) (millisElapsed / multiplierMillis)) * initialCapacity), + maximumRate); return operationsPerSecond; } } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java index f51471009..e4e4b0230 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java @@ -50,7 +50,6 @@ import javax.annotation.Nonnull; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -549,8 +548,6 @@ public void flushCompletesWhenAllWritesComplete() throws Exception { } @Test - @Ignore - // TODO(chenbrian): Fix this test after throttling options are added. public void doesNotSendBatchesIfDoingSoExceedsRateLimit() { final boolean[] timeoutCalled = {false}; final ScheduledExecutorService timeoutExecutor = @@ -565,7 +562,8 @@ public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) } }; doReturn(timeoutExecutor).when(firestoreRpc).getExecutor(); - BulkWriter bulkWriter = firestoreMock.bulkWriter(); + BulkWriter bulkWriter = + firestoreMock.bulkWriter(BulkWriterOptions.builder().setInitialOpsPerSecond(5).build()); for (int i = 0; i < 600; ++i) { bulkWriter.set(firestoreMock.document("coll/doc" + i), LocalFirestoreHelper.SINGLE_FIELD_MAP); @@ -706,4 +704,100 @@ public ApiFuture answer(InvocationOnMock mock) { assertEquals(retryAttempts[0], BulkWriter.MAX_RETRY_ATTEMPTS + 1); } } + + @Test + public void optionsRequiresPositiveInteger() throws Exception { + try { + firestoreMock.bulkWriter(BulkWriterOptions.builder().setInitialOpsPerSecond(-1).build()); + fail("bulkWriter() call should have failed"); + } catch (Exception e) { + assertEquals( + e.getMessage(), + "Value for argument 'initialOpsPerSecond' must be greater than 1, but was: -1"); + } + + try { + firestoreMock.bulkWriter(BulkWriterOptions.builder().setMaxOpsPerSecond(-1).build()); + fail("bulkWriter() call should have failed"); + } catch (Exception e) { + assertEquals( + e.getMessage(), + "Value for argument 'maxOpsPerSecond' must be greater than 1, but was: -1"); + } + } + + @Test + public void optionsRequiresMaxGreaterThanInitial() throws Exception { + try { + firestoreMock.bulkWriter( + BulkWriterOptions.builder().setInitialOpsPerSecond(550).setMaxOpsPerSecond(500).build()); + fail("bulkWriter() call should have failed"); + } catch (Exception e) { + assertEquals(e.getMessage(), "'maxOpsPerSecond' cannot be less than 'initialOpsPerSecond'."); + } + } + + @Test + public void cannotSetThrottlingOptionsWithThrottlingDisabled() throws Exception { + try { + firestoreMock.bulkWriter( + BulkWriterOptions.builder() + .setThrottlingEnabled(false) + .setInitialOpsPerSecond(500) + .build()); + fail("bulkWriter() call should have failed"); + } catch (Exception e) { + assertEquals( + e.getMessage(), + "Cannot set 'initialOpsPerSecond' or 'maxOpsPerSecond' when 'throttlingEnabled' is set to false."); + } + + try { + firestoreMock.bulkWriter( + BulkWriterOptions.builder().setThrottlingEnabled(false).setMaxOpsPerSecond(500).build()); + fail("bulkWriter() call should have failed"); + } catch (Exception e) { + assertEquals( + e.getMessage(), + "Cannot set 'initialOpsPerSecond' or 'maxOpsPerSecond' when 'throttlingEnabled' is set to false."); + } + } + + @Test + public void optionsInitialAndMaxRatesAreProperlySet() throws Exception { + BulkWriter bulkWriter = + firestoreMock.bulkWriter( + BulkWriterOptions.builder() + .setInitialOpsPerSecond(500) + .setMaxOpsPerSecond(550) + .build()); + assertEquals(bulkWriter.getRateLimiter().getInitialCapacity(), 500); + assertEquals(bulkWriter.getRateLimiter().getMaximumRate(), 550); + + bulkWriter = + firestoreMock.bulkWriter(BulkWriterOptions.builder().setMaxOpsPerSecond(1000).build()); + assertEquals(bulkWriter.getRateLimiter().getInitialCapacity(), 500); + assertEquals(bulkWriter.getRateLimiter().getMaximumRate(), 1000); + + bulkWriter = + firestoreMock.bulkWriter(BulkWriterOptions.builder().setInitialOpsPerSecond(100).build()); + assertEquals(bulkWriter.getRateLimiter().getInitialCapacity(), 100); + assertEquals(bulkWriter.getRateLimiter().getMaximumRate(), Integer.MAX_VALUE); + + bulkWriter = + firestoreMock.bulkWriter(BulkWriterOptions.builder().setMaxOpsPerSecond(100).build()); + assertEquals(bulkWriter.getRateLimiter().getInitialCapacity(), 100); + assertEquals(bulkWriter.getRateLimiter().getMaximumRate(), 100); + + bulkWriter = firestoreMock.bulkWriter(); + assertEquals( + bulkWriter.getRateLimiter().getInitialCapacity(), + BulkWriter.DEFAULT_STARTING_MAXIMUM_OPS_PER_SECOND); + assertEquals(bulkWriter.getRateLimiter().getMaximumRate(), Integer.MAX_VALUE); + + bulkWriter = + firestoreMock.bulkWriter(BulkWriterOptions.builder().setThrottlingEnabled(false).build()); + assertEquals(bulkWriter.getRateLimiter().getInitialCapacity(), Integer.MAX_VALUE); + assertEquals(bulkWriter.getRateLimiter().getMaximumRate(), Integer.MAX_VALUE); + } } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/RateLimiterTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/RateLimiterTest.java index 291f60733..baf142300 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/RateLimiterTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/RateLimiterTest.java @@ -38,6 +38,7 @@ public void before() { /* initialCapacity= */ 500, /* multiplier= */ 1.5, /* multiplierMillis= */ 5 * 60 * 1000, + /* maximumCapacity= */ 1000000, /* startTime= */ new Date(0).getTime()); } @@ -108,5 +109,8 @@ public void calculatesMaxOperations() { assertEquals(1125, limiter.calculateCapacity(new Date(10 * 60 * 1000).getTime())); assertEquals(1687, limiter.calculateCapacity(new Date(15 * 60 * 1000).getTime())); assertEquals(738945, limiter.calculateCapacity(new Date(90 * 60 * 1000).getTime())); + + // Check that maximum rate limit is enforced. + assertEquals(1000000, limiter.calculateCapacity(new Date(1000 * 60 * 1000).getTime())); } }