Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

feat: dynamic flow control p3: add FlowControllerEventStats #1332

Merged
merged 14 commits into from Apr 5, 2021
Expand Up @@ -293,9 +293,8 @@ public void close() throws InterruptedException {
}
}

/** Package-private for use in testing. */
@VisibleForTesting
FlowController getFlowController() {
@InternalApi("For google-cloud-java client use only")
public FlowController getFlowController() {
return flowController;
}

Expand Down
@@ -0,0 +1,122 @@
/*
* Copyright 2021 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import static com.google.api.gax.batching.FlowController.LimitExceededBehavior;

import com.google.api.core.InternalApi;
import com.google.api.gax.batching.FlowController.FlowControlException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/** Record the statistics of flow control events. */
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
@InternalApi("For google-cloud-java client use only")
public class FlowControlEventStats {

// Currently we're only interested in the most recent flow control event, but this class can be
// expanded to record other stats.
private FlowControlEvent lastFlowControlEvent;

public synchronized void recordFlowControlEvent(FlowControlEvent event) {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
if (lastFlowControlEvent == null || event.compareTo(lastFlowControlEvent) > 0) {
lastFlowControlEvent = event;
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}

public FlowControlEvent getLastFlowControlEvent() {
return lastFlowControlEvent;
}

/**
* A flow control event. Record throttled time if {@link LimitExceededBehavior} is {@link
* LimitExceededBehavior#Block}, or the exception if the behavior is {@link
* LimitExceededBehavior#ThrowException}.
*/
public static final class FlowControlEvent implements Comparable<FlowControlEvent> {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
private long timestampMs;
private Long throttledTimeInMs;
private FlowControlException exception;
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

public static FlowControlEvent create(long throttledTimeInMs) {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
return create(System.currentTimeMillis(), throttledTimeInMs);
}

public static FlowControlEvent create(FlowControlException exception) {
return create(System.currentTimeMillis(), exception);
}

/** Package-private for use in testing. */
@VisibleForTesting
static FlowControlEvent create(long timestampMs, long throttledTimeInMs) {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
return new FlowControlEvent(timestampMs, throttledTimeInMs, null);
}

/** Package-private for use in testing. */
@VisibleForTesting
static FlowControlEvent create(long timestampMs, FlowControlException exception) {
return new FlowControlEvent(timestampMs, null, exception);
}

private FlowControlEvent(
long timestampMs,
@Nullable Long throttledTimeInMs,
@Nullable FlowControlException exception) {
Preconditions.checkArgument(
throttledTimeInMs != null || exception != null,
"a flow control event needs to have throttledTime or FlowControlException");
mutianf marked this conversation as resolved.
Show resolved Hide resolved
this.timestampMs = timestampMs;
this.throttledTimeInMs = throttledTimeInMs;
this.exception = exception;
}

public long getTimestampMs() {
return timestampMs;
}

@Nullable
public Long getThrottledTime(TimeUnit timeUnit) {
return throttledTimeInMs == null
? null
: timeUnit.convert(throttledTimeInMs, TimeUnit.MILLISECONDS);
}

@Nullable
public FlowControlException getException() {
return exception;
}

@Override
public int compareTo(FlowControlEvent o) {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
return Long.compare(this.timestampMs, o.timestampMs);
}
}
}
29 changes: 26 additions & 3 deletions gax/src/main/java/com/google/api/gax/batching/FlowController.java
Expand Up @@ -31,7 +31,10 @@

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/** Provides flow control capability. */
Expand Down Expand Up @@ -147,6 +150,9 @@ public enum LimitExceededBehavior {
@Nullable private final Long minElementCountLimit;
@Nullable private final Long minRequestBytesLimit;
private final LimitExceededBehavior limitExceededBehavior;
private final FlowControlEventStats flowControlEventStats;
// record the flow control event if it takes longer than throttledMs to reserve the permits
private final long throttledMs = 1;
mutianf marked this conversation as resolved.
Show resolved Hide resolved
private final Object updateLimitLock;

public FlowController(FlowControlSettings settings) {
Expand All @@ -160,6 +166,7 @@ public FlowController(FlowControlSettings settings) {
public FlowController(DynamicFlowControlSettings settings) {
this.limitExceededBehavior = settings.getLimitExceededBehavior();
this.updateLimitLock = new Object();
this.flowControlEventStats = new FlowControlEventStats();
switch (settings.getLimitExceededBehavior()) {
case ThrowException:
case Block:
Expand Down Expand Up @@ -204,10 +211,14 @@ public void reserve(long elements, long bytes) throws FlowControlException {
Preconditions.checkArgument(elements >= 0);
Preconditions.checkArgument(bytes >= 0);

Stopwatch stopwatch = Stopwatch.createStarted();
if (outstandingElementCount != null) {
if (!outstandingElementCount.acquire(elements)) {
throw new MaxOutstandingElementCountReachedException(
outstandingElementCount.getPermitLimit());
MaxOutstandingElementCountReachedException exception =
new MaxOutstandingElementCountReachedException(
outstandingElementCount.getPermitLimit());
flowControlEventStats.recordFlowControlEvent(FlowControlEvent.create(exception));
throw exception;
}
}

Expand All @@ -218,9 +229,16 @@ public void reserve(long elements, long bytes) throws FlowControlException {
if (outstandingElementCount != null) {
outstandingElementCount.release(elements);
}
throw new MaxOutstandingRequestBytesReachedException(outstandingByteCount.getPermitLimit());
MaxOutstandingRequestBytesReachedException exception =
new MaxOutstandingRequestBytesReachedException(outstandingByteCount.getPermitLimit());
flowControlEventStats.recordFlowControlEvent(FlowControlEvent.create(exception));
throw exception;
}
}
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
if (elapsed >= throttledMs) {
flowControlEventStats.recordFlowControlEvent(FlowControlEvent.create(elapsed));
}
}

public void release(long elements, long bytes) {
Expand Down Expand Up @@ -333,4 +351,9 @@ public Long getCurrentElementCountLimit() {
public Long getCurrentRequestBytesLimit() {
return outstandingByteCount == null ? null : outstandingByteCount.getPermitLimit();
}

@InternalApi("For google-cloud-java client use only")
public FlowControlEventStats getFlowControlEventStats() {
return flowControlEventStats;
}
}
@@ -0,0 +1,102 @@
/*
* Copyright 2021 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import com.google.api.gax.batching.FlowControlEventStats.FlowControlEvent;
import com.google.api.gax.batching.FlowController.MaxOutstandingRequestBytesReachedException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class FlowControlEventStatsTest {

@Test
public void testCreateEvent() {
long timestamp = 12345, throttledTimeMs = 5000;
FlowControlEvent event = FlowControlEvent.create(timestamp, throttledTimeMs);
assertEquals(event.getTimestampMs(), event.getTimestampMs());
assertEquals(throttledTimeMs / 1000, event.getThrottledTime(TimeUnit.SECONDS).longValue());
assertNull(event.getException());

MaxOutstandingRequestBytesReachedException exception =
new MaxOutstandingRequestBytesReachedException(100);
event = FlowControlEvent.create(timestamp, exception);
assertEquals(timestamp, event.getTimestampMs());
assertNotNull(event.getException());
assertEquals(exception, event.getException());
assertNull(event.getThrottledTime(TimeUnit.MILLISECONDS));

try {
event = FlowControlEvent.create(null);
fail("FlowControlEvent did not throw exception");
} catch (IllegalArgumentException e) {
// expected, ignore
}
}

@Test
public void testGetLastEvent() throws InterruptedException {
final FlowControlEventStats stats = new FlowControlEventStats();
final long currentTime = System.currentTimeMillis();

List<Thread> threads = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
final int timeElapsed = i;
Thread t =
new Thread(
new Runnable() {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void run() {
stats.recordFlowControlEvent(
FlowControlEvent.create(currentTime + timeElapsed, timeElapsed));
}
});
threads.add(t);
t.start();
}

for (Thread t : threads) {
t.join(10);
}

assertEquals(currentTime + 100, stats.getLastFlowControlEvent().getTimestampMs());
assertEquals(
100, stats.getLastFlowControlEvent().getThrottledTime(TimeUnit.MILLISECONDS).longValue());
}
}