From 82c0f092990c971c686c890371f38c34849760d5 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 23 Jun 2021 19:59:20 +0000 Subject: [PATCH] feat: add throttled time to ApiTracer --- .../data/v2/BigtableDataSettings.java | 6 + .../data/v2/stub/EnhancedBigtableStub.java | 58 +++++++- .../data/v2/stub/metrics/CompositeTracer.java | 7 + .../data/v2/stub/metrics/MetricsTracer.java | 9 ++ .../v2/stub/metrics/RpcMeasureConstants.java | 7 + .../v2/stub/metrics/RpcViewConstants.java | 10 ++ .../data/v2/stub/metrics/RpcViews.java | 15 +++ .../metrics/TracedBatcherUnaryCallable.java | 47 +++++++ .../v2/stub/EnhancedBigtableStubTest.java | 2 + .../v2/stub/metrics/MetricsTracerTest.java | 125 +++++++++++++++++- 10 files changed, 280 insertions(+), 6 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java index e173571ff7..0b4732b8aa 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java @@ -193,6 +193,12 @@ public static void enableGfeOpenCensusStats() { com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientGfeViews(); } + /** Enables OpenCensus metric aggregations for Batch operations. */ + @BetaApi("OpenCensus stats integration is currently unstable and may change in the future") + public static void enableOpenCensusBatchStats() { + com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientBatchViews(); + } + /** Returns the target project id. */ public String getProjectId() { return stubSettings.getProjectId(); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 46729134fd..2923c1cd5c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -75,6 +75,7 @@ import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory; import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants; +import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable; @@ -130,6 +131,7 @@ public class EnhancedBigtableStub implements AutoCloseable { private final ServerStreamingCallable readRowsCallable; private final UnaryCallable readRowCallable; + private final UnaryCallable> bulkReadRowsCallable; private final UnaryCallable> sampleRowKeysCallable; private final UnaryCallable mutateRowCallable; private final UnaryCallable bulkMutateRowsCallable; @@ -273,6 +275,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext readRowsCallable = createReadRowsCallable(new DefaultRowAdapter()); readRowCallable = createReadRowCallable(new DefaultRowAdapter()); + bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter()); sampleRowKeysCallable = createSampleRowKeysCallable(); mutateRowCallable = createMutateRowCallable(); bulkMutateRowsCallable = createBulkMutateRowsCallable(); @@ -434,6 +437,46 @@ public Map extract(ReadRowsRequest readRowsRequest) { return new FilterMarkerRowsCallable<>(retrying2, rowAdapter); } + /** + * Creates a callable chain to handle bulk ReadRows RPCs. This is meant to be used in ReadRows + * batcher. The chain will: + * + *
    + *
  • Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest}. + *
  • Upon receiving the response stream, it will merge the {@link + * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row + * implementation can be configured in by the {@code rowAdapter} parameter. + *
  • Retry/resume on failure. + *
  • Filter out marker rows. + *
  • Construct a {@link UnaryCallable} that will buffer the entire stream into memory before + * completing. If the stream is empty, then the list will be empty. + *
  • Add tracing & metrics. + *
+ */ + private UnaryCallable> createBulkReadRowsCallable( + RowAdapter rowAdapter) { + ServerStreamingCallable readRowsCallable = + createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter); + + ServerStreamingCallable readRowsUserCallable = + new ReadRowsUserCallable<>(readRowsCallable, requestContext); + + SpanName span = getSpanName("ReadRows"); + + // TracedBatcherUnaryCallable needs to be created before TracedUnaryCallable so a tracer is + // created before TracedBatcherUnaryCallable is called. + UnaryCallable> tracedBatcher = + new TracedBatcherUnaryCallable<>(readRowsUserCallable.all()); + + UnaryCallable> traced = + new TracedUnaryCallable<>(tracedBatcher, clientContext.getTracerFactory(), span); + + UnaryCallable> withHeaderTracer = + new HeaderTracerUnaryCallable(traced, settings.getHeaderTracer(), span.toString()); + + return withHeaderTracer.withDefaultCallContext(clientContext.getDefaultCallContext()); + } + /** * Creates a callable chain to handle SampleRowKeys RPcs. The chain will: * @@ -549,8 +592,12 @@ private UnaryCallable createBulkMutateRowsCallable() { flowControlCallable != null ? flowControlCallable : baseCallable, requestContext); SpanName spanName = getSpanName("MutateRows"); + + UnaryCallable tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing); + UnaryCallable traced = - new TracedUnaryCallable<>(userFacing, clientContext.getTracerFactory(), spanName); + new TracedUnaryCallable<>(tracedBatcher, clientContext.getTracerFactory(), spanName); + UnaryCallable withHeaderTracer = new HeaderTracerUnaryCallable<>(traced, settings.getHeaderTracer(), spanName.toString()); @@ -588,7 +635,8 @@ public Batcher newMutateRowsBatcher( BulkMutation.create(tableId), settings.bulkMutateRowsSettings().getBatchingSettings(), clientContext.getExecutor(), - bulkMutationFlowController); + bulkMutationFlowController, + ctx == null ? clientContext.getDefaultCallContext() : ctx); } /** @@ -609,7 +657,7 @@ public Batcher newMutateRowsBatcher( public Batcher newBulkReadRowsBatcher( @Nonnull Query query, @Nullable GrpcCallContext ctx) { Preconditions.checkNotNull(query, "query cannot be null"); - UnaryCallable> callable = readRowsCallable().all(); + UnaryCallable> callable = bulkReadRowsCallable; if (ctx != null) { callable = callable.withDefaultCallContext(ctx); } @@ -618,7 +666,9 @@ public Batcher newBulkReadRowsBatcher( callable, query, settings.bulkReadRowsSettings().getBatchingSettings(), - clientContext.getExecutor()); + clientContext.getExecutor(), + null, + ctx == null ? clientContext.getDefaultCallContext() : ctx); } /** diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java index 25893ee881..cad8e013a9 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java @@ -152,4 +152,11 @@ public void batchRequestSent(long elementCount, long requestSize) { child.batchRequestSent(elementCount, requestSize); } } + + @Override + public void batchRequestThrottled(long throttledTimeMs) { + for (ApiTracer child : children) { + child.batchRequestThrottled(throttledTimeMs); + } + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java index 3d95324505..ed8091ee87 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java @@ -203,6 +203,15 @@ public void batchRequestSent(long elementCount, long requestSize) { // noop } + @Override + public void batchRequestThrottled(long totalThrottledMs) { + MeasureMap measures = + stats + .newMeasureMap() + .put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, totalThrottledMs); + measures.record(newTagCtxBuilder().build()); + } + private TagContextBuilder newTagCtxBuilder() { TagContextBuilder tagCtx = tagger diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java index e6e5c70db1..edd73fc81d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java @@ -88,4 +88,11 @@ public class RpcMeasureConstants { "cloud.google.com/java/bigtable/gfe_header_missing_count", "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network", COUNT); + + /** Total throttled time of a batch in msecs. */ + public static final MeasureLong BIGTABLE_BATCH_THROTTLED_TIME = + MeasureLong.create( + "cloud.google.com/java/bigtable/batch_throttled_time", + "Total throttled time of a batch in msecs", + MILLISECOND); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java index 8a14c01b13..92c8b52091 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java @@ -17,6 +17,7 @@ import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_LATENCY; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_INSTANCE_ID; @@ -146,4 +147,13 @@ class RpcViewConstants { SUM, ImmutableList.of( BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP)); + + static final View BIGTABLE_BATCH_THROTTLED_TIME_VIEW = + View.create( + View.Name.create("cloud.google.com/java/bigtable/batch_throttled_time"), + "Total throttled time of a batch in msecs", + BIGTABLE_BATCH_THROTTLED_TIME, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + ImmutableList.of( + BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP)); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java index 9e8f6084a2..6328436b3a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java @@ -38,6 +38,9 @@ public class RpcViews { RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW); + private static final ImmutableSet BIGTABLE_BATCH_VIEWS_SET = + ImmutableSet.of(RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW); + private static boolean gfeMetricsRegistered = false; /** Registers all Bigtable specific views. */ @@ -55,6 +58,11 @@ public static void registerBigtableClientGfeViews() { registerBigtableClientGfeViews(Stats.getViewManager()); } + /** Register Bigtable batch views. */ + public static void registerBigtableClientBatchViews() { + registerBigtableClientBatchViews(Stats.getViewManager()); + } + @VisibleForTesting static void registerBigtableClientViews(ViewManager viewManager) { for (View view : BIGTABLE_CLIENT_VIEWS_SET) { @@ -70,6 +78,13 @@ static void registerBigtableClientGfeViews(ViewManager viewManager) { gfeMetricsRegistered = true; } + @VisibleForTesting + static void registerBigtableClientBatchViews(ViewManager viewManager) { + for (View view : BIGTABLE_BATCH_VIEWS_SET) { + viewManager.registerView(view); + } + } + static boolean isGfeMetricsRegistered() { return gfeMetricsRegistered; } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java new file mode 100644 index 0000000000..2629308dd8 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java @@ -0,0 +1,47 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.gax.tracing.TracedUnaryCallable; +import com.google.api.core.ApiFuture; +import com.google.api.core.InternalApi; +import com.google.api.gax.batching.Batcher; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.tracing.ApiTracer; + +/** + * This callable will extract total throttled time from {@link ApiCallContext} and add it to {@link + * ApiTracer}. + */ +@InternalApi +public final class TracedBatcherUnaryCallable + extends UnaryCallable { + private final UnaryCallable innerCallable; + + public TracedBatcherUnaryCallable(UnaryCallable innerCallable) { + this.innerCallable = innerCallable; + } + + @Override + public ApiFuture futureCall(RequestT request, ApiCallContext context) { + if (context.getOption(Batcher.THROTTLED_TIME_KEY) != null) { + ApiTracer tracer = context.getTracer(); + tracer.batchRequestThrottled(context.getOption(Batcher.THROTTLED_TIME_KEY)); + } + return innerCallable.futureCall(request, context); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index ae045123f1..7addd59701 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -29,6 +29,7 @@ import com.google.api.gax.grpc.GaxGrpcProperties; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.ClientContext; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials; @@ -439,6 +440,7 @@ public void testCallContextPropagatedInMutationBatcher() .setPrimedTableIds("table1", "table2") .build(); + ClientContext context = ClientContext.create(settings); try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings)) { // clear the previous contexts contextInterceptor.contexts.clear(); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java index eb7bdaa998..ab31ec2f8a 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java @@ -18,9 +18,18 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; - +import static org.mockito.Mockito.when; + +import com.google.api.gax.batching.Batcher; +import com.google.api.gax.batching.BatcherImpl; +import com.google.api.gax.batching.BatchingDescriptor; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ClientContext; import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.MutateRowsResponse; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.ReadRowsResponse.CellChunk; @@ -28,8 +37,10 @@ import com.google.cloud.bigtable.data.v2.FakeServiceHelper; import com.google.cloud.bigtable.data.v2.models.BulkMutation; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; +import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -44,6 +55,7 @@ import io.opencensus.tags.TagKey; import io.opencensus.tags.TagValue; import io.opencensus.tags.Tags; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; @@ -55,6 +67,7 @@ import org.junit.runners.JUnit4; import org.mockito.Answers; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -88,6 +101,7 @@ public class MetricsTracerTest { private StatsComponentImpl localStats = new StatsComponentImpl(); private EnhancedBigtableStub stub; + private BigtableDataSettings settings; @Before public void setUp() throws Exception { @@ -95,8 +109,9 @@ public void setUp() throws Exception { serviceHelper.start(); RpcViews.registerBigtableClientViews(localStats.getViewManager()); + RpcViews.registerBigtableClientBatchViews(localStats.getViewManager()); - BigtableDataSettings settings = + settings = BigtableDataSettings.newBuilderForEmulator(serviceHelper.getPort()) .setProjectId(PROJECT_ID) .setInstanceId(INSTANCE_ID) @@ -351,6 +366,112 @@ public void testInvalidRequest() throws InterruptedException { } } + @Test + public void testBatchReadRowsThrottledTime() throws Exception { + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(DEFAULT_READ_ROWS_RESPONSES); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .readRows(any(ReadRowsRequest.class), anyObserver(ReadRowsResponse.class)); + + try (Batcher batcher = + stub.newBulkReadRowsBatcher(Query.create(TABLE_ID), GrpcCallContext.createDefault())) { + batcher.add(ByteString.copyFromUtf8("row1")); + batcher.sendOutstanding(); + + // Give OpenCensus a chance to update the views asynchronously. + Thread.sleep(100); + + long throttledTimeMetric = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + Assert.assertEquals(0, throttledTimeMetric); + } + } + + @Test + public void testBatchMutateRowsThrottledTime() throws Exception { + FlowController flowController = Mockito.mock(FlowController.class); + BatchingDescriptor batchingDescriptor = Mockito.mock(MutateRowsBatchingDescriptor.class); + // Mock throttling + final long throttled = 50; + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(throttled); + return null; + } + }) + .when(flowController) + .reserve(any(Long.class), any(Long.class)); + when(flowController.getMaxElementCountLimit()).thenReturn(null); + when(flowController.getMaxRequestBytesLimit()).thenReturn(null); + when(batchingDescriptor.countBytes(any())).thenReturn(1l); + when(batchingDescriptor.newRequestBuilder(any())).thenCallRealMethod(); + + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(MutateRowsResponse.newBuilder().build()); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .mutateRows(any(MutateRowsRequest.class), anyObserver(MutateRowsResponse.class)); + + ApiCallContext defaultContext = GrpcCallContext.createDefault(); + + try { + Batcher batcher = + new BatcherImpl( + batchingDescriptor, + stub.bulkMutateRowsCallable().withDefaultCallContext(defaultContext), + BulkMutation.create(TABLE_ID), + settings.getStubSettings().bulkMutateRowsSettings().getBatchingSettings(), + Executors.newSingleThreadScheduledExecutor(), + flowController, + defaultContext); + + batcher.add(RowMutationEntry.create("key")); + batcher.sendOutstanding(); + + Thread.sleep(100); + long throttledTimeMetric = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRows")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + Assert.assertTrue(throttledTimeMetric >= throttled); + } catch (Exception e) { + throw e; + } + } + @SuppressWarnings("unchecked") private static StreamObserver anyObserver(Class returnType) { return (StreamObserver) any(returnType);