Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
feat: add batch throttled time to tracer (#1463)
Browse files Browse the repository at this point in the history
Add `ApiTracer#batchRequestThrottled` to track total throttled time of a batch. In `BatcherImpl`, add throttled time to `ApiCallContext` so it can be accessed from callable chains and recorded in `ApiTracer`.
  • Loading branch information
mutianf committed Oct 27, 2021
1 parent 27bf265 commit 14c25cd
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 7 deletions.
4 changes: 4 additions & 0 deletions gax/src/main/java/com/google/api/gax/batching/Batcher.java
Expand Up @@ -32,6 +32,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.rpc.ApiCallContext;

/**
* Represents a batching context where individual elements will be accumulated and flushed in a
Expand All @@ -49,6 +50,9 @@
@InternalExtensionOnly
public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {

/** {@link ApiCallContext.Key} for tracking batch total throttled time */
ApiCallContext.Key<Long> THROTTLED_TIME_KEY = ApiCallContext.Key.create("total_throttled_time");

/**
* Queues the passed in element to be sent at some point in the future.
*
Expand Down
56 changes: 52 additions & 4 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Expand Up @@ -40,9 +40,11 @@
import com.google.api.gax.batching.FlowController.FlowControlException;
import com.google.api.gax.batching.FlowController.FlowControlRuntimeException;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
Expand Down Expand Up @@ -93,22 +95,26 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private SettableApiFuture<Void> closeFuture;
private final BatcherStats batcherStats = new BatcherStats();
private final FlowController flowController;
private final ApiCallContext callContext;

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response
* @param unaryCallable a {@link UnaryCallable} object
* @param prototype a {@link RequestT} object
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
* @deprecated Please instantiate the Batcher with {@link FlowController} and {@link
* ApiCallContext}
*/
@Deprecated
public BatcherImpl(
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
UnaryCallable<RequestT, ResponseT> unaryCallable,
RequestT prototype,
BatchingSettings batchingSettings,
ScheduledExecutorService executor) {

this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null);
this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null, null);
}

