Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

feat: add batch throttled time to tracer #1463

Merged
merged 7 commits into from Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions gax/src/main/java/com/google/api/gax/batching/Batcher.java
Expand Up @@ -32,6 +32,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.rpc.ApiCallContext;

/**
* Represents a batching context where individual elements will be accumulated and flushed in a
Expand All @@ -49,6 +50,9 @@
@InternalExtensionOnly
public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {

/** {@link ApiCallContext.Key} for tracking batch total throttled time */
ApiCallContext.Key<Long> THROTTLED_TIME_KEY = ApiCallContext.Key.create("total_throttled_time");

/**
* Queues the passed in element to be sent at some point in the future.
*
Expand Down
56 changes: 52 additions & 4 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Expand Up @@ -40,9 +40,11 @@
import com.google.api.gax.batching.FlowController.FlowControlException;
import com.google.api.gax.batching.FlowController.FlowControlRuntimeException;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
Expand Down Expand Up @@ -93,22 +95,26 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private SettableApiFuture<Void> closeFuture;
private final BatcherStats batcherStats = new BatcherStats();
private final FlowController flowController;
private final ApiCallContext callContext;

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response
* @param unaryCallable a {@link UnaryCallable} object
* @param prototype a {@link RequestT} object
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
* @deprecated Please instantiate the Batcher with {@link FlowController} and {@link
* ApiCallContext}
*/
@Deprecated
public BatcherImpl(
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
UnaryCallable<RequestT, ResponseT> unaryCallable,
RequestT prototype,
BatchingSettings batchingSettings,
ScheduledExecutorService executor) {

this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null);
this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null, null);
}

