Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

feat: dynamic flow control part 1 - add FlowController to Batcher #1289

Merged
merged 11 commits into from Feb 19, 2021
87 changes: 81 additions & 6 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Expand Up @@ -37,6 +37,9 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController.FlowControlException;
import com.google.api.gax.batching.FlowController.FlowControlRuntimeException;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -55,6 +58,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* Queues up the elements until {@link #flush()} is called; once batching is over, returned future
Expand Down Expand Up @@ -87,13 +91,14 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final Future<?> scheduledFuture;
private volatile boolean isClosed = false;
private final BatcherStats batcherStats = new BatcherStats();
private final FlowController flowController;

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response.
* @param unaryCallable a {@link UnaryCallable} object.
* @param prototype a {@link RequestT} object.
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds.
* into wrappers request and response
* @param unaryCallable a {@link UnaryCallable} object
* @param prototype a {@link RequestT} object
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
*/
public BatcherImpl(
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
Expand All @@ -102,15 +107,56 @@ public BatcherImpl(
BatchingSettings batchingSettings,
ScheduledExecutorService executor) {

this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null);
}

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response
* @param unaryCallable a {@link UnaryCallable} object
* @param prototype a {@link RequestT} object
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
* @param flowController a {@link FlowController} for throttling requests. If it's null, create a
* {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}.
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
*/
public BatcherImpl(
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
UnaryCallable<RequestT, ResponseT> unaryCallable,
RequestT prototype,
BatchingSettings batchingSettings,
ScheduledExecutorService executor,
@Nullable FlowController flowController) {

this.batchingDescriptor =
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null");
this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null");
this.prototype = Preconditions.checkNotNull(prototype, "request prototype cannot be null");
this.batchingSettings =
Preconditions.checkNotNull(batchingSettings, "batching setting cannot be null");
Preconditions.checkNotNull(executor, "executor cannot be null");
if (flowController == null) {
flowController = new FlowController(batchingSettings.getFlowControlSettings());
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}
// If throttling is enabled, make sure flow control limits are greater or equal to batch sizes
// to avoid deadlocking
if (flowController.getLimitExceededBehavior() != LimitExceededBehavior.Ignore) {
Preconditions.checkArgument(
flowController.getMaxOutstandingElementCount() == null
|| batchingSettings.getElementCountThreshold() == null
|| flowController.getMaxOutstandingElementCount()
>= batchingSettings.getElementCountThreshold(),
"If throttling and batching on element count are enabled, FlowController"
+ "#maxOutstandingElementCount must be greater or equal to elementCountThreshold");
Preconditions.checkArgument(
flowController.getMaxOutstandingRequestBytes() == null
|| batchingSettings.getRequestByteThreshold() == null
|| flowController.getMaxOutstandingRequestBytes()
>= batchingSettings.getRequestByteThreshold(),
"If throttling and batching on request bytes are enabled, FlowController"
+ "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can these checks be moved to FlowControlSettings#build()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the checks to make sure flow control limits are >= batch sizes so it won't deadlock. I also added the checks in BatchingSettings. In case where BatcherImpl is constructed with a FlowController, we'll need to validate the settings here also. (and FlowControlSettings doesn't have any information about batch settings)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, makes sense

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check my comment about construction BatcherImpl directly via FlowController that looks really suspicious, and should be avoided. If we avoid it, we can also get rid of this check. The fact that we need this check here is a one more indicator that flowControlelr should not be passed directly to BatcherImpl.

this.flowController = flowController;
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);

if (batchingSettings.getDelayThreshold() != null) {
long delay = batchingSettings.getDelayThreshold().toMillis();
PushCurrentBatchRunnable<ElementT, ElementResultT, RequestT, ResponseT> runnable =
Expand All @@ -127,8 +173,29 @@ public BatcherImpl(
@Override
public ApiFuture<ElementResultT> add(ElementT element) {
Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher");
SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
// This is not the optimal way of throttling. It does not send out partial batches, which
// means that the Batcher might not use up all the resources allowed by FlowController.
// The more efficient implementation should look like:
// if (!flowController.tryReserve(1, bytes)) {
// sendOutstanding();
// reserve(1, bytes);
// }
// where tryReserve() will return false if there isn't enough resources, or reserve and return
// true.
// However, with the current FlowController implementation, adding a tryReserve() could be
// confusing. FlowController will end up having 3 different reserve behaviors: blocking,
// non blocking and try reserve. And we'll also need to add a tryAcquire() to the Semaphore64
// class, which made it seem unnecessary to have blocking and non-blocking semaphore
// implementations. Some refactoring may be needed for the optimized implementation. So we'll
// defer it till we decide on if refactoring FlowController is necessary.
try {
flowController.reserve(1, batchingDescriptor.countBytes(element));
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
} catch (FlowControlException e) {
// This exception will only be thrown if the FlowController is set to ThrowException behavior
throw FlowControlRuntimeException.fromFlowControlException(e);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}

SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
synchronized (elementLock) {
currentOpenBatch.add(element, result);
}
Expand Down Expand Up @@ -169,6 +236,7 @@ public void sendOutstanding() {
@Override
public void onSuccess(ResponseT response) {
try {
flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
accumulatedBatch.onBatchSuccess(response);
} finally {
onBatchCompletion();
Expand All @@ -178,6 +246,7 @@ public void onSuccess(ResponseT response) {
@Override
public void onFailure(Throwable throwable) {
try {
flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter);
accumulatedBatch.onBatchFailure(throwable);
} finally {
onBatchCompletion();
Expand Down Expand Up @@ -224,6 +293,12 @@ public void close() throws InterruptedException {
}
}

/** Package-private for use in testing. */
@VisibleForTesting
FlowController getFlowController() {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
return flowController;
}

/**
* This class represent one logical Batch. It accumulates all the elements and their corresponding
* future results for one batch.
Expand Down
Expand Up @@ -30,6 +30,7 @@
package com.google.api.gax.batching;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -143,9 +144,10 @@ public enum LimitExceededBehavior {
@Nullable private final Semaphore64 outstandingByteCount;
@Nullable private final Long maxOutstandingElementCount;
@Nullable private final Long maxOutstandingRequestBytes;
private final LimitExceededBehavior limitExceededBehavior;

public FlowController(FlowControlSettings settings) {
boolean failOnLimits;
this.limitExceededBehavior = settings.getLimitExceededBehavior();
switch (settings.getLimitExceededBehavior()) {
case ThrowException:
case Block:
Expand Down Expand Up @@ -216,4 +218,20 @@ public void release(long elements, long bytes) {
outstandingByteCount.release(permitsToReturn);
}
}

LimitExceededBehavior getLimitExceededBehavior() {
return limitExceededBehavior;
}

@InternalApi("For internal use by google-cloud-java clients only")
@Nullable
public Long getMaxOutstandingElementCount() {
return maxOutstandingElementCount;
}

@InternalApi("For internal use by google-cloud-java clients only")
@Nullable
public Long getMaxOutstandingRequestBytes() {
return maxOutstandingRequestBytes;
}
}