diff --git a/gax/src/main/java/com/google/api/gax/batching/Batcher.java b/gax/src/main/java/com/google/api/gax/batching/Batcher.java index d01f79f4a..8ac78ef2f 100644 --- a/gax/src/main/java/com/google/api/gax/batching/Batcher.java +++ b/gax/src/main/java/com/google/api/gax/batching/Batcher.java @@ -32,6 +32,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.BetaApi; import com.google.api.core.InternalExtensionOnly; +import com.google.api.gax.rpc.ApiCallContext; /** * Represents a batching context where individual elements will be accumulated and flushed in a @@ -49,6 +50,9 @@ @InternalExtensionOnly public interface Batcher extends AutoCloseable { + /** {@link ApiCallContext.Key} for tracking batch total throttled time */ + ApiCallContext.Key THROTTLED_TIME_KEY = ApiCallContext.Key.create("total_throttled_time"); + /** * Queues the passed in element to be sent at some point in the future. * diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index aeeab82b2..743d25c57 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -40,9 +40,11 @@ import com.google.api.gax.batching.FlowController.FlowControlException; import com.google.api.gax.batching.FlowController.FlowControlRuntimeException; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; +import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Futures; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; @@ -93,6 +95,7 @@ public class BatcherImpl private SettableApiFuture closeFuture; private final BatcherStats batcherStats = new BatcherStats(); private final FlowController flowController; + private final ApiCallContext callContext; /** * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements @@ -100,7 +103,10 @@ public class BatcherImpl * @param unaryCallable a {@link UnaryCallable} object * @param prototype a {@link RequestT} object * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds + * @deprecated Please instantiate the Batcher with {@link FlowController} and {@link + * ApiCallContext} */ + @Deprecated public BatcherImpl( BatchingDescriptor batchingDescriptor, UnaryCallable unaryCallable, @@ -108,7 +114,7 @@ public BatcherImpl( BatchingSettings batchingSettings, ScheduledExecutorService executor) { - this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null); + this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null, null); } /** @@ -119,7 +125,9 @@ public BatcherImpl( * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds * @param flowController a {@link FlowController} for throttling requests. If it's null, create a * {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}. + * @deprecated Please instantiate the Batcher with {@link ApiCallContext} */ + @Deprecated public BatcherImpl( BatchingDescriptor batchingDescriptor, UnaryCallable unaryCallable, @@ -128,6 +136,35 @@ public BatcherImpl( ScheduledExecutorService executor, @Nullable FlowController flowController) { + this( + batchingDescriptor, + unaryCallable, + prototype, + batchingSettings, + executor, + flowController, + null); + } + + /** + * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements + * into wrappers request and response + * @param unaryCallable a {@link UnaryCallable} object + * @param prototype a {@link RequestT} object + * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds + * @param flowController a {@link FlowController} for throttling requests. If it's null, create a + * {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}. + * @param callContext a {@link ApiCallContext} object that'll be merged in unaryCallable + */ + public BatcherImpl( + BatchingDescriptor batchingDescriptor, + UnaryCallable unaryCallable, + RequestT prototype, + BatchingSettings batchingSettings, + ScheduledExecutorService executor, + @Nullable FlowController flowController, + @Nullable ApiCallContext callContext) { + this.batchingDescriptor = Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null"); this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null"); @@ -168,6 +205,7 @@ public BatcherImpl( scheduledFuture = Futures.immediateCancelledFuture(); } currentBatcherReference = new BatcherReference(this); + this.callContext = callContext; } /** {@inheritDoc} */ @@ -192,16 +230,18 @@ public ApiFuture add(ElementT element) { // class, which made it seem unnecessary to have blocking and non-blocking semaphore // implementations. Some refactoring may be needed for the optimized implementation. So we'll // defer it till we decide on if refactoring FlowController is necessary. + Stopwatch stopwatch = Stopwatch.createStarted(); try { flowController.reserve(1, batchingDescriptor.countBytes(element)); } catch (FlowControlException e) { // This exception will only be thrown if the FlowController is set to ThrowException behavior throw FlowControlRuntimeException.fromFlowControlException(e); } + long throttledTimeMs = stopwatch.elapsed(TimeUnit.MILLISECONDS); SettableApiFuture result = SettableApiFuture.create(); synchronized (elementLock) { - currentOpenBatch.add(element, result); + currentOpenBatch.add(element, result, throttledTimeMs); } if (currentOpenBatch.hasAnyThresholdReached()) { @@ -230,8 +270,14 @@ public void sendOutstanding() { currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); } + // This check is for old clients that instantiated the batcher without ApiCallContext + ApiCallContext callContextWithOption = null; + if (callContext != null) { + callContextWithOption = + callContext.withOption(THROTTLED_TIME_KEY, accumulatedBatch.totalThrottledTimeMs); + } final ApiFuture batchResponse = - unaryCallable.futureCall(accumulatedBatch.builder.build()); + unaryCallable.futureCall(accumulatedBatch.builder.build(), callContextWithOption); numOfOutstandingBatches.incrementAndGet(); ApiFutures.addCallback( @@ -367,6 +413,7 @@ private static class Batch { private long elementCounter = 0; private long byteCounter = 0; + private long totalThrottledTimeMs = 0; private Batch( RequestT prototype, @@ -383,11 +430,12 @@ private Batch( this.batcherStats = batcherStats; } - void add(ElementT element, SettableApiFuture result) { + void add(ElementT element, SettableApiFuture result, long throttledTimeMs) { builder.add(element); entries.add(BatchEntry.create(element, result)); elementCounter++; byteCounter += descriptor.countBytes(element); + totalThrottledTimeMs += throttledTimeMs; } void onBatchSuccess(ResponseT response) { diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 10ccb1453..c3a1ce0bb 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -33,6 +33,7 @@ import static com.google.api.gax.rpc.testing.FakeBatchableApi.callLabeledIntSquarer; import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; +import static org.mockito.Mockito.when; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; @@ -75,6 +76,8 @@ import org.junit.function.ThrowingRunnable; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.threeten.bp.Duration; @RunWith(JUnit4.class) @@ -132,9 +135,10 @@ public void testSendOutstanding() { SQUARER_BATCHING_DESC_V2, new LabeledIntSquarerCallable() { @Override - public ApiFuture> futureCall(LabeledIntList request) { + public ApiFuture> futureCall( + LabeledIntList request, ApiCallContext context) { callableCounter.incrementAndGet(); - return super.futureCall(request); + return super.futureCall(request, context); } }, labeledIntList, @@ -838,8 +842,23 @@ public void testThrottlingBlocking() throws Exception { .setMaxOutstandingElementCount(1L) .build()); ExecutorService executor = Executors.newSingleThreadExecutor(); + + ApiCallContext callContext = Mockito.mock(ApiCallContext.class); + ArgumentCaptor> key = + ArgumentCaptor.forClass(ApiCallContext.Key.class); + ArgumentCaptor value = ArgumentCaptor.forClass(Long.class); + when(callContext.withOption(key.capture(), value.capture())).thenReturn(callContext); + long throttledTime = 10; + try (final Batcher batcher = - createDefaultBatcherImpl(settings, flowController)) { + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, + callLabeledIntSquarer, + labeledIntList, + settings, + EXECUTOR, + flowController, + callContext)) { flowController.reserve(1, 1); Future future = executor.submit( @@ -851,6 +870,7 @@ public void run() { }); try { future.get(10, TimeUnit.MILLISECONDS); + Thread.sleep(throttledTime); assertWithMessage("adding elements to batcher should be blocked by FlowControlled").fail(); } catch (TimeoutException e) { // expected @@ -861,6 +881,9 @@ public void run() { } catch (TimeoutException e) { assertWithMessage("adding elements to batcher should not be blocked").fail(); } + // Verify that throttled time is recorded in ApiCallContext + assertThat(key.getValue()).isSameInstanceAs(Batcher.THROTTLED_TIME_KEY); + assertThat(value.getValue()).isAtLeast(throttledTime); } finally { executor.shutdownNow(); }