/**
Expand All @@ -119,7 +125,9 @@ public BatcherImpl(
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
* @param flowController a {@link FlowController} for throttling requests. If it's null, create a
* {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}.
* @deprecated Please instantiate the Batcher with {@link ApiCallContext}
*/
@Deprecated
public BatcherImpl(
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
UnaryCallable<RequestT, ResponseT> unaryCallable,
Expand All @@ -128,6 +136,35 @@ public BatcherImpl(
ScheduledExecutorService executor,
@Nullable FlowController flowController) {

this(
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
batchingDescriptor,
unaryCallable,
prototype,
batchingSettings,
executor,
flowController,
null);
}

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
* into wrappers request and response
* @param unaryCallable a {@link UnaryCallable} object
* @param prototype a {@link RequestT} object
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds
* @param flowController a {@link FlowController} for throttling requests. If it's null, create a
* {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}.
* @param callContext a {@link ApiCallContext} object that'll be merged in unaryCallable
*/
public BatcherImpl(
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor,
UnaryCallable<RequestT, ResponseT> unaryCallable,
RequestT prototype,
BatchingSettings batchingSettings,
ScheduledExecutorService executor,
@Nullable FlowController flowController,
@Nullable ApiCallContext callContext) {

this.batchingDescriptor =
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null");
this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null");
Expand Down Expand Up @@ -168,6 +205,7 @@ public BatcherImpl(
scheduledFuture = Futures.immediateCancelledFuture();
}
currentBatcherReference = new BatcherReference(this);
this.callContext = callContext;
}

/** {@inheritDoc} */
Expand All @@ -192,16 +230,18 @@ public ApiFuture<ElementResultT> add(ElementT element) {
// class, which made it seem unnecessary to have blocking and non-blocking semaphore
// implementations. Some refactoring may be needed for the optimized implementation. So we'll
// defer it till we decide on if refactoring FlowController is necessary.
Stopwatch stopwatch = Stopwatch.createStarted();
try {
flowController.reserve(1, batchingDescriptor.countBytes(element));
} catch (FlowControlException e) {
// This exception will only be thrown if the FlowController is set to ThrowException behavior
throw FlowControlRuntimeException.fromFlowControlException(e);
}
long throttledTimeMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);

SettableApiFuture<ElementResultT> result = SettableApiFuture.create();
synchronized (elementLock) {
currentOpenBatch.add(element, result);
currentOpenBatch.add(element, result, throttledTimeMs);
}

if (currentOpenBatch.hasAnyThresholdReached()) {
Expand Down Expand Up @@ -230,8 +270,14 @@ public void sendOutstanding() {
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
}

// This check is for old clients that instantiated the batcher without ApiCallContext
ApiCallContext callContextWithOption = null;
if (callContext != null) {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
callContextWithOption =
callContext.withOption(THROTTLED_TIME_KEY, accumulatedBatch.totalThrottledTimeMs);
}
final ApiFuture<ResponseT> batchResponse =
unaryCallable.futureCall(accumulatedBatch.builder.build());
unaryCallable.futureCall(accumulatedBatch.builder.build(), callContextWithOption);

numOfOutstandingBatches.incrementAndGet();
ApiFutures.addCallback(
Expand Down Expand Up @@ -367,6 +413,7 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {

private long elementCounter = 0;
private long byteCounter = 0;
private long totalThrottledTimeMs = 0;

private Batch(
RequestT prototype,
Expand All @@ -383,11 +430,12 @@ private Batch(
this.batcherStats = batcherStats;
}

void add(ElementT element, SettableApiFuture<ElementResultT> result) {
void add(ElementT element, SettableApiFuture<ElementResultT> result, long throttledTimeMs) {
builder.add(element);
entries.add(BatchEntry.create(element, result));
elementCounter++;
byteCounter += descriptor.countBytes(element);
totalThrottledTimeMs += throttledTimeMs;
}

void onBatchSuccess(ResponseT response) {
Expand Down
29 changes: 26 additions & 3 deletions gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java
Expand Up @@ -33,6 +33,7 @@
import static com.google.api.gax.rpc.testing.FakeBatchableApi.callLabeledIntSquarer;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
Expand Down Expand Up @@ -75,6 +76,8 @@
import org.junit.function.ThrowingRunnable;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
Expand Down Expand Up @@ -132,9 +135,10 @@ public void testSendOutstanding() {
SQUARER_BATCHING_DESC_V2,
new LabeledIntSquarerCallable() {
@Override
public ApiFuture<List<Integer>> futureCall(LabeledIntList request) {
public ApiFuture<List<Integer>> futureCall(
LabeledIntList request, ApiCallContext context) {
callableCounter.incrementAndGet();
return super.futureCall(request);
return super.futureCall(request, context);
}
},
labeledIntList,
Expand Down Expand Up @@ -838,8 +842,23 @@ public void testThrottlingBlocking() throws Exception {
.setMaxOutstandingElementCount(1L)
.build());
ExecutorService executor = Executors.newSingleThreadExecutor();

ApiCallContext callContext = Mockito.mock(ApiCallContext.class);
ArgumentCaptor<ApiCallContext.Key<Long>> key =
ArgumentCaptor.forClass(ApiCallContext.Key.class);
ArgumentCaptor<Long> value = ArgumentCaptor.forClass(Long.class);
when(callContext.withOption(key.capture(), value.capture())).thenReturn(callContext);
long throttledTime = 10;

try (final Batcher<Integer, Integer> batcher =
createDefaultBatcherImpl(settings, flowController)) {
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2,
callLabeledIntSquarer,
labeledIntList,
settings,
EXECUTOR,
flowController,
callContext)) {
flowController.reserve(1, 1);
Future future =
executor.submit(
Expand All @@ -851,6 +870,7 @@ public void run() {
});
try {
future.get(10, TimeUnit.MILLISECONDS);
Thread.sleep(throttledTime);
assertWithMessage("adding elements to batcher should be blocked by FlowControlled").fail();
} catch (TimeoutException e) {
// expected
Expand All @@ -861,6 +881,9 @@ public void run() {
} catch (TimeoutException e) {
assertWithMessage("adding elements to batcher should not be blocked").fail();
}
// Verify that throttled time is recorded in ApiCallContext
assertThat(key.getValue()).isSameInstanceAs(Batcher.THROTTLED_TIME_KEY);
assertThat(value.getValue()).isAtLeast(throttledTime);
} finally {
executor.shutdownNow();
}
Expand Down