From 5329ea43c024ca14cea1012c5ab46e694e199492 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 5 Apr 2021 17:42:35 -0400 Subject: [PATCH] feat: dynamic flow control p3: add FlowControllerEventStats (#1332) * feat: dynamic flow control p3: add FlowControllerEventStats * add some more tests * add more documentation * fix the formatting * fix the failed test * Update comments * updates based on review * missed one review * remove the flaky test * deflake test * add some comments Co-authored-by: Igor Bernstein --- .../google/api/gax/batching/BatcherImpl.java | 5 +- .../gax/batching/FlowControlEventStats.java | 144 ++++++++++++++++++ .../api/gax/batching/FlowController.java | 33 +++- .../batching/FlowControlEventStatsTest.java | 101 ++++++++++++ .../api/gax/batching/FlowControllerTest.java | 84 +++++++++- 5 files changed, 359 insertions(+), 8 deletions(-) create mode 100644 gax/src/main/java/com/google/api/gax/batching/FlowControlEventStats.java create mode 100644 gax/src/test/java/com/google/api/gax/batching/FlowControlEventStatsTest.java diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 0c98eae8d..8d3bbac26 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -293,9 +293,8 @@ public void close() throws InterruptedException { } } - /** Package-private for use in testing. 
*/ - @VisibleForTesting - FlowController getFlowController() { + @InternalApi("For google-cloud-java client use only") + public FlowController getFlowController() { return flowController; } diff --git a/gax/src/main/java/com/google/api/gax/batching/FlowControlEventStats.java b/gax/src/main/java/com/google/api/gax/batching/FlowControlEventStats.java new file mode 100644 index 000000000..005b2bced --- /dev/null +++ b/gax/src/main/java/com/google/api/gax/batching/FlowControlEventStats.java @@ -0,0 +1,144 @@ +/* + * Copyright 2021 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import static com.google.api.gax.batching.FlowController.LimitExceededBehavior; + +import com.google.api.core.InternalApi; +import com.google.api.gax.batching.FlowController.FlowControlException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +/** + * Record the statistics of flow control events. + * + *

This class is populated by FlowController, which will record throttling events. Currently it + * only keeps the last flow control event, but it could be expanded to record more information in + * the future. The events can be used to dynamically adjust concurrency in the client. For example: + * + *

{@code
+ * // Increase flow control limits if there was throttling in the past 5 minutes and throttled time
+ * // was longer than 1 minute.
+ * while(true) {
+ *    FlowControlEvent event = flowControlEventStats.getLastFlowControlEvent();
+ *    if (event != null
+ *         && event.getTimestampMs() > System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5)
+ *         && event.getThrottledTime(TimeUnit.MILLISECONDS) > TimeUnit.MINUTES.toMillis(1)) {
+ *      flowController.increaseThresholds(elementSteps, byteSteps);
+ *    }
+ *    Thread.sleep(TimeUnit.MINUTES.toMillis(10));
+ * }
+ * }
+ */ +@InternalApi("For google-cloud-java client use only") +public class FlowControlEventStats { + + private volatile FlowControlEvent lastFlowControlEvent; + + // We only need the last event to check if there was throttling in the past X minutes so this + // doesn't need to be super accurate. + void recordFlowControlEvent(FlowControlEvent event) { + if (lastFlowControlEvent == null || event.compareTo(lastFlowControlEvent) > 0) { + lastFlowControlEvent = event; + } + } + + public FlowControlEvent getLastFlowControlEvent() { + return lastFlowControlEvent; + } + + /** + * A flow control event. Record throttled time if {@link LimitExceededBehavior} is {@link + * LimitExceededBehavior#Block}, or the exception if the behavior is {@link + * LimitExceededBehavior#ThrowException}. + */ + public static class FlowControlEvent implements Comparable { + static FlowControlEvent createReserveDelayed(long throttledTimeInMs) { + return createReserveDelayed(System.currentTimeMillis(), throttledTimeInMs); + } + + static FlowControlEvent createReserveDenied(FlowControlException exception) { + return createReserveDenied(System.currentTimeMillis(), exception); + } + + /** Package-private for use in testing. */ + @VisibleForTesting + static FlowControlEvent createReserveDelayed(long timestampMs, long throttledTimeInMs) { + Preconditions.checkArgument(timestampMs > 0, "timestamp must be greater than 0"); + Preconditions.checkArgument(throttledTimeInMs > 0, "throttled time must be greater than 0"); + return new FlowControlEvent(timestampMs, throttledTimeInMs, null); + } + + /** Package-private for use in testing. 
*/ + @VisibleForTesting + static FlowControlEvent createReserveDenied(long timestampMs, FlowControlException exception) { + Preconditions.checkArgument(timestampMs > 0, "timestamp must be greater than 0"); + Preconditions.checkNotNull( + exception, "FlowControlException can't be null when reserve is denied"); + return new FlowControlEvent(timestampMs, null, exception); + } + + private long timestampMs; + private Long throttledTimeMs; + private FlowControlException exception; + + private FlowControlEvent( + long timestampMs, + @Nullable Long throttledTimeMs, + @Nullable FlowControlException exception) { + this.timestampMs = timestampMs; + this.throttledTimeMs = throttledTimeMs; + this.exception = exception; + } + + public long getTimestampMs() { + return timestampMs; + } + + @Nullable + public FlowControlException getException() { + return exception; + } + + @Nullable + public Long getThrottledTime(TimeUnit timeUnit) { + return throttledTimeMs == null + ? null + : timeUnit.convert(throttledTimeMs, TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(FlowControlEvent otherEvent) { + return Long.compare(this.getTimestampMs(), otherEvent.getTimestampMs()); + } + } +} diff --git a/gax/src/main/java/com/google/api/gax/batching/FlowController.java b/gax/src/main/java/com/google/api/gax/batching/FlowController.java index a7b906c24..15a6cd8e1 100644 --- a/gax/src/main/java/com/google/api/gax/batching/FlowController.java +++ b/gax/src/main/java/com/google/api/gax/batching/FlowController.java @@ -31,7 +31,10 @@ import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; +import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; /** Provides flow control capability. 
*/ @@ -149,6 +152,11 @@ public enum LimitExceededBehavior { private final LimitExceededBehavior limitExceededBehavior; private final Object updateLimitLock; + // Threshold to record throttling events. If reserve() takes longer than this threshold, it will + // be recorded as a throttling event. + private static final long RESERVE_FLOW_CONTROL_THRESHOLD_MS = 1; + private final FlowControlEventStats flowControlEventStats; + public FlowController(FlowControlSettings settings) { // When the FlowController is initialized with FlowControlSettings, flow control limits can't be // adjusted. min, current, max element count and request bytes are initialized with the max @@ -160,6 +168,7 @@ public FlowController(FlowControlSettings settings) { public FlowController(DynamicFlowControlSettings settings) { this.limitExceededBehavior = settings.getLimitExceededBehavior(); this.updateLimitLock = new Object(); + this.flowControlEventStats = new FlowControlEventStats(); switch (settings.getLimitExceededBehavior()) { case ThrowException: case Block: @@ -204,10 +213,15 @@ public void reserve(long elements, long bytes) throws FlowControlException { Preconditions.checkArgument(elements >= 0); Preconditions.checkArgument(bytes >= 0); + Stopwatch stopwatch = Stopwatch.createStarted(); if (outstandingElementCount != null) { if (!outstandingElementCount.acquire(elements)) { - throw new MaxOutstandingElementCountReachedException( - outstandingElementCount.getPermitLimit()); + MaxOutstandingElementCountReachedException exception = + new MaxOutstandingElementCountReachedException( + outstandingElementCount.getPermitLimit()); + flowControlEventStats.recordFlowControlEvent( + FlowControlEvent.createReserveDenied(exception)); + throw exception; } } @@ -218,9 +232,17 @@ public void reserve(long elements, long bytes) throws FlowControlException { if (outstandingElementCount != null) { outstandingElementCount.release(elements); } - throw new 
MaxOutstandingRequestBytesReachedException(outstandingByteCount.getPermitLimit()); + MaxOutstandingRequestBytesReachedException exception = + new MaxOutstandingRequestBytesReachedException(outstandingByteCount.getPermitLimit()); + flowControlEventStats.recordFlowControlEvent( + FlowControlEvent.createReserveDenied(exception)); + throw exception; } } + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + if (elapsed >= RESERVE_FLOW_CONTROL_THRESHOLD_MS) { + flowControlEventStats.recordFlowControlEvent(FlowControlEvent.createReserveDelayed(elapsed)); + } } public void release(long elements, long bytes) { @@ -333,4 +355,9 @@ public Long getCurrentElementCountLimit() { public Long getCurrentRequestBytesLimit() { return outstandingByteCount == null ? null : outstandingByteCount.getPermitLimit(); } + + @InternalApi("For google-cloud-java client use only") + public FlowControlEventStats getFlowControlEventStats() { + return flowControlEventStats; + } } diff --git a/gax/src/test/java/com/google/api/gax/batching/FlowControlEventStatsTest.java b/gax/src/test/java/com/google/api/gax/batching/FlowControlEventStatsTest.java new file mode 100644 index 000000000..5136f312a --- /dev/null +++ b/gax/src/test/java/com/google/api/gax/batching/FlowControlEventStatsTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2021 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. 
+ * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.batching; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent; +import com.google.api.gax.batching.FlowController.MaxOutstandingRequestBytesReachedException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class FlowControlEventStatsTest { + + @Test + public void testCreateEvent() { + long timestamp = 12345, throttledTimeMs = 5000; + FlowControlEvent event = FlowControlEvent.createReserveDelayed(timestamp, throttledTimeMs); + assertEquals(event.getTimestampMs(), event.getTimestampMs()); + assertEquals(throttledTimeMs / 1000, event.getThrottledTime(TimeUnit.SECONDS).longValue()); + 
assertNull(event.getException()); + + MaxOutstandingRequestBytesReachedException exception = + new MaxOutstandingRequestBytesReachedException(100); + event = FlowControlEvent.createReserveDenied(timestamp, exception); + assertEquals(timestamp, event.getTimestampMs()); + assertNotNull(event.getException()); + assertEquals(exception, event.getException()); + assertNull(event.getThrottledTime(TimeUnit.MILLISECONDS)); + + try { + event = FlowControlEvent.createReserveDenied(null); + fail("FlowControlEvent did not throw exception"); + } catch (NullPointerException e) { + // expected, ignore + } + } + + @Test + public void testGetLastEvent() throws InterruptedException { + final FlowControlEventStats stats = new FlowControlEventStats(); + final long currentTime = System.currentTimeMillis(); + + List threads = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + final int timeElapsed = i; + Thread t = + new Thread() { + @Override + public void run() { + stats.recordFlowControlEvent( + FlowControlEvent.createReserveDelayed(currentTime + timeElapsed, timeElapsed)); + } + }; + threads.add(t); + t.start(); + } + + for (Thread t : threads) { + t.join(10); + } + + assertEquals(currentTime + 10, stats.getLastFlowControlEvent().getTimestampMs()); + assertEquals( + 10, stats.getLastFlowControlEvent().getThrottledTime(TimeUnit.MILLISECONDS).longValue()); + } +} diff --git a/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java b/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java index 8cd84b22b..b0bc72531 100644 --- a/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/FlowControllerTest.java @@ -29,8 +29,10 @@ */ package com.google.api.gax.batching; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static 
org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -42,9 +44,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.junit.runner.RunWith; @@ -225,6 +229,9 @@ public void testReserveRelease_rejectedByElementCount() throws Exception { testRejectedReserveRelease( flowController, 10, 10, FlowController.MaxOutstandingElementCountReachedException.class); + assertNotNull(flowController.getFlowControlEventStats().getLastFlowControlEvent()); + assertThat(flowController.getFlowControlEventStats().getLastFlowControlEvent().getException()) + .isInstanceOf(FlowController.MaxOutstandingElementCountReachedException.class); } @Test @@ -252,6 +259,9 @@ public void testReserveRelease_rejectedByNumberOfBytes() throws Exception { testRejectedReserveRelease( flowController, 10, 10, FlowController.MaxOutstandingRequestBytesReachedException.class); + assertNotNull(flowController.getFlowControlEventStats().getLastFlowControlEvent()); + assertThat(flowController.getFlowControlEventStats().getLastFlowControlEvent().getException()) + .isInstanceOf(FlowController.MaxOutstandingRequestBytesReachedException.class); } @Test @@ -325,6 +335,7 @@ public void testConstructedByDynamicFlowControlSetting() { assertNull(flowController.getMinRequestBytesLimit()); assertNull(flowController.getCurrentRequestBytesLimit()); assertNull(flowController.getMaxRequestBytesLimit()); + assertNotNull(flowController.getFlowControlEventStats()); } @Test @@ -515,7 +526,7 @@ public void testConcurrentUpdateThresholds_blocking() throws Exception { testConcurrentUpdates( flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter); 
for (Thread t : reserveThreads) { - t.join(100); + t.join(200); } assertEquals(reserveThreads.size(), releasedCounter.get()); assertTrue(totalIncreased.get() > 0); @@ -548,7 +559,7 @@ public void testConcurrentUpdateThresholds_nonBlocking() throws Exception { testConcurrentUpdates( flowController, 100, 100, 100, totalIncreased, totalDecreased, releasedCounter); for (Thread t : reserveThreads) { - t.join(100); + t.join(200); } assertEquals(reserveThreads.size(), releasedCounter.get()); assertTrue(totalIncreased.get() > 0); @@ -655,6 +666,75 @@ public void run() { t.join(); } + @Test + public void testFlowControlBlockEventIsRecorded() throws Exception { + // Test when reserve is blocked for at least FlowController#RESERVE_FLOW_CONTROL_THRESHOLD_MS, + // FlowController will record the FlowControlEvent in FlowControlEventStats + final FlowController flowController = + new FlowController( + DynamicFlowControlSettings.newBuilder() + .setInitialOutstandingElementCount(5L) + .setMinOutstandingElementCount(1L) + .setMaxOutstandingElementCount(10L) + .setInitialOutstandingRequestBytes(5L) + .setMinOutstandingRequestBytes(1L) + .setMaxOutstandingRequestBytes(10L) + .setLimitExceededBehavior(LimitExceededBehavior.Block) + .build()); + Runnable runnable = + new Runnable() { + @Override + public void run() { + try { + flowController.reserve(1, 1); + } catch (FlowController.FlowControlException e) { + throw new AssertionError(e); + } + } + }; + // blocked by element. Reserve all 5 elements first, reserve in the runnable will be blocked + flowController.reserve(5, 1); + ExecutorService executor = Executors.newCachedThreadPool(); + Future finished1 = executor.submit(runnable); + try { + finished1.get(50, TimeUnit.MILLISECONDS); + fail("reserve should block"); + } catch (TimeoutException e) { + // expected + } + assertFalse(finished1.isDone()); + flowController.release(5, 1); + // After other elements are released, reserve in the runnable should go through. 
Since reserve + // was blocked for longer than 1 millisecond, FlowController should record this event in + // FlowControlEventStats. + finished1.get(50, TimeUnit.MILLISECONDS); + assertNotNull(flowController.getFlowControlEventStats().getLastFlowControlEvent()); + assertNotNull( + flowController + .getFlowControlEventStats() + .getLastFlowControlEvent() + .getThrottledTime(TimeUnit.MILLISECONDS)); + flowController.release(1, 1); + + // Similar to blocked by element, test blocking by bytes. + flowController.reserve(1, 5); + Future finished2 = executor.submit(runnable); + try { + finished2.get(50, TimeUnit.MILLISECONDS); + fail("reserve should block"); + } catch (TimeoutException e) { + // expected + } + assertFalse(finished2.isDone()); + long currentTime = System.currentTimeMillis(); + flowController.release(1, 5); + finished2.get(50, TimeUnit.MILLISECONDS); + assertNotNull(flowController.getFlowControlEventStats().getLastFlowControlEvent()); + // Make sure this newer event is recorded + assertThat(flowController.getFlowControlEventStats().getLastFlowControlEvent().getTimestampMs()) + .isAtLeast(currentTime); + } + private List testConcurrentUpdates( final FlowController flowController, final int increaseStepRange,