This repository has been archived by the owner on Sep 26, 2023. It is now read-only.
feat: add batch throttled time to tracer #1463
Merged
Merged
Changes from 3 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
298c7bf
feat: add batch throttled time to tracer
mutianf fa3ef6f
deflake test
mutianf 35975a5
Merge branch 'master' into batcher_stats
mutianf 357ac17
Deprecate constructors and update call context
mutianf 9eb045e
update docs
mutianf 200c759
Merge branch 'main' into batcher_stats
igorbernstein2 06ce4f2
Merge branch 'main' into batcher_stats
gcf-merge-on-green[bot] File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,9 +40,11 @@ | |
import com.google.api.gax.batching.FlowController.FlowControlException; | ||
import com.google.api.gax.batching.FlowController.FlowControlRuntimeException; | ||
import com.google.api.gax.batching.FlowController.LimitExceededBehavior; | ||
import com.google.api.gax.rpc.ApiCallContext; | ||
import com.google.api.gax.rpc.UnaryCallable; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.base.Stopwatch; | ||
import com.google.common.util.concurrent.Futures; | ||
import java.lang.ref.Reference; | ||
import java.lang.ref.ReferenceQueue; | ||
|
@@ -93,6 +95,7 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT> | |
private SettableApiFuture<Void> closeFuture; | ||
private final BatcherStats batcherStats = new BatcherStats(); | ||
private final FlowController flowController; | ||
private ApiCallContext callContext; | ||
|
||
/** | ||
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements | ||
|
@@ -108,7 +111,7 @@ public BatcherImpl( | |
BatchingSettings batchingSettings, | ||
ScheduledExecutorService executor) { | ||
|
||
this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null); | ||
this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null, null); | ||
} | ||
|
||
/** | ||
|
@@ -128,6 +131,35 @@ public BatcherImpl( | |
ScheduledExecutorService executor, | ||
@Nullable FlowController flowController) { | ||
|
||
this( | ||
igorbernstein2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
batchingDescriptor, | ||
unaryCallable, | ||
prototype, | ||
batchingSettings, | ||
executor, | ||
flowController, | ||
null); | ||
} | ||
|
||
/** | ||
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements | ||
* into wrappers request and response | ||
* @param unaryCallable a {@link UnaryCallable} object | ||
* @param prototype a {@link RequestT} object | ||
* @param batchingSettings a {@link BatchingSettings} with configuration of thresholds | ||
* @param flowController a {@link FlowController} for throttling requests. If it's null, create a | ||
* {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}. | ||
* @param callContext a {@link ApiCallContext} object that'll be merged in unaryCallable | ||
*/ | ||
public BatcherImpl( | ||
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> batchingDescriptor, | ||
UnaryCallable<RequestT, ResponseT> unaryCallable, | ||
RequestT prototype, | ||
BatchingSettings batchingSettings, | ||
ScheduledExecutorService executor, | ||
@Nullable FlowController flowController, | ||
@Nullable ApiCallContext callContext) { | ||
|
||
this.batchingDescriptor = | ||
Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null"); | ||
this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null"); | ||
|
@@ -168,6 +200,7 @@ public BatcherImpl( | |
scheduledFuture = Futures.immediateCancelledFuture(); | ||
} | ||
currentBatcherReference = new BatcherReference(this); | ||
this.callContext = callContext; | ||
} | ||
|
||
/** {@inheritDoc} */ | ||
|
@@ -192,16 +225,18 @@ public ApiFuture<ElementResultT> add(ElementT element) { | |
// class, which made it seem unnecessary to have blocking and non-blocking semaphore | ||
// implementations. Some refactoring may be needed for the optimized implementation. So we'll | ||
// defer it till we decide on if refactoring FlowController is necessary. | ||
Stopwatch stopwatch = Stopwatch.createStarted(); | ||
try { | ||
flowController.reserve(1, batchingDescriptor.countBytes(element)); | ||
} catch (FlowControlException e) { | ||
// This exception will only be thrown if the FlowController is set to ThrowException behavior | ||
throw FlowControlRuntimeException.fromFlowControlException(e); | ||
} | ||
long throttledTimeMs = stopwatch.elapsed(TimeUnit.MILLISECONDS); | ||
|
||
SettableApiFuture<ElementResultT> result = SettableApiFuture.create(); | ||
synchronized (elementLock) { | ||
currentOpenBatch.add(element, result); | ||
currentOpenBatch.add(element, result, throttledTimeMs); | ||
} | ||
|
||
if (currentOpenBatch.hasAnyThresholdReached()) { | ||
|
@@ -230,8 +265,13 @@ public void sendOutstanding() { | |
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); | ||
} | ||
|
||
if (callContext != null) { | ||
igorbernstein2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
callContext = | ||
igorbernstein2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
callContext.withOption(THROTTLED_TIME_KEY, accumulatedBatch.totalThrottledTimeMs); | ||
unaryCallable.withDefaultCallContext(callContext); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is a typo |
||
} | ||
final ApiFuture<ResponseT> batchResponse = | ||
unaryCallable.futureCall(accumulatedBatch.builder.build()); | ||
unaryCallable.futureCall(accumulatedBatch.builder.build(), callContext); | ||
|
||
numOfOutstandingBatches.incrementAndGet(); | ||
ApiFutures.addCallback( | ||
|
@@ -367,6 +407,7 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> { | |
|
||
private long elementCounter = 0; | ||
private long byteCounter = 0; | ||
private long totalThrottledTimeMs = 0; | ||
|
||
private Batch( | ||
RequestT prototype, | ||
|
@@ -383,11 +424,12 @@ private Batch( | |
this.batcherStats = batcherStats; | ||
} | ||
|
||
void add(ElementT element, SettableApiFuture<ElementResultT> result) { | ||
void add(ElementT element, SettableApiFuture<ElementResultT> result, long throttledTimeMs) { | ||
builder.add(element); | ||
entries.add(BatchEntry.create(element, result)); | ||
elementCounter++; | ||
byteCounter += descriptor.countBytes(element); | ||
totalThrottledTimeMs += throttledTimeMs; | ||
} | ||
|
||
void onBatchSuccess(ResponseT response) { | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be final