Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

feat: dynamic flow control part 1 - add FlowController to Batcher #1289

Merged
merged 11 commits into from Feb 19, 2021
75 changes: 73 additions & 2 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Expand Up @@ -37,6 +37,8 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController.FlowControlException;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -55,6 +57,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* Queues up the elements until {@link #flush()} is called; once batching is over, returned future
Expand Down Expand Up @@ -87,6 +90,7 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final Future<?> scheduledFuture;
private volatile boolean isClosed = false;
private final BatcherStats batcherStats = new BatcherStats();
private final FlowController flowController;

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
Expand All @@ -102,15 +106,55 @@ public BatcherImpl(
BatchingSettings batchingSettings,
ScheduledExecutorService executor) {

this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null);
}

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response.
* @param unaryCallable a {@link UnaryCallable} object.
mutianf marked this conversation as resolved.
Show resolved Hide resolved
* @param prototype a {@link RequestT} object.
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds.
* @param flowControllerToUse a {@link FlowController} for throttling requests. If it's not set,
* create a {@link FlowController} object from {@link
* BatchingSettings#getFlowControlSettings()}.
*/
public BatcherImpl(
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
UnaryCallable<RequestT, ResponseT> unaryCallable,
RequestT prototype,
BatchingSettings batchingSettings,
ScheduledExecutorService executor,
@Nullable FlowController flowControllerToUse) {
mutianf marked this conversation as resolved.
Show resolved Hide resolved

this.batchingDescriptor =
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null");
this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null");
this.prototype = Preconditions.checkNotNull(prototype, "request prototype cannot be null");
this.batchingSettings =
Preconditions.checkNotNull(batchingSettings, "batching setting cannot be null");
Preconditions.checkNotNull(executor, "executor cannot be null");
if (flowControllerToUse == null) {
flowControllerToUse = new FlowController(batchingSettings.getFlowControlSettings());
}
// If throttling is enabled, make sure flow control limits are greater or equal to batch sizes
// to avoid deadlocking
if (flowControllerToUse.getLimitExceededBehavior() != LimitExceededBehavior.Ignore) {
Preconditions.checkArgument(
flowControllerToUse.getMaxOutstandingElementCount() == null
|| batchingSettings.getElementCountThreshold() == null
|| flowControllerToUse.getMaxOutstandingElementCount()
>= batchingSettings.getElementCountThreshold(),
"if throttling and batching on element count are enabled, FlowController#maxOutstandingElementCount must be greater or equal to elementCountThreshold");
Preconditions.checkArgument(
flowControllerToUse.getMaxOutstandingRequestBytes() == null
|| batchingSettings.getRequestByteThreshold() == null
|| flowControllerToUse.getMaxOutstandingRequestBytes()
>= batchingSettings.getRequestByteThreshold(),
"if throttling and batching on request bytes are enabled, FlowController#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can these checks be moved to FlowControlSettings#build()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the checks to make sure flow control limits are >= batch sizes so it won't deadlock. I also added the checks in BatchingSettings. In the case where BatcherImpl is constructed with a FlowController, we'll need to validate the settings here as well. (And FlowControlSettings doesn't have any information about batch settings.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, makes sense

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check my comment about constructing BatcherImpl directly via FlowController; that looks really suspicious and should be avoided. If we avoid it, we can also get rid of this check. The fact that we need this check here is one more indicator that flowController should not be passed directly to BatcherImpl.

this.flowController = flowControllerToUse;
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);

if (batchingSettings.getDelayThreshold() != null) {
long delay = batchingSettings.getDelayThreshold().toMillis();
PushCurrentBatchRunnable<ElementT, ElementResultT, RequestT, ResponseT> runnable =
Expand All @@ -127,8 +171,29 @@ public BatcherImpl(
@Override
public ApiFuture<ElementResultT> add(ElementT element) {
Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher");
SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
// This is not the optimal way of throttling. It does not send out partial batches, which
// means that the Batcher might not use up all the resources allowed by FlowController.
// The more efficient implementation should look like:
// if (!flowController.tryReserve(1, bytes)) {
// sendOutstanding();
// reserve(1, bytes);
// }
// where tryReserve() will return false if there isn't enough resources, or reserve and return
// true.
// However, with the current FlowController implementation, adding a tryReserve() could be
// confusing. FlowController will end up having 3 different reserve behaviors: blocking,
// non blocking and try reserve. And we'll also need to add a tryAcquire() to the Semaphore64
// class, which makes it seem unnecessary to have blocking and non-blocking semaphore
mutianf marked this conversation as resolved.
Show resolved Hide resolved
// implementations. Some refactoring may be needed for the optimized implementation. So we'll
// defer it till we decide on if refactoring FlowController is necessary.
try {
flowController.reserve(1, batchingDescriptor.countBytes(element));
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
} catch (FlowControlException e) {
// This exception will only be thrown if the FlowController is set to ThrowException behavior
throw new RuntimeException(e);
}

SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
synchronized (elementLock) {
currentOpenBatch.add(element, result);
}
Expand Down Expand Up @@ -169,6 +234,7 @@ public void sendOutstanding() {
@Override
public void onSuccess(ResponseT response) {
try {
flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
accumulatedBatch.onBatchSuccess(response);
} finally {
onBatchCompletion();
Expand All @@ -178,6 +244,7 @@ public void onSuccess(ResponseT response) {
@Override
public void onFailure(Throwable throwable) {
try {
flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
accumulatedBatch.onBatchFailure(throwable);
} finally {
onBatchCompletion();
Expand Down Expand Up @@ -224,6 +291,10 @@ public void close() throws InterruptedException {
}
}

FlowController getFlowController() {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
return flowController;
}

/**
* This class represent one logical Batch. It accumulates all the elements and their corresponding
* future results for one batch.
Expand Down
Expand Up @@ -174,6 +174,21 @@ public BatchingSettings build() {
settings.getDelayThreshold() == null
|| settings.getDelayThreshold().compareTo(Duration.ZERO) > 0,
"delayThreshold must be either unset or positive");
if (settings.getFlowControlSettings().getLimitExceededBehavior()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regardless of how the flowController vs flowControllerSettings issue gets resolved, please make sure that this complex logic exists in only one place (now there are two: here and in BatcherImpl constructor) and reused if needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the checks in BatchingSettings and only kept them in BatcherImpl. I guess it really depends on the Batcher how these settings are used, so it makes more sense to enforce the checks there?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds reasonable

!= LimitExceededBehavior.Ignore) {
Preconditions.checkArgument(
settings.getFlowControlSettings().getMaxOutstandingElementCount() == null
|| settings.getElementCountThreshold() == null
|| settings.getFlowControlSettings().getMaxOutstandingElementCount()
>= settings.getElementCountThreshold(),
"if throttling and batching on element count are enabled, FlowController#maxOutstandingElementCount must be greater or equal to elementCountThreshold");
Preconditions.checkArgument(
settings.getFlowControlSettings().getMaxOutstandingRequestBytes() == null
|| settings.getRequestByteThreshold() == null
|| settings.getFlowControlSettings().getMaxOutstandingRequestBytes()
>= settings.getRequestByteThreshold(),
"if throttling and batching on request bytes are enabled, FlowController#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should those error messages start with a capital letter? Also, please split the error string into multiple lines, as this one is 160+ characters long and exceeds the formatting limits.

}
return settings;
}
}
Expand Down
Expand Up @@ -30,6 +30,7 @@
package com.google.api.gax.batching;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -143,9 +144,10 @@ public enum LimitExceededBehavior {
@Nullable private final Semaphore64 outstandingByteCount;
@Nullable private final Long maxOutstandingElementCount;
@Nullable private final Long maxOutstandingRequestBytes;
private final LimitExceededBehavior limitExceededBehavior;

public FlowController(FlowControlSettings settings) {
boolean failOnLimits;
this.limitExceededBehavior = settings.getLimitExceededBehavior();
switch (settings.getLimitExceededBehavior()) {
case ThrowException:
case Block:
Expand Down Expand Up @@ -216,4 +218,20 @@ public void release(long elements, long bytes) {
outstandingByteCount.release(permitsToReturn);
}
}

/**
 * Returns the {@link LimitExceededBehavior} captured from the {@link FlowControlSettings} this
 * controller was constructed with.
 */
LimitExceededBehavior getLimitExceededBehavior() {
  return this.limitExceededBehavior;
}

/**
 * Returns the maximum outstanding element count held by this controller.
 *
 * @return the stored limit; may be {@code null}
 */
@InternalApi("For internal use by google-cloud-java clients only")
@Nullable
public Long getMaxOutstandingElementCount() {
  return this.maxOutstandingElementCount;
}

/**
 * Returns the maximum outstanding request bytes held by this controller.
 *
 * @return the stored limit; may be {@code null}
 */
@InternalApi("For internal use by google-cloud-java clients only")
@Nullable
public Long getMaxOutstandingRequestBytes() {
  return this.maxOutstandingRequestBytes;
}
}
129 changes: 129 additions & 0 deletions gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java
Expand Up @@ -38,6 +38,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatcherImpl.BatcherReference;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList;
Expand Down Expand Up @@ -711,6 +712,134 @@ public void run() {
}
}

@Test
public void testConstructors() throws InterruptedException {
BatcherImpl batcher1 =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2,
callLabeledIntSquarer,
labeledIntList,
batchingSettings,
EXECUTOR);
try {
assertThat(batcher1.getFlowController()).isNotNull();
assertThat(batcher1.getFlowController().getLimitExceededBehavior())
.isEqualTo(batchingSettings.getFlowControlSettings().getLimitExceededBehavior());
assertThat(batcher1.getFlowController().getMaxOutstandingElementCount())
.isEqualTo(batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount());
assertThat(batcher1.getFlowController().getMaxOutstandingRequestBytes())
.isEqualTo(batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes());
} finally {
batcher1.close();
}

FlowController flowController =
new FlowController(
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException.ThrowException)
.setMaxOutstandingRequestBytes(6000L)
.build());
BatcherImpl batcher2 =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2,
callLabeledIntSquarer,
labeledIntList,
batchingSettings,
EXECUTOR,
flowController);
try {
assertThat(batcher2.getFlowController()).isSameInstanceAs(flowController);
} finally {
batcher2.close();
mutianf marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Test
public void testThrottlingBlocking() throws Exception {
BatchingSettings settings =
BatchingSettings.newBuilder()
.setElementCountThreshold(1L)
.setRequestByteThreshold(1L)
.build();
FlowController flowController =
new FlowController(
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(LimitExceededBehavior.Block)
.setMaxOutstandingElementCount(1L)
.build());
final Batcher<Integer, Integer> batcher =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2,
callLabeledIntSquarer,
labeledIntList,
settings,
EXECUTOR,
flowController);
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
flowController.reserve(1, 1);
Future future =
executor.submit(
new Runnable() {
@Override
public void run() {
batcher.add(1);
}
});
try {
future.get(100, TimeUnit.MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If TimeoutException is expected, can we make this shorter (i.e. since we are guaranteed to wait, let's wait as little as possible; otherwise it may quickly lead to really long-running unit tests, which should not happen)? This comment/question applies to all timed get()s in this test.

assertWithMessage("adding elements to batcher should be blocked by FlowControlled").fail();
} catch (TimeoutException e) {
// expected
}
flowController.release(1, 1);
try {
future.get(100, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
assertWithMessage("adding elements to batcher should not be blocked").fail();
}
} finally {
batcher.close();
mutianf marked this conversation as resolved.
Show resolved Hide resolved
executor.shutdownNow();
}
}

@Test
public void testThrottlingNonBlocking() throws Exception {
BatchingSettings settings =
BatchingSettings.newBuilder()
.setElementCountThreshold(1L)
.setRequestByteThreshold(1L)
.build();
FlowController flowController =
new FlowController(
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
.setMaxOutstandingElementCount(1L)
.build());
final Batcher<Integer, Integer> batcher =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2,
callLabeledIntSquarer,
labeledIntList,
settings,
EXECUTOR,
flowController);
try {
flowController.reserve(1, 1);
try {
batcher.add(1);
assertWithMessage("Should throw exception because it exceeded FlowController limit").fail();
} catch (Exception e) {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
assertThat(e.getMessage()).contains("The maximum number of batch elements");
}
flowController.release(1, 1);
batcher.add(1);
} finally {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
batcher.close();
}
}

private void testElementTriggers(BatchingSettings settings) throws Exception {
underTest =
new BatcherImpl<>(
Expand Down