diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 347804882..228e56fd1 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -37,6 +37,9 @@ import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.batching.FlowController.FlowControlException; +import com.google.api.gax.batching.FlowController.FlowControlRuntimeException; +import com.google.api.gax.batching.FlowController.LimitExceededBehavior; import com.google.api.gax.rpc.UnaryCallable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -55,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; /** * Queues up the elements until {@link #flush()} is called; once batching is over, returned future @@ -87,13 +91,14 @@ public class BatcherImpl private final Future scheduledFuture; private volatile boolean isClosed = false; private final BatcherStats batcherStats = new BatcherStats(); + private final FlowController flowController; /** * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements - * into wrappers request and response. - * @param unaryCallable a {@link UnaryCallable} object. - * @param prototype a {@link RequestT} object. - * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds. + * into wrappers request and response + * @param unaryCallable a {@link UnaryCallable} object + * @param prototype a {@link RequestT} object + * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds */ public BatcherImpl( BatchingDescriptor batchingDescriptor, @@ -102,6 +107,26 @@ public BatcherImpl( BatchingSettings batchingSettings, ScheduledExecutorService executor) { + this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null); + } + + /** + * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements + * into wrappers request and response + * @param unaryCallable a {@link UnaryCallable} object + * @param prototype a {@link RequestT} object + * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds + * @param flowController a {@link FlowController} for throttling requests. If it's null, create a + * {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}. + */ + public BatcherImpl( + BatchingDescriptor batchingDescriptor, + UnaryCallable unaryCallable, + RequestT prototype, + BatchingSettings batchingSettings, + ScheduledExecutorService executor, + @Nullable FlowController flowController) { + this.batchingDescriptor = Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null"); this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null"); @@ -109,8 +134,29 @@ public BatcherImpl( this.batchingSettings = Preconditions.checkNotNull(batchingSettings, "batching setting cannot be null"); Preconditions.checkNotNull(executor, "executor cannot be null"); + if (flowController == null) { + flowController = new FlowController(batchingSettings.getFlowControlSettings()); + } + // If throttling is enabled, make sure flow control limits are greater or equal to batch sizes + // to avoid deadlocking + if (flowController.getLimitExceededBehavior() != LimitExceededBehavior.Ignore) { + Preconditions.checkArgument( + flowController.getMaxOutstandingElementCount() == null + || batchingSettings.getElementCountThreshold() == null + || flowController.getMaxOutstandingElementCount() + >= batchingSettings.getElementCountThreshold(), + "If throttling and batching on element count are enabled, FlowController" + + "#maxOutstandingElementCount must be greater or equal to elementCountThreshold"); + Preconditions.checkArgument( + flowController.getMaxOutstandingRequestBytes() == null + || batchingSettings.getRequestByteThreshold() == null + || flowController.getMaxOutstandingRequestBytes() + >= batchingSettings.getRequestByteThreshold(), + "If throttling and batching on request bytes are enabled, FlowController" + + "#maxOutstandingRequestBytes must be greater or equal to requestByteThreshold"); + } + this.flowController = flowController; currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); - if (batchingSettings.getDelayThreshold() != null) { long delay = batchingSettings.getDelayThreshold().toMillis(); PushCurrentBatchRunnable runnable = @@ -127,8 +173,29 @@ public BatcherImpl( @Override public ApiFuture add(ElementT element) { Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher"); - SettableApiFuture result = SettableApiFuture.create(); + // This is not the optimal way of throttling. It does not send out partial batches, which + // means that the Batcher might not use up all the resources allowed by FlowController. + // The more efficient implementation should look like: + // if (!flowController.tryReserve(1, bytes)) { + // sendOutstanding(); + // reserve(1, bytes); + // } + // where tryReserve() will return false if there isn't enough resources, or reserve and return + // true. + // However, with the current FlowController implementation, adding a tryReserve() could be + // confusing. FlowController will end up having 3 different reserve behaviors: blocking, + // non blocking and try reserve. And we'll also need to add a tryAcquire() to the Semaphore64 + // class, which made it seem unnecessary to have blocking and non-blocking semaphore + // implementations. Some refactoring may be needed for the optimized implementation. So we'll + // defer it till we decide on if refactoring FlowController is necessary. + try { + flowController.reserve(1, batchingDescriptor.countBytes(element)); + } catch (FlowControlException e) { + // This exception will only be thrown if the FlowController is set to ThrowException behavior + throw FlowControlRuntimeException.fromFlowControlException(e); + } + SettableApiFuture result = SettableApiFuture.create(); synchronized (elementLock) { currentOpenBatch.add(element, result); } @@ -169,6 +236,7 @@ public void sendOutstanding() { @Override public void onSuccess(ResponseT response) { try { + flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter); accumulatedBatch.onBatchSuccess(response); } finally { onBatchCompletion(); @@ -178,6 +246,7 @@ public void onSuccess(ResponseT response) { @Override public void onFailure(Throwable throwable) { try { + flowController.release(accumulatedBatch.elementCounter, accumulatedBatch.byteCounter); accumulatedBatch.onBatchFailure(throwable); } finally { onBatchCompletion(); @@ -224,6 +293,12 @@ public void close() throws InterruptedException { } } + /** Package-private for use in testing. */ + @VisibleForTesting + FlowController getFlowController() { + return flowController; + } + /** * This class represent one logical Batch. It accumulates all the elements and their corresponding * future results for one batch. diff --git a/gax/src/main/java/com/google/api/gax/batching/FlowController.java b/gax/src/main/java/com/google/api/gax/batching/FlowController.java index 9921b9bfc..75c1379c0 100644 --- a/gax/src/main/java/com/google/api/gax/batching/FlowController.java +++ b/gax/src/main/java/com/google/api/gax/batching/FlowController.java @@ -30,6 +30,7 @@ package com.google.api.gax.batching; import com.google.api.core.BetaApi; +import com.google.api.core.InternalApi; import com.google.common.base.Preconditions; import javax.annotation.Nullable; @@ -143,9 +144,10 @@ public enum LimitExceededBehavior { @Nullable private final Semaphore64 outstandingByteCount; @Nullable private final Long maxOutstandingElementCount; @Nullable private final Long maxOutstandingRequestBytes; + private final LimitExceededBehavior limitExceededBehavior; public FlowController(FlowControlSettings settings) { - boolean failOnLimits; + this.limitExceededBehavior = settings.getLimitExceededBehavior(); switch (settings.getLimitExceededBehavior()) { case ThrowException: case Block: @@ -216,4 +218,20 @@ public void release(long elements, long bytes) { outstandingByteCount.release(permitsToReturn); } } + + LimitExceededBehavior getLimitExceededBehavior() { + return limitExceededBehavior; + } + + @InternalApi("For internal use by google-cloud-java clients only") + @Nullable + public Long getMaxOutstandingElementCount() { + return maxOutstandingElementCount; + } + + @InternalApi("For internal use by google-cloud-java clients only") + @Nullable + public Long getMaxOutstandingRequestBytes() { + return maxOutstandingRequestBytes; + } } diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 0c2dfc4ab..bb6cae532 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -38,6 +38,8 @@ import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.BatcherImpl.BatcherReference; +import com.google.api.gax.batching.FlowController.FlowControlRuntimeException; +import com.google.api.gax.batching.FlowController.LimitExceededBehavior; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList; @@ -102,13 +104,7 @@ public static void tearDownExecutor() throws InterruptedException { /** The accumulated results in the test are resolved when {@link Batcher#flush()} is called. */ @Test public void testResultsAreResolvedAfterFlush() throws Exception { - underTest = - new BatcherImpl<>( - SQUARER_BATCHING_DESC_V2, - callLabeledIntSquarer, - labeledIntList, - batchingSettings, - EXECUTOR); + underTest = createDefaultBatcherImpl(batchingSettings, null); Future result = underTest.add(4); assertThat(result.isDone()).isFalse(); underTest.flush(); @@ -152,13 +148,7 @@ public ApiFuture> futureCall(LabeledIntList request) { @Test public void testWhenBatcherIsClose() throws Exception { Future result; - try (Batcher batcher = - new BatcherImpl<>( - SQUARER_BATCHING_DESC_V2, - callLabeledIntSquarer, - labeledIntList, - batchingSettings, - EXECUTOR)) { + try (Batcher batcher = createDefaultBatcherImpl(batchingSettings, null)) { result = batcher.add(5); } assertThat(result.isDone()).isTrue(); @@ -168,13 +158,7 @@ public void testWhenBatcherIsClose() throws Exception { /** Validates exception when batch is called after {@link Batcher#close()}. */ @Test public void testNoElementAdditionAfterClose() throws Exception { - underTest = - new BatcherImpl<>( - SQUARER_BATCHING_DESC_V2, - callLabeledIntSquarer, - labeledIntList, - batchingSettings, - EXECUTOR); + underTest = createDefaultBatcherImpl(batchingSettings, null); underTest.close(); Throwable addOnClosedError = null; try { @@ -328,9 +312,7 @@ public void testWhenThresholdIsDisabled() throws Exception { .setRequestByteThreshold(null) .setDelayThreshold(null) .build(); - underTest = - new BatcherImpl<>( - SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings, EXECUTOR); + underTest = createDefaultBatcherImpl(settings, null); Future result = underTest.add(2); assertThat(result.isDone()).isTrue(); assertThat(result.get()).isEqualTo(4); @@ -340,9 +322,7 @@ public void testWhenThresholdIsDisabled() throws Exception { public void testWhenDelayThresholdExceeds() throws Exception { BatchingSettings settings = batchingSettings.toBuilder().setDelayThreshold(Duration.ofMillis(100)).build(); - underTest = - new BatcherImpl<>( - SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings, EXECUTOR); + underTest = createDefaultBatcherImpl(settings, null); Future result = underTest.add(6); assertThat(result.isDone()).isFalse(); assertThat(result.get()).isEqualTo(36); @@ -418,8 +398,7 @@ public void testPushCurrentBatchRunnable() throws Exception { BatchingSettings settings = batchingSettings.toBuilder().setDelayThreshold(Duration.ofMillis(DELAY_TIME)).build(); BatcherImpl> batcher = - new BatcherImpl<>( - SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings, EXECUTOR); + createDefaultBatcherImpl(settings, null); BatcherImpl.PushCurrentBatchRunnable> pushBatchRunnable = new BatcherImpl.PushCurrentBatchRunnable<>(batcher); @@ -582,20 +561,8 @@ public void testUnclosedBatchersAreLogged() throws Exception { Thread.sleep(DELAY_TIME * (1L << retry)); } assertThat(actualRemaining).isAtMost(0); - underTest = - new BatcherImpl<>( - SQUARER_BATCHING_DESC_V2, - callLabeledIntSquarer, - labeledIntList, - batchingSettings, - EXECUTOR); - Batcher extraBatcher = - new BatcherImpl<>( - SQUARER_BATCHING_DESC_V2, - callLabeledIntSquarer, - labeledIntList, - batchingSettings, - EXECUTOR); + underTest = createDefaultBatcherImpl(batchingSettings, null); + Batcher extraBatcher = createDefaultBatcherImpl(batchingSettings, null); // Try to capture the log output but without causing terminal noise. Adding the filter must // be done before clearing the ref or else it might be missed. @@ -711,10 +678,100 @@ public void run() { } } + @Test + public void testConstructors() throws InterruptedException { + try (BatcherImpl batcher1 = createDefaultBatcherImpl(batchingSettings, null)) { + assertThat(batcher1.getFlowController()).isNotNull(); + assertThat(batcher1.getFlowController().getLimitExceededBehavior()) + .isEqualTo(batchingSettings.getFlowControlSettings().getLimitExceededBehavior()); + assertThat(batcher1.getFlowController().getMaxOutstandingElementCount()) + .isEqualTo(batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount()); + assertThat(batcher1.getFlowController().getMaxOutstandingRequestBytes()) + .isEqualTo(batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes()); + } + + FlowController flowController = + new FlowController( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException.ThrowException) + .setMaxOutstandingRequestBytes(6000L) + .build()); + try (BatcherImpl batcher2 = createDefaultBatcherImpl(batchingSettings, flowController)) { + assertThat(batcher2.getFlowController()).isSameInstanceAs(flowController); + } + } + + @Test + public void testThrottlingBlocking() throws Exception { + BatchingSettings settings = + BatchingSettings.newBuilder() + .setElementCountThreshold(1L) + .setRequestByteThreshold(1L) + .build(); + FlowController flowController = + new FlowController( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(LimitExceededBehavior.Block) + .setMaxOutstandingElementCount(1L) + .build()); + ExecutorService executor = Executors.newSingleThreadExecutor(); + try (final Batcher batcher = + createDefaultBatcherImpl(settings, flowController)) { + flowController.reserve(1, 1); + Future future = + executor.submit( + new Runnable() { + @Override + public void run() { + batcher.add(1); + } + }); + try { + future.get(10, TimeUnit.MILLISECONDS); + assertWithMessage("adding elements to batcher should be blocked by FlowControlled").fail(); + } catch (TimeoutException e) { + // expected + } + flowController.release(1, 1); + try { + future.get(100, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + assertWithMessage("adding elements to batcher should not be blocked").fail(); + } + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testThrottlingNonBlocking() throws Exception { + BatchingSettings settings = + BatchingSettings.newBuilder() + .setElementCountThreshold(1L) + .setRequestByteThreshold(1L) + .build(); + FlowController flowController = + new FlowController( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) + .setMaxOutstandingElementCount(1L) + .build()); + try (final Batcher batcher = + createDefaultBatcherImpl(settings, flowController)) { + flowController.reserve(1, 1); + try { + batcher.add(1); + assertWithMessage("Should throw exception because it exceeded FlowController limit").fail(); + } catch (FlowControlRuntimeException e) { + assertThat(e.getMessage()).contains("The maximum number of batch elements"); + } + flowController.release(1, 1); + batcher.add(1); + } + } + private void testElementTriggers(BatchingSettings settings) throws Exception { - underTest = - new BatcherImpl<>( - SQUARER_BATCHING_DESC_V2, callLabeledIntSquarer, labeledIntList, settings, EXECUTOR); + underTest = createDefaultBatcherImpl(settings, null); Future result = underTest.add(4); assertThat(result.isDone()).isFalse(); // After this element is added, the batch triggers sendOutstanding(). @@ -724,4 +781,15 @@ private void testElementTriggers(BatchingSettings settings) throws Exception { assertThat(result.get()).isEqualTo(16); assertThat(anotherResult.isDone()).isTrue(); } + + private BatcherImpl> createDefaultBatcherImpl( + BatchingSettings settings, FlowController flowController) { + return new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, + callLabeledIntSquarer, + labeledIntList, + settings, + EXECUTOR, + flowController); + } }