diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 228e56fd1..0c98eae8d 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -141,16 +141,16 @@ public BatcherImpl( // to avoid deadlocking if (flowController.getLimitExceededBehavior() != LimitExceededBehavior.Ignore) { Preconditions.checkArgument( - flowController.getMaxOutstandingElementCount() == null + flowController.getMaxElementCountLimit() == null || batchingSettings.getElementCountThreshold() == null - || flowController.getMaxOutstandingElementCount() + || flowController.getMaxElementCountLimit() >= batchingSettings.getElementCountThreshold(), "If throttling and batching on element count are enabled, FlowController" + "#maxOutstandingElementCount must be greater or equal to elementCountThreshold"); Preconditions.checkArgument( - flowController.getMaxOutstandingRequestBytes() == null + flowController.getMaxRequestBytesLimit() == null || batchingSettings.getRequestByteThreshold() == null - || flowController.getMaxOutstandingRequestBytes() + || flowController.getMaxRequestBytesLimit() >= batchingSettings.getRequestByteThreshold(), "If throttling and batching on request bytes are enabled, FlowController" + "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold"); diff --git a/gax/src/main/java/com/google/api/gax/batching/BlockingSemaphore.java b/gax/src/main/java/com/google/api/gax/batching/BlockingSemaphore.java index c4df9cc6b..0271f7e8a 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BlockingSemaphore.java +++ b/gax/src/main/java/com/google/api/gax/batching/BlockingSemaphore.java @@ -34,7 +34,8 @@ /** A {@link Semaphore64} that blocks until permits become available. */ class BlockingSemaphore implements Semaphore64 { - private long currentPermits; + private long availablePermits; + private long limit; private static void checkNotNegative(long l) { Preconditions.checkArgument(l >= 0, "negative permits not allowed: %s", l); @@ -42,32 +43,81 @@ private static void checkNotNegative(long l) { BlockingSemaphore(long permits) { checkNotNegative(permits); - this.currentPermits = permits; + this.availablePermits = permits; + this.limit = permits; } + @Override public synchronized void release(long permits) { checkNotNegative(permits); - - currentPermits += permits; + // TODO: throw exceptions when the permits overflow + availablePermits = Math.min(availablePermits + permits, limit); notifyAll(); } + @Override public synchronized boolean acquire(long permits) { checkNotNegative(permits); boolean interrupted = false; - while (currentPermits < permits) { + while (availablePermits < permits) { + try { + wait(); + } catch (InterruptedException e) { + interrupted = true; + } + } + // TODO: if thread is interrupted, we should not grant the permits + availablePermits -= permits; + + if (interrupted) { + Thread.currentThread().interrupt(); + } + return true; + } + + @Override + public synchronized boolean acquirePartial(long permits) { + checkNotNegative(permits); + + boolean interrupted = false; + // To allow individual oversized requests to be sent, clamp the requested permits to the maximum + // limit. This will allow individual large requests to be sent. Please note that this behavior + // will result in availablePermits going negative. + while (availablePermits < Math.min(limit, permits)) { try { wait(); } catch (InterruptedException e) { interrupted = true; } } - currentPermits -= permits; if (interrupted) { Thread.currentThread().interrupt(); } + + availablePermits -= permits; return true; } + + @Override + public synchronized void increasePermitLimit(long permits) { + checkNotNegative(permits); + availablePermits += permits; + limit += permits; + notifyAll(); + } + + @Override + public synchronized void reducePermitLimit(long reduction) { + checkNotNegative(reduction); + Preconditions.checkState(limit - reduction > 0, "permit limit underflow"); + availablePermits -= reduction; + limit -= reduction; + } + + @Override + public synchronized long getPermitLimit() { + return limit; + } } diff --git a/gax/src/main/java/com/google/api/gax/batching/DynamicFlowControlSettings.java b/gax/src/main/java/com/google/api/gax/batching/DynamicFlowControlSettings.java new file mode 100644 index 000000000..60783a6e2 --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/batching/DynamicFlowControlSettings.java @@ -0,0 +1,169 @@ +/* + * Copyright 2021 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import com.google.api.core.InternalApi; +import com.google.api.gax.batching.FlowController.LimitExceededBehavior; +import com.google.auto.value.AutoValue; +import com.google.common.base.Preconditions; +import javax.annotation.Nullable; + +/** Settings for dynamic flow control */ +@AutoValue +@InternalApi("For google-cloud-java client use only") +public abstract class DynamicFlowControlSettings { + + /** Number of outstanding elements that {@link FlowController} allows when it's initiated. */ + @Nullable + public abstract Long getInitialOutstandingElementCount(); + + /** Number of outstanding bytes that {@link FlowController} allows when it's initiated. */ + @Nullable + public abstract Long getInitialOutstandingRequestBytes(); + + /** + * Maximum number of outstanding elements {@link FlowController} allows before enforcing flow + * control. + */ + @Nullable + public abstract Long getMaxOutstandingElementCount(); + + /** + * Maximum number of outstanding bytes {@link FlowController} allows before enforcing flow + * control. + */ + @Nullable + public abstract Long getMaxOutstandingRequestBytes(); + + /** + * Minimum number of outstanding elements {@link FlowController} allows before enforcing flow + * control. + */ + @Nullable + public abstract Long getMinOutstandingElementCount(); + + /** + * Minimum number of outstanding bytes {@link FlowController} allows before enforcing flow + * control. + */ + @Nullable + public abstract Long getMinOutstandingRequestBytes(); + + /** @see FlowControlSettings#getLimitExceededBehavior() */ + public abstract LimitExceededBehavior getLimitExceededBehavior(); + + public abstract Builder toBuilder(); + + public static Builder newBuilder() { + return new AutoValue_DynamicFlowControlSettings.Builder() + .setLimitExceededBehavior(LimitExceededBehavior.Block); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setInitialOutstandingElementCount(Long value); + + public abstract Builder setInitialOutstandingRequestBytes(Long value); + + public abstract Builder setMaxOutstandingElementCount(Long value); + + public abstract Builder setMaxOutstandingRequestBytes(Long value); + + public abstract Builder setMinOutstandingElementCount(Long value); + + public abstract Builder setMinOutstandingRequestBytes(Long value); + + public abstract Builder setLimitExceededBehavior(LimitExceededBehavior value); + + abstract DynamicFlowControlSettings autoBuild(); + + public DynamicFlowControlSettings build() { + DynamicFlowControlSettings settings = autoBuild(); + + verifyElementCountSettings(settings); + verifyRequestBytesSettings(settings); + + return settings; + } + + private void verifyElementCountSettings(DynamicFlowControlSettings settings) { + boolean isEnabled = + settings.getInitialOutstandingElementCount() != null + || settings.getMinOutstandingElementCount() != null + || settings.getMaxOutstandingElementCount() != null; + if (!isEnabled) { + return; + } + Preconditions.checkState( + settings.getInitialOutstandingElementCount() != null + && settings.getMinOutstandingElementCount() != null + && settings.getMaxOutstandingElementCount() != null, + "Throttling on element count is disabled by default. To enable this setting," + + " minOutstandingElementCount, initialOutstandingElementCount, and " + + "maxOutstandingElementCount must all be set."); + Preconditions.checkState( + settings.getMinOutstandingElementCount() > 0 + && settings.getInitialOutstandingElementCount() + <= settings.getMaxOutstandingElementCount() + && settings.getInitialOutstandingElementCount() + >= settings.getMinOutstandingElementCount(), + "If throttling on element count is set, minOutstandingElementCount must be" + + " greater than 0, and minOutstandingElementCount <= " + + "initialOutstandingElementCount <= maxOutstandingElementCount"); + } + + private void verifyRequestBytesSettings(DynamicFlowControlSettings settings) { + boolean isEnabled = + settings.getInitialOutstandingRequestBytes() != null + || settings.getMinOutstandingRequestBytes() != null + || settings.getMaxOutstandingRequestBytes() != null; + if (!isEnabled) { + return; + } + Preconditions.checkState( + settings.getInitialOutstandingRequestBytes() != null + && settings.getMinOutstandingRequestBytes() != null + && settings.getMaxOutstandingRequestBytes() != null, + "Throttling on number of bytes is disabled by default. To enable this " + + "setting, minOutstandingRequestBytes, initialOutstandingRequestBytes, and " + + "maxOutstandingRequestBytes must all be set"); + Preconditions.checkState( + settings.getMinOutstandingRequestBytes() > 0 + && settings.getInitialOutstandingRequestBytes() + <= settings.getMaxOutstandingRequestBytes() + && settings.getInitialOutstandingRequestBytes() + >= settings.getMinOutstandingRequestBytes(), + "If throttling on number of bytes is set, minOutstandingRequestBytes must " + + "be greater than 0, and minOutstandingRequestBytes <= " + + "initialOutstandingRequestBytes <= maxOutstandingRequestBytes"); + } + } +} diff --git a/gax/src/main/java/com/google/api/gax/batching/FlowController.java b/gax/src/main/java/com/google/api/gax/batching/FlowController.java index 75c1379c0..a7b906c24 100644 --- a/gax/src/main/java/com/google/api/gax/batching/FlowController.java +++ b/gax/src/main/java/com/google/api/gax/batching/FlowController.java @@ -142,19 +142,34 @@ public enum LimitExceededBehavior { @Nullable private final Semaphore64 outstandingElementCount; @Nullable private final Semaphore64 outstandingByteCount; - @Nullable private final Long maxOutstandingElementCount; - @Nullable private final Long maxOutstandingRequestBytes; + @Nullable private final Long maxElementCountLimit; + @Nullable private final Long maxRequestBytesLimit; + @Nullable private final Long minElementCountLimit; + @Nullable private final Long minRequestBytesLimit; private final LimitExceededBehavior limitExceededBehavior; + private final Object updateLimitLock; public FlowController(FlowControlSettings settings) { + // When the FlowController is initialized with FlowControlSettings, flow control limits can't be + // adjusted. min, current, max element count and request bytes are initialized with the max + // values in FlowControlSettings. + this(convertFlowControlSettings(settings)); + } + + @InternalApi("For google-cloud-java client use only") + public FlowController(DynamicFlowControlSettings settings) { this.limitExceededBehavior = settings.getLimitExceededBehavior(); + this.updateLimitLock = new Object(); switch (settings.getLimitExceededBehavior()) { case ThrowException: case Block: break; case Ignore: - this.maxOutstandingElementCount = null; - this.maxOutstandingRequestBytes = null; + this.maxElementCountLimit = null; + this.maxRequestBytesLimit = null; + this.minElementCountLimit = null; + this.minRequestBytesLimit = null; + this.outstandingElementCount = null; this.outstandingByteCount = null; return; @@ -162,23 +177,26 @@ public FlowController(FlowControlSettings settings) { throw new IllegalArgumentException( "Unknown LimitBehaviour: " + settings.getLimitExceededBehavior()); } - - this.maxOutstandingElementCount = settings.getMaxOutstandingElementCount(); - if (maxOutstandingElementCount == null) { + this.maxElementCountLimit = settings.getMaxOutstandingElementCount(); + this.minElementCountLimit = settings.getMinOutstandingElementCount(); + Long initialElementCountLimit = settings.getInitialOutstandingElementCount(); + if (initialElementCountLimit == null) { outstandingElementCount = null; } else if (settings.getLimitExceededBehavior() == FlowController.LimitExceededBehavior.Block) { - outstandingElementCount = new BlockingSemaphore(maxOutstandingElementCount); + outstandingElementCount = new BlockingSemaphore(initialElementCountLimit); } else { - outstandingElementCount = new NonBlockingSemaphore(maxOutstandingElementCount); + outstandingElementCount = new NonBlockingSemaphore(initialElementCountLimit); } - this.maxOutstandingRequestBytes = settings.getMaxOutstandingRequestBytes(); - if (maxOutstandingRequestBytes == null) { + this.maxRequestBytesLimit = settings.getMaxOutstandingRequestBytes(); + this.minRequestBytesLimit = settings.getMinOutstandingRequestBytes(); + Long initialRequestBytesLimit = settings.getInitialOutstandingRequestBytes(); + if (initialRequestBytesLimit == null) { outstandingByteCount = null; } else if (settings.getLimitExceededBehavior() == FlowController.LimitExceededBehavior.Block) { - outstandingByteCount = new BlockingSemaphore(maxOutstandingRequestBytes); + outstandingByteCount = new BlockingSemaphore(initialRequestBytesLimit); } else { - outstandingByteCount = new NonBlockingSemaphore(maxOutstandingRequestBytes); + outstandingByteCount = new NonBlockingSemaphore(initialRequestBytesLimit); } } @@ -188,19 +206,19 @@ public void reserve(long elements, long bytes) throws FlowControlException { if (outstandingElementCount != null) { if (!outstandingElementCount.acquire(elements)) { - throw new MaxOutstandingElementCountReachedException(maxOutstandingElementCount); + throw new MaxOutstandingElementCountReachedException( + outstandingElementCount.getPermitLimit()); } } - // Will always allow to send a request even if it is larger than the flow control limit, + // Always allows to send a request even if it is larger than the flow control limit, // if it doesn't then it will deadlock the thread. if (outstandingByteCount != null) { - long permitsToDraw = Math.min(bytes, maxOutstandingRequestBytes); - if (!outstandingByteCount.acquire(permitsToDraw)) { + if (!outstandingByteCount.acquirePartial(bytes)) { if (outstandingElementCount != null) { outstandingElementCount.release(elements); } - throw new MaxOutstandingRequestBytesReachedException(maxOutstandingRequestBytes); + throw new MaxOutstandingRequestBytesReachedException(outstandingByteCount.getPermitLimit()); } } } @@ -213,25 +231,106 @@ public void release(long elements, long bytes) { outstandingElementCount.release(elements); } if (outstandingByteCount != null) { - // Need to return at most as much bytes as it can be drawn. - long permitsToReturn = Math.min(bytes, maxOutstandingRequestBytes); - outstandingByteCount.release(permitsToReturn); + outstandingByteCount.release(bytes); + } + } + + /** + * Increase flow control limits to allow extra elementSteps elements and byteSteps request bytes + * before enforcing flow control. + */ + @InternalApi("For google-cloud-java client use only") + public void increaseThresholds(long elementSteps, long byteSteps) { + Preconditions.checkArgument(elementSteps >= 0); + Preconditions.checkArgument(byteSteps >= 0); + synchronized (updateLimitLock) { + if (outstandingElementCount != null) { + long actualStep = + Math.min(elementSteps, maxElementCountLimit - outstandingElementCount.getPermitLimit()); + outstandingElementCount.increasePermitLimit(actualStep); + } + + if (outstandingByteCount != null) { + long actualStep = + Math.min(byteSteps, maxRequestBytesLimit - outstandingByteCount.getPermitLimit()); + outstandingByteCount.increasePermitLimit(actualStep); + } + } + } + + /** + * Decrease flow control limits to allow elementSteps fewer elements and byteSteps fewer request + * bytes before enforcing flow control. + */ + @InternalApi("For google-cloud-java client use only") + public void decreaseThresholds(long elementSteps, long byteSteps) { + Preconditions.checkArgument(elementSteps >= 0); + Preconditions.checkArgument(byteSteps >= 0); + synchronized (updateLimitLock) { + if (outstandingElementCount != null) { + long actualStep = + Math.min(elementSteps, outstandingElementCount.getPermitLimit() - minElementCountLimit); + outstandingElementCount.reducePermitLimit(actualStep); + } + + if (outstandingByteCount != null) { + long actualStep = + Math.min(byteSteps, outstandingByteCount.getPermitLimit() - minRequestBytesLimit); + outstandingByteCount.reducePermitLimit(actualStep); + } } } + private static DynamicFlowControlSettings convertFlowControlSettings( + FlowControlSettings settings) { + return DynamicFlowControlSettings.newBuilder() + .setInitialOutstandingElementCount(settings.getMaxOutstandingElementCount()) + .setMinOutstandingElementCount(settings.getMaxOutstandingElementCount()) + .setMaxOutstandingElementCount(settings.getMaxOutstandingElementCount()) + .setInitialOutstandingRequestBytes(settings.getMaxOutstandingRequestBytes()) + .setMinOutstandingRequestBytes(settings.getMaxOutstandingRequestBytes()) + .setMaxOutstandingRequestBytes(settings.getMaxOutstandingRequestBytes()) + .setLimitExceededBehavior(settings.getLimitExceededBehavior()) + .build(); + } + LimitExceededBehavior getLimitExceededBehavior() { return limitExceededBehavior; } @InternalApi("For internal use by google-cloud-java clients only") @Nullable - public Long getMaxOutstandingElementCount() { - return maxOutstandingElementCount; + public Long getMaxElementCountLimit() { + return maxElementCountLimit; } @InternalApi("For internal use by google-cloud-java clients only") @Nullable - public Long getMaxOutstandingRequestBytes() { - return maxOutstandingRequestBytes; + public Long getMaxRequestBytesLimit() { + return maxRequestBytesLimit; + } + + @InternalApi("For google-cloud-java client use only") + @Nullable + public Long getMinElementCountLimit() { + return minElementCountLimit; + } + + @InternalApi("For google-cloud-java client use only") + @Nullable + public Long getMinRequestBytesLimit() { + return minRequestBytesLimit; + } + + @InternalApi("For google-cloud-java client use only") + @Nullable + public Long getCurrentElementCountLimit() { + return outstandingElementCount == null ? null : outstandingElementCount.getPermitLimit(); + } + + @InternalApi("For google-cloud-java client use only") + @Nullable + public Long getCurrentRequestBytesLimit() { + return outstandingByteCount == null ? null : outstandingByteCount.getPermitLimit(); } } diff --git a/gax/src/main/java/com/google/api/gax/batching/NonBlockingSemaphore.java b/gax/src/main/java/com/google/api/gax/batching/NonBlockingSemaphore.java index 62d881838..efa54d8bd 100644 --- a/gax/src/main/java/com/google/api/gax/batching/NonBlockingSemaphore.java +++ b/gax/src/main/java/com/google/api/gax/batching/NonBlockingSemaphore.java @@ -35,7 +35,8 @@ /** A {@link Semaphore64} that immediately returns with failure if permits are not available. */ class NonBlockingSemaphore implements Semaphore64 { - private final AtomicLong currentPermits; + private AtomicLong availablePermits; + private AtomicLong limit; private static void checkNotNegative(long l) { Preconditions.checkArgument(l >= 0, "negative permits not allowed: %s", l); @@ -43,25 +44,76 @@ private static void checkNotNegative(long l) { NonBlockingSemaphore(long permits) { checkNotNegative(permits); - this.currentPermits = new AtomicLong(permits); + this.availablePermits = new AtomicLong(permits); + this.limit = new AtomicLong(permits); } + @Override public void release(long permits) { checkNotNegative(permits); - currentPermits.addAndGet(permits); + while (true) { + long old = availablePermits.get(); + // TODO: throw exceptions when the permits overflow + if (availablePermits.compareAndSet(old, Math.min(old + permits, limit.get()))) { + return; + } + } } + @Override public boolean acquire(long permits) { checkNotNegative(permits); - - for (; ; ) { - long old = currentPermits.get(); + while (true) { + long old = availablePermits.get(); if (old < permits) { return false; } - if (currentPermits.compareAndSet(old, old - permits)) { + if (availablePermits.compareAndSet(old, old - permits)) { + return true; + } + } + } + + @Override + public boolean acquirePartial(long permits) { + checkNotNegative(permits); + // To allow individual oversized requests to be sent, clamp the requested permits to the maximum + // limit. This will allow individual large requests to be sent. Please note that this behavior + // will result in availablePermits going negative. + while (true) { + long old = availablePermits.get(); + if (old < Math.min(limit.get(), permits)) { + return false; + } + if (availablePermits.compareAndSet(old, old - permits)) { return true; } } } + + @Override + public void increasePermitLimit(long permits) { + checkNotNegative(permits); + availablePermits.addAndGet(permits); + limit.addAndGet(permits); + } + + @Override + public void reducePermitLimit(long reduction) { + checkNotNegative(reduction); + + while (true) { + long oldLimit = limit.get(); + Preconditions.checkState(oldLimit - reduction > 0, "permit limit underflow"); + if (limit.compareAndSet(oldLimit, oldLimit - reduction)) { + availablePermits.addAndGet(-reduction); + return; + } + } + } + + @Override + public long getPermitLimit() { + return limit.get(); + } } diff --git a/gax/src/main/java/com/google/api/gax/batching/Semaphore64.java b/gax/src/main/java/com/google/api/gax/batching/Semaphore64.java index 742aec040..f0e751846 100644 --- a/gax/src/main/java/com/google/api/gax/batching/Semaphore64.java +++ b/gax/src/main/java/com/google/api/gax/batching/Semaphore64.java @@ -31,13 +31,27 @@ /** * Semaphore64 is similar to {@link java.util.concurrent.Semaphore} but allows up to {@code 2^63-1} + * permits. It also allows adding / reducing permits to the original limit and acquire partial * permits. * - *
Users who do not need such large number of permits are strongly encouraged to use Java's - * {@code Semaphore} instead. It is almost certainly faster and less error prone. + *
Users who do not need such large number of permits and the extra functionalities are strongly
+ * encouraged to use Java's {@code Semaphore} instead. It is almost certainly faster and less error
+ * prone.
*/
interface Semaphore64 {
boolean acquire(long permits);
void release(long permits);
+
+ /**
+ * When try to acquire more permits than what's allowed, acquiring the limit instead of what's
+ * asked.
+ */
+ boolean acquirePartial(long permits);
+
+ void increasePermitLimit(long permits);
+
+ void reducePermitLimit(long reduction);
+
+ long getPermitLimit();
}
diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java
index bb6cae532..96235d5cb 100644
--- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java
+++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java
@@ -617,7 +617,6 @@ public boolean isLoggable(LogRecord record) {
@Test
public void testCloseRace() throws ExecutionException, InterruptedException, TimeoutException {
int iterations = 1_000_000;
-
ExecutorService executor = Executors.newFixedThreadPool(100);
try {
@@ -684,16 +683,16 @@ public void testConstructors() throws InterruptedException {
assertThat(batcher1.getFlowController()).isNotNull();
assertThat(batcher1.getFlowController().getLimitExceededBehavior())
.isEqualTo(batchingSettings.getFlowControlSettings().getLimitExceededBehavior());
- assertThat(batcher1.getFlowController().getMaxOutstandingElementCount())
+ assertThat(batcher1.getFlowController().getMaxElementCountLimit())
.isEqualTo(batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount());
- assertThat(batcher1.getFlowController().getMaxOutstandingRequestBytes())
+ assertThat(batcher1.getFlowController().getMaxRequestBytesLimit())
.isEqualTo(batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes());
}
FlowController flowController =
new FlowController(
FlowControlSettings.newBuilder()
- .setLimitExceededBehavior(LimitExceededBehavior.ThrowException.ThrowException)
+ .setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
.setMaxOutstandingRequestBytes(6000L)
.build());
try (BatcherImpl batcher2 = createDefaultBatcherImpl(batchingSettings, flowController)) {
diff --git a/gax/src/test/java/com/google/api/gax/batching/DynamicFlowControlSettingsTest.java b/gax/src/test/java/com/google/api/gax/batching/DynamicFlowControlSettingsTest.java
new file mode 100644
index 000000000..c1f326f39
--- /dev/null
+++ b/gax/src/test/java/com/google/api/gax/batching/DynamicFlowControlSettingsTest.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google LLC nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package com.google.api.gax.batching;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DynamicFlowControlSettingsTest {
+
+ @Test
+ public void testEmptyBuilder() {
+ DynamicFlowControlSettings.Builder builder = DynamicFlowControlSettings.newBuilder();
+ DynamicFlowControlSettings settings = builder.build();
+ assertNull(settings.getInitialOutstandingElementCount());
+ assertNull(settings.getInitialOutstandingRequestBytes());
+ assertNull(settings.getMaxOutstandingElementCount());
+ assertNull(settings.getMaxOutstandingRequestBytes());
+ assertNull(settings.getMinOutstandingElementCount());
+ assertNull(settings.getMinOutstandingRequestBytes());
+ assertEquals(LimitExceededBehavior.Block, settings.getLimitExceededBehavior());
+ }
+
+ @Test
+ public void testBuilder() {
+ DynamicFlowControlSettings.Builder builder =
+ DynamicFlowControlSettings.newBuilder()
+ .setInitialOutstandingElementCount(5L)
+ .setMaxOutstandingElementCount(10L)
+ .setMinOutstandingElementCount(1L)
+ .setInitialOutstandingRequestBytes(500L)
+ .setMaxOutstandingRequestBytes(1000L)
+ .setMinOutstandingRequestBytes(100L);
+ DynamicFlowControlSettings settings = builder.build();
+ assertThat(settings.getInitialOutstandingElementCount()).isEqualTo(5L);
+ assertThat(settings.getMaxOutstandingElementCount()).isEqualTo(10L);
+ assertThat(settings.getMinOutstandingElementCount()).isEqualTo(1L);
+ assertThat(settings.getInitialOutstandingRequestBytes()).isEqualTo(500L);
+ assertThat(settings.getMaxOutstandingRequestBytes()).isEqualTo(1000L);
+ assertThat(settings.getMinOutstandingRequestBytes()).isEqualTo(100L);
+ }
+
+ @Test
+ public void testValidElementCountSettings() {
+ DynamicFlowControlSettings.Builder builder =
+ DynamicFlowControlSettings.newBuilder()
+ .setInitialOutstandingElementCount(5L)
+ .setMaxOutstandingElementCount(10L)
+ .setMinOutstandingElementCount(1L);
+ DynamicFlowControlSettings settings = builder.build();
+ assertThat(settings.getInitialOutstandingElementCount()).isEqualTo(5L);
+ assertThat(settings.getMaxOutstandingElementCount()).isEqualTo(10L);
+ assertThat(settings.getMinOutstandingElementCount()).isEqualTo(1L);
+ }
+
+ @Test
+ public void testValidRequestByteSettings() {
+ DynamicFlowControlSettings.Builder builder =
+ DynamicFlowControlSettings.newBuilder()
+ .setInitialOutstandingRequestBytes(500L)
+ .setMaxOutstandingRequestBytes(1000L)
+ .setMinOutstandingRequestBytes(100L);
+ DynamicFlowControlSettings settings = builder.build();
+ assertThat(settings.getInitialOutstandingRequestBytes()).isEqualTo(500L);
+ assertThat(settings.getMaxOutstandingRequestBytes()).isEqualTo(1000L);
+ assertThat(settings.getMinOutstandingRequestBytes()).isEqualTo(100L);
+ }
+
+ @Test
+ public void testInvalidPartialSettings() {
+ DynamicFlowControlSettings.Builder builder =
+ DynamicFlowControlSettings.newBuilder().setInitialOutstandingElementCount(1L);
+ try {
+ builder.build();
+ fail("Did not throw an illegal state exception");
+ } catch (IllegalStateException e) {
+ // Expected, ignore
+ }
+ builder = DynamicFlowControlSettings.newBuilder().setMinOutstandingRequestBytes(1L);
+ try {
+ builder.build();
+ fail("Did not throw an illegal state exception");
+ } catch (IllegalStateException e) {
+ // Expected, ignore
+ }
+ }
+
+ @Test
+ public void testInvalidArguments() {
+ testInvalidElementCount(-1, -5, 10);
+ testInvalidElementCount(5, -1, 10);
+ testInvalidElementCount(5, 0, 10);
+ testInvalidElementCount(5, 6, 10);
+ testInvalidElementCount(5, 2, 2);
+
+ testInvalidNumberOfBytes(-1, -5, 10);
+ testInvalidNumberOfBytes(5, -1, 10);
+ testInvalidNumberOfBytes(5, 0, 10);
+ testInvalidNumberOfBytes(5, 6, 10);
+ testInvalidNumberOfBytes(5, 2, 2);
+ }
+
+ private void testInvalidElementCount(long initial, long min, long max) {
+ DynamicFlowControlSettings.Builder builder =
+ DynamicFlowControlSettings.newBuilder()
+ .setInitialOutstandingElementCount(initial)
+ .setMinOutstandingElementCount(min)
+ .setMaxOutstandingElementCount(max);
+ try {
+ builder.build();
+ fail("Did not throw an illegal state exception");
+ } catch (IllegalStateException e) {
+ // Expected, ignore
+ }
+ }
+
+ private void testInvalidNumberOfBytes(long initial, long min, long max) {
+ DynamicFlowControlSettings.Builder builder =
+ DynamicFlowControlSettings.newBuilder()
+ .setInitialOutstandingRequestBytes(initial)
+ .setMinOutstandingRequestBytes(min)
+ .setMaxOutstandingRequestBytes(max);
+ try {
+ builder.build();
+ fail("Did not throw an illegal state exception");
+ } catch (IllegalStateException e) {
+ // Expected, ignore
+ }
+ }
+}
diff --git a/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java b/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java
index 5411a8e13..8cd84b22b 100644
--- a/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java
+++ b/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java
@@ -29,13 +29,23 @@
*/
package com.google.api.gax.batching;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.google.api.gax.batching.FlowController.FlowControlException;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.common.util.concurrent.SettableFuture;
+import java.lang.Thread.State;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -81,6 +91,20 @@ public void testInvalidArguments() throws Exception {
} catch (IllegalArgumentException expected) {
// Expected
}
+ flowController.increaseThresholds(0, 0);
+ flowController.decreaseThresholds(0, 0);
+ try {
+ flowController.increaseThresholds(-1, 1);
+ fail("Must have thrown an illegal argument error");
+ } catch (IllegalArgumentException expected) {
+ // Expected
+ }
+ try {
+ flowController.decreaseThresholds(1, -1);
+ fail("Must have thrown an illegal argument error");
+ } catch (IllegalArgumentException expected) {
+ // Expected
+ }
}
@Test
@@ -182,9 +206,11 @@ public void run() {
});
permitsReserved.get();
+ assertFalse(finished.isDone());
flowController.release(1, 1);
- finished.get();
+ finished.get(50, TimeUnit.MILLISECONDS);
+ flowController.release(maxElementCount, maxNumBytes);
}
@Test
@@ -259,10 +285,192 @@ public void testRestoreAfterFail() throws FlowController.FlowControlException {
} catch (FlowController.MaxOutstandingRequestBytesReachedException e) {
// Ignore.
}
-
flowController.reserve(1, 0);
}
+ @Test
+ public void testConstructedByDynamicFlowControlSetting() {
+ FlowController flowController =
+ new FlowController(
+ DynamicFlowControlSettings.newBuilder()
+ .setMinOutstandingElementCount(1L)
+ .setInitialOutstandingElementCount(2L)
+ .setMaxOutstandingElementCount(3L)
+ .setMinOutstandingRequestBytes(10L)
+ .setInitialOutstandingRequestBytes(20L)
+ .setMaxOutstandingRequestBytes(30L)
+ .setLimitExceededBehavior(LimitExceededBehavior.Block)
+ .build());
+ assertEquals(1, flowController.getMinElementCountLimit().longValue());
+ assertEquals(2, flowController.getCurrentElementCountLimit().longValue());
+ assertEquals(3, flowController.getMaxElementCountLimit().longValue());
+ assertEquals(10, flowController.getMinRequestBytesLimit().longValue());
+ assertEquals(20, flowController.getCurrentRequestBytesLimit().longValue());
+ assertEquals(30, flowController.getMaxRequestBytesLimit().longValue());
+
+ flowController =
+ new FlowController(
+ DynamicFlowControlSettings.newBuilder()
+ .setMinOutstandingElementCount(1L)
+ .setInitialOutstandingElementCount(2L)
+ .setMaxOutstandingElementCount(3L)
+ .setMinOutstandingRequestBytes(10L)
+ .setInitialOutstandingRequestBytes(20L)
+ .setMaxOutstandingRequestBytes(30L)
+ .setLimitExceededBehavior(LimitExceededBehavior.Ignore)
+ .build());
+ assertNull(flowController.getMinElementCountLimit());
+ assertNull(flowController.getCurrentElementCountLimit());
+ assertNull(flowController.getMaxElementCountLimit());
+ assertNull(flowController.getMinRequestBytesLimit());
+ assertNull(flowController.getCurrentRequestBytesLimit());
+ assertNull(flowController.getMaxRequestBytesLimit());
+ }
+
+ @Test
+ public void testIncreaseThresholds_blocking() throws Exception {
+ FlowController flowController =
+ new FlowController(
+ DynamicFlowControlSettings.newBuilder()
+ .setInitialOutstandingElementCount(10L)
+ .setMinOutstandingElementCount(2L)
+ .setMaxOutstandingElementCount(20L)
+ .setInitialOutstandingRequestBytes(10L)
+ .setMinOutstandingRequestBytes(2L)
+ .setMaxOutstandingRequestBytes(20L)
+ .setLimitExceededBehavior(LimitExceededBehavior.Block)
+ .build());
+
+ assertEquals(10L, flowController.getCurrentElementCountLimit().longValue());
+ assertEquals(10L, flowController.getCurrentRequestBytesLimit().longValue());
+ testBlockingReserveRelease(flowController, 10, 0);
+ testBlockingReserveRelease(flowController, 0, 10);
+
+ flowController.increaseThresholds(5, 5);
+ assertEquals(15L, flowController.getCurrentElementCountLimit().longValue());
+ assertEquals(15L, flowController.getCurrentRequestBytesLimit().longValue());
+ testBlockingReserveRelease(flowController, 15, 0);
+ testBlockingReserveRelease(flowController, 0, 15);
+
+ // Thresholds can't go over max values. FlowController will set the values to max.
+ flowController.increaseThresholds(10, 10);
+ assertEquals(20L, flowController.getCurrentElementCountLimit().longValue());
+ assertEquals(20L, flowController.getCurrentRequestBytesLimit().longValue());
+ testBlockingReserveRelease(flowController, 20, 0);
+ testBlockingReserveRelease(flowController, 0, 20);
+ }
+
+ @Test
+ public void testDecreaseThresholds_blocking() throws Exception {
+ FlowController flowController =
+ new FlowController(
+ DynamicFlowControlSettings.newBuilder()
+ .setInitialOutstandingElementCount(10L)
+ .setMinOutstandingElementCount(2L)
+ .setMaxOutstandingElementCount(20L)
+ .setInitialOutstandingRequestBytes(10L)
+ .setMinOutstandingRequestBytes(2L)
+ .setMaxOutstandingRequestBytes(20L)
+ .setLimitExceededBehavior(LimitExceededBehavior.Block)
+ .build());
+
+ assertEquals(10L, flowController.getCurrentElementCountLimit().longValue());
+ assertEquals(10L, flowController.getCurrentRequestBytesLimit().longValue());
+ testBlockingReserveRelease(flowController, 10, 0);
+ testBlockingReserveRelease(flowController, 0, 10);
+
+ flowController.decreaseThresholds(5, 5);
+ assertEquals(5L, flowController.getCurrentElementCountLimit().longValue());
+ assertEquals(5L, flowController.getCurrentRequestBytesLimit().longValue());
+ testBlockingReserveRelease(flowController, 5, 0);
+ testBlockingReserveRelease(flowController, 0, 5);
+
+ // Thresholds can't go below min values. FlowController will set them to min svalues.
+ flowController.decreaseThresholds(5, 5);
+ assertEquals(2L, flowController.getCurrentElementCountLimit().longValue());
+ assertEquals(2L, flowController.getCurrentRequestBytesLimit().longValue());
+ testBlockingReserveRelease(flowController, 2, 0);
+ testBlockingReserveRelease(flowController, 0, 2);
+ }
+
+ @Test
+ public void testIncreaseThresholds_nonBlocking() throws Exception {
+ FlowController flowController =
+ new FlowController(
+ DynamicFlowControlSettings.newBuilder()
+ .setInitialOutstandingElementCount(10L)
+ .setMinOutstandingElementCount(2L)
+ .setMaxOutstandingElementCount(20L)
+ .setInitialOutstandingRequestBytes(10L)
+ .setMinOutstandingRequestBytes(2L)
+ .setMaxOutstandingRequestBytes(20L)
+ .setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
+ .build());
+
+ assertEquals(10L, flowController.getCurrentElementCountLimit().longValue());
+ assertEquals(10L, flowController.getCurrentRequestBytesLimit().longValue());
+ testRejectedReserveRelease(
+ flowController, 10, 0, FlowController.MaxOutstandingElementCountReachedException.class);
+ testRejectedReserveRelease(
+ flowController, 0, 10, FlowController.MaxOutstandingRequestBytesReachedException.class);
+
+ flowController.increaseThresholds(5, 5);
+ assertEquals(15L, flowController.getCurrentElementCountLimit().longValue());
+ assertEquals(15L, flowController.getCurrentRequestBytesLimit().longValue());
+ testRejectedReserveRelease(
+ flowController, 15, 0, FlowController.MaxOutstandingElementCountReachedException.class);
+ testRejectedReserveRelease(
+ flowController, 0, 15, FlowController.MaxOutstandingRequestBytesReachedException.class);
+
+ // Thresholds can't go over max values. FlowController will set the values to max.
+ flowController.increaseThresholds(10, 10);
+ assertEquals(20L, flowController.getCurrentElementCountLimit().longValue());
+ assertEquals(20L, flowController.getCurrentRequestBytesLimit().longValue());
+ testRejectedReserveRelease(
+ flowController, 20, 0, FlowController.MaxOutstandingElementCountReachedException.class);
+ testRejectedReserveRelease(
+ flowController, 0, 20, FlowController.MaxOutstandingRequestBytesReachedException.class);
+ }
+
+ @Test
+ public void testDecreaseThresholds_nonBlocking() throws Exception {
+ FlowController flowController =
+ new FlowController(
+ DynamicFlowControlSettings.newBuilder()
+ .setInitialOutstandingElementCount(10L)
+ .setMinOutstandingElementCount(2L)
+ .setMaxOutstandingElementCount(20L)
+ .setInitialOutstandingRequestBytes(10L)
+ .setMinOutstandingRequestBytes(2L)
+ .setMaxOutstandingRequestBytes(20L)
+ .setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
+ .build());
+
+ assertEquals(10L, flowController.getCurrentElementCountLimit().longValue());
+ assertEquals(10L, flowController.getCurrentRequestBytesLimit().longValue());
+ testRejectedReserveRelease(
+ flowController, 10, 0, FlowController.MaxOutstandingElementCountReachedException.class);
+ testRejectedReserveRelease(
+ flowController, 0, 10, FlowController.MaxOutstandingRequestBytesReachedException.class);
+
+ flowController.decreaseThresholds(5, 5);
+ assertEquals(5L, flowController.getCurrentElementCountLimit().longValue());
+ assertEquals(5L, flowController.getCurrentRequestBytesLimit().longValue());
+ testRejectedReserveRelease(
+ flowController, 5, 0, FlowController.MaxOutstandingElementCountReachedException.class);
+ testRejectedReserveRelease(
+ flowController, 0, 5, FlowController.MaxOutstandingRequestBytesReachedException.class);
+
+ // Thresholds can't go below min values. FlowController will set them to min svalues.
+ flowController.decreaseThresholds(5, 5);
+ assertEquals(2L, flowController.getCurrentElementCountLimit().longValue());
+ assertEquals(2L, flowController.getCurrentRequestBytesLimit().longValue());
+ testRejectedReserveRelease(
+ flowController, 2, 0, FlowController.MaxOutstandingElementCountReachedException.class);
+ testRejectedReserveRelease(
+ flowController, 0, 2, FlowController.MaxOutstandingRequestBytesReachedException.class);
+ }
+
private void testRejectedReserveRelease(
FlowController flowController,
int maxElementCount,
@@ -282,5 +490,227 @@ private void testRejectedReserveRelease(
flowController.release(1, 1);
flowController.reserve(maxElementCount, maxNumBytes);
+ flowController.release(maxElementCount, maxNumBytes);
+ }
+
+ @Test
+ public void testConcurrentUpdateThresholds_blocking() throws Exception {
+ int initialValue = 5000;
+ FlowController flowController =
+ new FlowController(
+ DynamicFlowControlSettings.newBuilder()
+ .setInitialOutstandingElementCount((long) initialValue)
+ .setMinOutstandingElementCount(1L)
+ .setMaxOutstandingElementCount(10000L)
+ .setInitialOutstandingRequestBytes((long) initialValue)
+ .setMinOutstandingRequestBytes(1L)
+ .setMaxOutstandingRequestBytes(50000L)
+ .setLimitExceededBehavior(LimitExceededBehavior.Block)
+ .build());
+ final AtomicInteger totalIncreased = new AtomicInteger(0);
+ final AtomicInteger totalDecreased = new AtomicInteger(0);
+ final AtomicInteger releasedCounter = new AtomicInteger(0);
+
+ List