/**
Expand All @@ -119,7 +125,9 @@ public BatcherImpl(
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
* @param flowController a {@link FlowController} for throttling requests. If it's null, create a
* {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}.
* @deprecated Please instantiate the Batcher with {@link ApiCallContext}
*/
@Deprecated
public BatcherImpl(
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
UnaryCallable<RequestT, ResponseT> unaryCallable,
Expand All @@ -128,6 +136,35 @@ public BatcherImpl(
ScheduledExecutorService executor,
@Nullable FlowController flowController) {

this(
batchingDescriptor,
unaryCallable,
prototype,
batchingSettings,
executor,
flowController,
null);
}

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response
* @param unaryCallable a {@link UnaryCallable} object
* @param prototype a {@link RequestT} object
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
* @param flowController a {@link FlowController} for throttling requests. If it's null, create a
* {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}.
* @param callContext a {@link ApiCallContext} object that'll be merged in unaryCallable
*/
public BatcherImpl(
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
UnaryCallable<RequestT, ResponseT> unaryCallable,
RequestT prototype,
BatchingSettings batchingSettings,
ScheduledExecutorService executor,
@Nullable FlowController flowController,
@Nullable ApiCallContext callContext) {

this.batchingDescriptor =
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null");
this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null");
Expand Down Expand Up @@ -168,6 +205,7 @@ public BatcherImpl(
scheduledFuture = Futures.immediateCancelledFuture();
}
currentBatcherReference = new BatcherReference(this);
this.callContext = callContext;
}

/** {@inheritDoc} */
Expand All @@ -192,16 +230,18 @@ public ApiFuture<ElementResultT> add(ElementT element) {
// class, which made it seem unnecessary to have blocking and non-blocking semaphore
// implementations. Some refactoring may be needed for the optimized implementation. So we'll
// defer it till we decide on if refactoring FlowController is necessary.
Stopwatch stopwatch = Stopwatch.createStarted();
try {
flowController.reserve(1, batchingDescriptor.countBytes(element));
} catch (FlowControlException e) {
// This exception will only be thrown if the FlowController is set to ThrowException behavior
throw FlowControlRuntimeException.fromFlowControlException(e);
}
long throttledTimeMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);

SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
synchronized (elementLock) {
currentOpenBatch.add(element, result);
currentOpenBatch.add(element, result, throttledTimeMs);
}

if (currentOpenBatch.hasAnyThresholdReached()) {
Expand Down Expand Up @@ -230,8 +270,14 @@ public void sendOutstanding() {
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
}

// This check is for old clients that instantiated the batcher without ApiCallContext
ApiCallContext callContextWithOption = null;
if (callContext != null) {
callContextWithOption =
callContext.withOption(THROTTLED_TIME_KEY, accumulatedBatch.totalThrottledTimeMs);
}
final ApiFuture<ResponseT> batchResponse =
unaryCallable.futureCall(accumulatedBatch.builder.build());
unaryCallable.futureCall(accumulatedBatch.builder.build(), callContextWithOption);

numOfOutstandingBatches.incrementAndGet();
ApiFutures.addCallback(
Expand Down Expand Up @@ -367,6 +413,7 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {

private long elementCounter = 0;
private long byteCounter = 0;
private long totalThrottledTimeMs = 0;

private Batch(
RequestT prototype,
Expand All @@ -383,11 +430,12 @@ private Batch(
this.batcherStats = batcherStats;
}

void add(ElementT element, SettableApiFuture<ElementResultT> result) {
void add(ElementT element, SettableApiFuture<ElementResultT> result, long throttledTimeMs) {
builder.add(element);
entries.add(BatchEntry.create(element, result));
elementCounter++;
byteCounter += descriptor.countBytes(element);
totalThrottledTimeMs += throttledTimeMs;
}

void onBatchSuccess(ResponseT response) {
Expand Down
29 changes: 26 additions & 3 deletions gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java
Expand Up @@ -33,6 +33,7 @@
import static com.google.api.gax.rpc.testing.FakeBatchableApi.callLabeledIntSquarer;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
Expand Down Expand Up @@ -75,6 +76,8 @@
import org.junit.function.ThrowingRunnable;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
Expand Down Expand Up @@ -132,9 +135,10 @@ public void testSendOutstanding() {
SQUARER_BATCHING_DESC_V2,
new LabeledIntSquarerCallable() {
@Override
public ApiFuture<List<Integer>> futureCall(LabeledIntList request) {
public ApiFuture<List<Integer>> futureCall(
LabeledIntList request, ApiCallContext context) {
callableCounter.incrementAndGet();
return super.futureCall(request);
return super.futureCall(request, context);
}
},
labeledIntList,
Expand Down Expand Up @@ -838,8 +842,23 @@ public void testThrottlingBlocking() throws Exception {
.setMaxOutstandingElementCount(1L)
.build());
ExecutorService executor = Executors.newSingleThreadExecutor();

ApiCallContext callContext = Mockito.mock(ApiCallContext.class);
ArgumentCaptor<ApiCallContext.Key<Long>> key =
ArgumentCaptor.forClass(ApiCallContext.Key.class);
ArgumentCaptor<Long> value = ArgumentCaptor.forClass(Long.class);
when(callContext.withOption(key.capture(), value.capture())).thenReturn(callContext);
long throttledTime = 10;

try (final Batcher<Integer, Integer> batcher =
createDefaultBatcherImpl(settings, flowController)) {
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2,
callLabeledIntSquarer,
labeledIntList,
settings,
EXECUTOR,
flowController,
callContext)) {
flowController.reserve(1, 1);
Future future =
executor.submit(
Expand All @@ -851,6 +870,7 @@ public void run() {
});
try {
future.get(10, TimeUnit.MILLISECONDS);
Thread.sleep(throttledTime);
assertWithMessage("adding elements to batcher should be blocked by FlowControlled").fail();
} catch (TimeoutException e) {
// expected
Expand All @@ -861,6 +881,9 @@ public void run() {
} catch (TimeoutException e) {
assertWithMessage("adding elements to batcher should not be blocked").fail();
}
// Verify that throttled time is recorded in ApiCallContext
assertThat(key.getValue()).isSameInstanceAs(Batcher.THROTTLED_TIME_KEY);
assertThat(value.getValue()).isAtLeast(throttledTime);
} finally {
executor.shutdownNow();
}
Expand Down

0 comments on commit 14c25cd

Please sign in to comment.