diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
index 0c98eae8d..8d3bbac26 100644
--- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
+++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
@@ -293,9 +293,8 @@ public void close() throws InterruptedException {
}
}
- /** Package-private for use in testing. */
- @VisibleForTesting
- FlowController getFlowController() {
+ @InternalApi("For google-cloud-java client use only")
+ public FlowController getFlowController() {
return flowController;
}
diff --git a/gax/src/main/java/com/google/api/gax/batching/FlowControlEventStats.java b/gax/src/main/java/com/google/api/gax/batching/FlowControlEventStats.java
new file mode 100644
index 000000000..005b2bced
--- /dev/null
+++ b/gax/src/main/java/com/google/api/gax/batching/FlowControlEventStats.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google LLC nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package com.google.api.gax.batching;
+
+import static com.google.api.gax.batching.FlowController.LimitExceededBehavior;
+
+import com.google.api.core.InternalApi;
+import com.google.api.gax.batching.FlowController.FlowControlException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
+/**
+ * Record the statistics of flow control events.
+ *
+ *
This class is populated by FlowController, which will record throttling events. Currently it
+ * only keeps the last flow control event, but it could be expanded to record more information in
+ * the future. The events can be used to dynamically adjust concurrency in the client. For example:
+ *
+ *
{@code
+ * // Increase flow control limits if there was throttling in the past 5 minutes and throttled time
+ * // was longer than 1 minute.
+ * while(true) {
+ * FlowControlEvent event = flowControlEventStats.getLastFlowControlEvent();
+ * if (event != null
+ * && event.getTimestampMs() > System.currentMillis() - TimeUnit.MINUTES.toMillis(5)
+ * && event.getThrottledTimeInMs() > TimeUnit.MINUTES.toMillis(1)) {
+ * flowController.increaseThresholds(elementSteps, byteSteps);
+ * }
+ * Thread.sleep(TimeUnit.MINUTE.toMillis(10));
+ * }
+ * }
+ */
+@InternalApi("For google-cloud-java client use only")
+public class FlowControlEventStats {
+
+ private volatile FlowControlEvent lastFlowControlEvent;
+
+ // We only need the last event to check if there was throttling in the past X minutes so this
+ // doesn't need to be super accurate.
+ void recordFlowControlEvent(FlowControlEvent event) {
+ if (lastFlowControlEvent == null || event.compareTo(lastFlowControlEvent) > 0) {
+ lastFlowControlEvent = event;
+ }
+ }
+
+ public FlowControlEvent getLastFlowControlEvent() {
+ return lastFlowControlEvent;
+ }
+
+ /**
+ * A flow control event. Record throttled time if {@link LimitExceededBehavior} is {@link
+ * LimitExceededBehavior#Block}, or the exception if the behavior is {@link
+ * LimitExceededBehavior#ThrowException}.
+ */
+ public static class FlowControlEvent implements Comparable {
+ static FlowControlEvent createReserveDelayed(long throttledTimeInMs) {
+ return createReserveDelayed(System.currentTimeMillis(), throttledTimeInMs);
+ }
+
+ static FlowControlEvent createReserveDenied(FlowControlException exception) {
+ return createReserveDenied(System.currentTimeMillis(), exception);
+ }
+
+ /** Package-private for use in testing. */
+ @VisibleForTesting
+ static FlowControlEvent createReserveDelayed(long timestampMs, long throttledTimeInMs) {
+ Preconditions.checkArgument(timestampMs > 0, "timestamp must be greater than 0");
+ Preconditions.checkArgument(throttledTimeInMs > 0, "throttled time must be greater than 0");
+ return new FlowControlEvent(timestampMs, throttledTimeInMs, null);
+ }
+
+ /** Package-private for use in testing. */
+ @VisibleForTesting
+ static FlowControlEvent createReserveDenied(long timestampMs, FlowControlException exception) {
+ Preconditions.checkArgument(timestampMs > 0, "timestamp must be greater than 0");
+ Preconditions.checkNotNull(
+ exception, "FlowControlException can't be null when reserve is denied");
+ return new FlowControlEvent(timestampMs, null, exception);
+ }
+
+ private long timestampMs;
+ private Long throttledTimeMs;
+ private FlowControlException exception;
+
+ private FlowControlEvent(
+ long timestampMs,
+ @Nullable Long throttledTimeMs,
+ @Nullable FlowControlException exception) {
+ this.timestampMs = timestampMs;
+ this.throttledTimeMs = throttledTimeMs;
+ this.exception = exception;
+ }
+
+ public long getTimestampMs() {
+ return timestampMs;
+ }
+
+ @Nullable
+ public FlowControlException getException() {
+ return exception;
+ }
+
+ @Nullable
+ public Long getThrottledTime(TimeUnit timeUnit) {
+ return throttledTimeMs == null
+ ? null
+ : timeUnit.convert(throttledTimeMs, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int compareTo(FlowControlEvent otherEvent) {
+ return Long.compare(this.getTimestampMs(), otherEvent.getTimestampMs());
+ }
+ }
+}
diff --git a/gax/src/main/java/com/google/api/gax/batching/FlowController.java b/gax/src/main/java/com/google/api/gax/batching/FlowController.java
index a7b906c24..15a6cd8e1 100644
--- a/gax/src/main/java/com/google/api/gax/batching/FlowController.java
+++ b/gax/src/main/java/com/google/api/gax/batching/FlowController.java
@@ -31,7 +31,10 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
+import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/** Provides flow control capability. */
@@ -149,6 +152,11 @@ public enum LimitExceededBehavior {
private final LimitExceededBehavior limitExceededBehavior;
private final Object updateLimitLock;
+ // Threshold to record throttling events. If reserve() takes longer than this threshold, it will
+ // be recorded as a throttling event.
+ private static final long RESERVE_FLOW_CONTROL_THRESHOLD_MS = 1;
+ private final FlowControlEventStats flowControlEventStats;
+
public FlowController(FlowControlSettings settings) {
// When the FlowController is initialized with FlowControlSettings, flow control limits can't be
// adjusted. min, current, max element count and request bytes are initialized with the max
@@ -160,6 +168,7 @@ public FlowController(FlowControlSettings settings) {
public FlowController(DynamicFlowControlSettings settings) {
this.limitExceededBehavior = settings.getLimitExceededBehavior();
this.updateLimitLock = new Object();
+ this.flowControlEventStats = new FlowControlEventStats();
switch (settings.getLimitExceededBehavior()) {
case ThrowException:
case Block:
@@ -204,10 +213,15 @@ public void reserve(long elements, long bytes) throws FlowControlException {
Preconditions.checkArgument(elements >= 0);
Preconditions.checkArgument(bytes >= 0);
+ Stopwatch stopwatch = Stopwatch.createStarted();
if (outstandingElementCount != null) {
if (!outstandingElementCount.acquire(elements)) {
- throw new MaxOutstandingElementCountReachedException(
- outstandingElementCount.getPermitLimit());
+ MaxOutstandingElementCountReachedException exception =
+ new MaxOutstandingElementCountReachedException(
+ outstandingElementCount.getPermitLimit());
+ flowControlEventStats.recordFlowControlEvent(
+ FlowControlEvent.createReserveDenied(exception));
+ throw exception;
}
}
@@ -218,9 +232,17 @@ public void reserve(long elements, long bytes) throws FlowControlException {
if (outstandingElementCount != null) {
outstandingElementCount.release(elements);
}
- throw new MaxOutstandingRequestBytesReachedException(outstandingByteCount.getPermitLimit());
+ MaxOutstandingRequestBytesReachedException exception =
+ new MaxOutstandingRequestBytesReachedException(outstandingByteCount.getPermitLimit());
+ flowControlEventStats.recordFlowControlEvent(
+ FlowControlEvent.createReserveDenied(exception));
+ throw exception;
}
}
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ if (elapsed >= RESERVE_FLOW_CONTROL_THRESHOLD_MS) {
+ flowControlEventStats.recordFlowControlEvent(FlowControlEvent.createReserveDelayed(elapsed));
+ }
}
public void release(long elements, long bytes) {
@@ -333,4 +355,9 @@ public Long getCurrentElementCountLimit() {
public Long getCurrentRequestBytesLimit() {
return outstandingByteCount == null ? null : outstandingByteCount.getPermitLimit();
}
+
+ @InternalApi("For google-cloud-java client use only")
+ public FlowControlEventStats getFlowControlEventStats() {
+ return flowControlEventStats;
+ }
}
diff --git a/gax/src/test/java/com/google/api/gax/batching/FlowControlEventStatsTest.java b/gax/src/test/java/com/google/api/gax/batching/FlowControlEventStatsTest.java
new file mode 100644
index 000000000..5136f312a
--- /dev/null
+++ b/gax/src/test/java/com/google/api/gax/batching/FlowControlEventStatsTest.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google LLC nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package com.google.api.gax.batching;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent;
+import com.google.api.gax.batching.FlowController.MaxOutstandingRequestBytesReachedException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class FlowControlEventStatsTest {
+
+ @Test
+ public void testCreateEvent() {
+ long timestamp = 12345, throttledTimeMs = 5000;
+ FlowControlEvent event = FlowControlEvent.createReserveDelayed(timestamp, throttledTimeMs);
+ assertEquals(event.getTimestampMs(), event.getTimestampMs());
+ assertEquals(throttledTimeMs / 1000, event.getThrottledTime(TimeUnit.SECONDS).longValue());
+ assertNull(event.getException());
+
+ MaxOutstandingRequestBytesReachedException exception =
+ new MaxOutstandingRequestBytesReachedException(100);
+ event = FlowControlEvent.createReserveDenied(timestamp, exception);
+ assertEquals(timestamp, event.getTimestampMs());
+ assertNotNull(event.getException());
+ assertEquals(exception, event.getException());
+ assertNull(event.getThrottledTime(TimeUnit.MILLISECONDS));
+
+ try {
+ event = FlowControlEvent.createReserveDenied(null);
+ fail("FlowControlEvent did not throw exception");
+ } catch (NullPointerException e) {
+ // expected, ignore
+ }
+ }
+
+ @Test
+ public void testGetLastEvent() throws InterruptedException {
+ final FlowControlEventStats stats = new FlowControlEventStats();
+ final long currentTime = System.currentTimeMillis();
+
+ List threads = new ArrayList<>();
+ for (int i = 1; i <= 10; i++) {
+ final int timeElapsed = i;
+ Thread t =
+ new Thread() {
+ @Override
+ public void run() {
+ stats.recordFlowControlEvent(
+ FlowControlEvent.createReserveDelayed(currentTime + timeElapsed, timeElapsed));
+ }
+ };
+ threads.add(t);
+ t.start();
+ }
+
+ for (Thread t : threads) {
+ t.join(10);
+ }
+
+ assertEquals(currentTime + 10, stats.getLastFlowControlEvent().getTimestampMs());
+ assertEquals(
+ 10, stats.getLastFlowControlEvent().getThrottledTime(TimeUnit.MILLISECONDS).longValue());
+ }
+}
diff --git a/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java b/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java
index 8cd84b22b..b0bc72531 100644
--- a/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java
+++ b/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java
@@ -29,8 +29,10 @@
*/
package com.google.api.gax.batching;
+import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -42,9 +44,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -225,6 +229,9 @@ public void testReserveRelease_rejectedByElementCount() throws Exception {
testRejectedReserveRelease(
flowController, 10, 10, FlowController.MaxOutstandingElementCountReachedException.class);
+ assertNotNull(flowController.getFlowControlEventStats().getLastFlowControlEvent());
+ assertThat(flowController.getFlowControlEventStats().getLastFlowControlEvent().getException())
+ .isInstanceOf(FlowController.MaxOutstandingElementCountReachedException.class);
}
@Test
@@ -252,6 +259,9 @@ public void testReserveRelease_rejectedByNumberOfBytes() throws Exception {
testRejectedReserveRelease(
flowController, 10, 10, FlowController.MaxOutstandingRequestBytesReachedException.class);
+ assertNotNull(flowController.getFlowControlEventStats().getLastFlowControlEvent());
+ assertThat(flowController.getFlowControlEventStats().getLastFlowControlEvent().getException())
+ .isInstanceOf(FlowController.MaxOutstandingRequestBytesReachedException.class);
}
@Test
@@ -325,6 +335,7 @@ public void testConstructedByDynamicFlowControlSetting() {
assertNull(flowController.getMinRequestBytesLimit());
assertNull(flowController.getCurrentRequestBytesLimit());
assertNull(flowController.getMaxRequestBytesLimit());
+ assertNotNull(flowController.getFlowControlEventStats());
}
@Test
@@ -515,7 +526,7 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception {
testConcurrentUpdates(
flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter);
for (Thread t : reserveThreads) {
- t.join(100);
+ t.join(200);
}
assertEquals(reserveThreads.size(), releasedCounter.get());
assertTrue(totalIncreased.get() > 0);
@@ -548,7 +559,7 @@ public void testConcurrentUpdateThresholds_nonBlocking() throws Exception {
testConcurrentUpdates(
flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter);
for (Thread t : reserveThreads) {
- t.join(100);
+ t.join(200);
}
assertEquals(reserveThreads.size(), releasedCounter.get());
assertTrue(totalIncreased.get() > 0);
@@ -655,6 +666,75 @@ public void run() {
t.join();
}
+ @Test
+ public void testFlowControlBlockEventIsRecorded() throws Exception {
+ // Test when reserve is blocked for at least FlowController#RESERVE_FLOW_CONTROL_THRESHOLD_MS,
+ // FlowController will record the FlowControlEvent in FlowControlEventStats
+ final FlowController flowController =
+ new FlowController(
+ DynamicFlowControlSettings.newBuilder()
+ .setInitialOutstandingElementCount(5L)
+ .setMinOutstandingElementCount(1L)
+ .setMaxOutstandingElementCount(10L)
+ .setInitialOutstandingRequestBytes(5L)
+ .setMinOutstandingRequestBytes(1L)
+ .setMaxOutstandingRequestBytes(10L)
+ .setLimitExceededBehavior(LimitExceededBehavior.Block)
+ .build());
+ Runnable runnable =
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ flowController.reserve(1, 1);
+ } catch (FlowController.FlowControlException e) {
+ throw new AssertionError(e);
+ }
+ }
+ };
+ // blocked by element. Reserve all 5 elements first, reserve in the runnable will be blocked
+ flowController.reserve(5, 1);
+ ExecutorService executor = Executors.newCachedThreadPool();
+ Future> finished1 = executor.submit(runnable);
+ try {
+ finished1.get(50, TimeUnit.MILLISECONDS);
+ fail("reserve should block");
+ } catch (TimeoutException e) {
+ // expected
+ }
+ assertFalse(finished1.isDone());
+ flowController.release(5, 1);
+ // After other elements are released, reserve in the runnable should go through. Since reserve
+ // was blocked for longer than 1 millisecond, FlowController should record this event in
+ // FlowControlEventStats.
+ finished1.get(50, TimeUnit.MILLISECONDS);
+ assertNotNull(flowController.getFlowControlEventStats().getLastFlowControlEvent());
+ assertNotNull(
+ flowController
+ .getFlowControlEventStats()
+ .getLastFlowControlEvent()
+ .getThrottledTime(TimeUnit.MILLISECONDS));
+ flowController.release(1, 1);
+
+ // Similar to blocked by element, test blocking by bytes.
+ flowController.reserve(1, 5);
+ Future> finished2 = executor.submit(runnable);
+ try {
+ finished2.get(50, TimeUnit.MILLISECONDS);
+ fail("reserve should block");
+ } catch (TimeoutException e) {
+ // expected
+ }
+ assertFalse(finished2.isDone());
+ long currentTime = System.currentTimeMillis();
+ flowController.release(1, 5);
+ finished2.get(50, TimeUnit.MILLISECONDS);
+ assertNotNull(flowController.getFlowControlEventStats().getLastFlowControlEvent());
+ // Make sure this newer event is recorded
+ assertThat(flowController.getFlowControlEventStats().getLastFlowControlEvent().getTimestampMs())
+ .isAtLeast(currentTime);
+ }
+
private List testConcurrentUpdates(
final FlowController flowController,
final int increaseStepRange,