From 81370d0ac1d2657d1b35fab95ec0166a437e161a Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 23 Jun 2021 19:59:20 +0000 Subject: [PATCH] feat: add throttled time to ApiTracer --- .../data/v2/stub/EnhancedBigtableStub.java | 58 +++++++- .../data/v2/stub/metrics/BigtableTracer.java | 3 + .../data/v2/stub/metrics/CompositeTracer.java | 7 + .../data/v2/stub/metrics/MetricsTracer.java | 9 ++ .../v2/stub/metrics/RpcMeasureConstants.java | 7 + .../v2/stub/metrics/RpcViewConstants.java | 10 ++ .../data/v2/stub/metrics/RpcViews.java | 3 +- .../metrics/TracedBatcherUnaryCallable.java | 50 +++++++ .../v2/stub/metrics/MetricsTracerTest.java | 124 +++++++++++++++++- 9 files changed, 264 insertions(+), 7 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index a1cb1ff58..428288272 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -77,6 +77,7 @@ import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants; import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable; +import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable; @@ -132,6 +133,7 @@ public class EnhancedBigtableStub implements AutoCloseable { private final ServerStreamingCallable readRowsCallable; private final UnaryCallable readRowCallable; + private final UnaryCallable> bulkReadRowsCallable; private final UnaryCallable> sampleRowKeysCallable; private final UnaryCallable mutateRowCallable; private final UnaryCallable bulkMutateRowsCallable; @@ -267,6 +269,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext readRowsCallable = createReadRowsCallable(new DefaultRowAdapter()); readRowCallable = createReadRowCallable(new DefaultRowAdapter()); + bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter()); sampleRowKeysCallable = createSampleRowKeysCallable(); mutateRowCallable = createMutateRowCallable(); bulkMutateRowsCallable = createBulkMutateRowsCallable(); @@ -430,6 +433,46 @@ public Map extract(ReadRowsRequest readRowsRequest) { return new FilterMarkerRowsCallable<>(retrying2, rowAdapter); } + /** + * Creates a callable chain to handle bulk ReadRows RPCs. This is meant to be used in ReadRows + * batcher. The chain will: + * + *
    + *
  • Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest}. + *
  • Upon receiving the response stream, it will merge the {@link + * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row + * implementation can be configured in by the {@code rowAdapter} parameter. + *
  • Retry/resume on failure. + *
  • Filter out marker rows. + *
  • Construct a {@link UnaryCallable} that will buffer the entire stream into memory before + * completing. If the stream is empty, then the list will be empty. + *
  • Add tracing & metrics. + *
+ */ + private UnaryCallable> createBulkReadRowsCallable( + RowAdapter rowAdapter) { + ServerStreamingCallable readRowsCallable = + createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter); + + ServerStreamingCallable readRowsUserCallable = + new ReadRowsUserCallable<>(readRowsCallable, requestContext); + + SpanName span = getSpanName("ReadRows"); + + // TracedBatcherUnaryCallable needs to be created before TracedUnaryCallable so a tracer is + // created before TracedBatcherUnaryCallable is called. + UnaryCallable> tracedBatcher = + new TracedBatcherUnaryCallable<>(readRowsUserCallable.all()); + + UnaryCallable> withHeaderTracer = + new HeaderTracerUnaryCallable(tracedBatcher); + + UnaryCallable> traced = + new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span); + + return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + } + /** * Creates a callable chain to handle SampleRowKeys RPcs. The chain will: * @@ -549,8 +592,12 @@ private UnaryCallable createBulkMutateRowsCallable() { flowControlCallable != null ? flowControlCallable : baseCallable, requestContext); SpanName spanName = getSpanName("MutateRows"); + + UnaryCallable tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing); + UnaryCallable withHeaderTracer = - new HeaderTracerUnaryCallable<>(userFacing); + new HeaderTracerUnaryCallable<>(tracedBatcher); + UnaryCallable traced = new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName); @@ -588,7 +635,8 @@ public Batcher newMutateRowsBatcher( BulkMutation.create(tableId), settings.bulkMutateRowsSettings().getBatchingSettings(), clientContext.getExecutor(), - bulkMutationFlowController); + bulkMutationFlowController, + ctx == null ? clientContext.getDefaultCallContext() : ctx); } /** @@ -609,7 +657,7 @@ public Batcher newMutateRowsBatcher( public Batcher newBulkReadRowsBatcher( @Nonnull Query query, @Nullable GrpcCallContext ctx) { Preconditions.checkNotNull(query, "query cannot be null"); - UnaryCallable> callable = readRowsCallable().all(); + UnaryCallable> callable = bulkReadRowsCallable; if (ctx != null) { callable = callable.withDefaultCallContext(ctx); } @@ -618,7 +666,9 @@ public Batcher newBulkReadRowsBatcher( callable, query, settings.bulkReadRowsSettings().getBatchingSettings(), - clientContext.getExecutor()); + clientContext.getExecutor(), + null, + ctx == null ? clientContext.getDefaultCallContext() : ctx); } /** diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java index 844bb8d09..1300f96bf 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java @@ -38,4 +38,7 @@ public abstract class BigtableTracer extends BaseApiTracer { * missing header count. */ public abstract void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable); + + /** Adds an annotation of the total throttled time of a batch. */ + public abstract void batchRequestThrottled(long throttledTimeMs); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java index 38f9da732..5f4580743 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java @@ -178,4 +178,11 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa tracer.recordGfeMetadata(latency, throwable); } } + + @Override + public void batchRequestThrottled(long throttledTimeMs) { + for (BigtableTracer tracer : bigtableTracers) { + tracer.batchRequestThrottled(throttledTimeMs); + } + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java index af220aee7..be6e627cc 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java @@ -226,6 +226,15 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa .build()); } + @Override + public void batchRequestThrottled(long totalThrottledMs) { + MeasureMap measures = + stats + .newMeasureMap() + .put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, totalThrottledMs); + measures.record(newTagCtxBuilder().build()); + } + private TagContextBuilder newTagCtxBuilder() { TagContextBuilder tagCtx = tagger diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java index e6e5c70db..edd73fc81 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java @@ -88,4 +88,11 @@ public class RpcMeasureConstants { "cloud.google.com/java/bigtable/gfe_header_missing_count", "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network", COUNT); + + /** Total throttled time of a batch in msecs. */ + public static final MeasureLong BIGTABLE_BATCH_THROTTLED_TIME = + MeasureLong.create( + "cloud.google.com/java/bigtable/batch_throttled_time", + "Total throttled time of a batch in msecs", + MILLISECOND); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java index a4acf9ea6..770646be7 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java @@ -17,6 +17,7 @@ import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_LATENCY; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_INSTANCE_ID; @@ -154,4 +155,13 @@ class RpcViewConstants { BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP, BIGTABLE_STATUS)); + + static final View BIGTABLE_BATCH_THROTTLED_TIME_VIEW = + View.create( + View.Name.create("cloud.google.com/java/bigtable/batch_throttled_time"), + "Total throttled time of a batch in msecs", + BIGTABLE_BATCH_THROTTLED_TIME, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + ImmutableList.of( + BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP)); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java index 9e8f6084a..8b8296b05 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java @@ -31,7 +31,8 @@ public class RpcViews { RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW, RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW, RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW, - RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW); + RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW, + RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW); private static final ImmutableSet GFE_VIEW_SET = ImmutableSet.of( diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java new file mode 100644 index 000000000..ffe1d23e5 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java @@ -0,0 +1,50 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.core.ApiFuture; +import com.google.api.core.InternalApi; +import com.google.api.gax.batching.Batcher; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.tracing.ApiTracer; + +/** + * This callable will extract total throttled time from {@link ApiCallContext} and add it to {@link + * ApiTracer}. + */ +@InternalApi +public final class TracedBatcherUnaryCallable + extends UnaryCallable { + private final UnaryCallable innerCallable; + + public TracedBatcherUnaryCallable(UnaryCallable innerCallable) { + this.innerCallable = innerCallable; + } + + @Override + public ApiFuture futureCall(RequestT request, ApiCallContext context) { + if (context.getOption(Batcher.THROTTLED_TIME_KEY) != null) { + ApiTracer tracer = context.getTracer(); + // this should always be true + if (tracer instanceof BigtableTracer) { + ((BigtableTracer) tracer) + .batchRequestThrottled(context.getOption(Batcher.THROTTLED_TIME_KEY)); + } + } + return innerCallable.futureCall(request, context); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java index 735629977..856627e05 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java @@ -18,9 +18,18 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; - +import static org.mockito.Mockito.when; + +import com.google.api.gax.batching.Batcher; +import com.google.api.gax.batching.BatcherImpl; +import com.google.api.gax.batching.BatchingDescriptor; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ClientContext; import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.MutateRowsResponse; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.ReadRowsResponse.CellChunk; @@ -28,8 +37,10 @@ import com.google.cloud.bigtable.data.v2.FakeServiceHelper; import com.google.cloud.bigtable.data.v2.models.BulkMutation; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; +import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -44,6 +55,7 @@ import io.opencensus.tags.TagKey; import io.opencensus.tags.TagValue; import io.opencensus.tags.Tags; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; @@ -55,6 +67,7 @@ import org.junit.runners.JUnit4; import org.mockito.Answers; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -88,6 +101,7 @@ public class MetricsTracerTest { private StatsComponentImpl localStats = new StatsComponentImpl(); private EnhancedBigtableStub stub; + private BigtableDataSettings settings; @Before public void setUp() throws Exception { @@ -96,7 +110,7 @@ public void setUp() throws Exception { RpcViews.registerBigtableClientViews(localStats.getViewManager()); - BigtableDataSettings settings = + settings = BigtableDataSettings.newBuilderForEmulator(serviceHelper.getPort()) .setProjectId(PROJECT_ID) .setInstanceId(INSTANCE_ID) @@ -351,6 +365,112 @@ public void testInvalidRequest() throws InterruptedException { } } + @Test + public void testBatchReadRowsThrottledTime() throws Exception { + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(DEFAULT_READ_ROWS_RESPONSES); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .readRows(any(ReadRowsRequest.class), any()); + + try (Batcher batcher = + stub.newBulkReadRowsBatcher(Query.create(TABLE_ID), GrpcCallContext.createDefault())) { + batcher.add(ByteString.copyFromUtf8("row1")); + batcher.sendOutstanding(); + + // Give OpenCensus a chance to update the views asynchronously. + Thread.sleep(100); + + long throttledTimeMetric = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + Assert.assertEquals(0, throttledTimeMetric); + } + } + + @Test + public void testBatchMutateRowsThrottledTime() throws Exception { + FlowController flowController = Mockito.mock(FlowController.class); + BatchingDescriptor batchingDescriptor = Mockito.mock(MutateRowsBatchingDescriptor.class); + // Mock throttling + final long throttled = 50; + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(throttled); + return null; + } + }) + .when(flowController) + .reserve(any(Long.class), any(Long.class)); + when(flowController.getMaxElementCountLimit()).thenReturn(null); + when(flowController.getMaxRequestBytesLimit()).thenReturn(null); + when(batchingDescriptor.countBytes(any())).thenReturn(1l); + when(batchingDescriptor.newRequestBuilder(any())).thenCallRealMethod(); + + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(MutateRowsResponse.newBuilder().build()); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .mutateRows(any(MutateRowsRequest.class), any()); + + ApiCallContext defaultContext = GrpcCallContext.createDefault(); + + try { + Batcher batcher = + new BatcherImpl( + batchingDescriptor, + stub.bulkMutateRowsCallable().withDefaultCallContext(defaultContext), + BulkMutation.create(TABLE_ID), + settings.getStubSettings().bulkMutateRowsSettings().getBatchingSettings(), + Executors.newSingleThreadScheduledExecutor(), + flowController, + defaultContext); + + batcher.add(RowMutationEntry.create("key")); + batcher.sendOutstanding(); + + Thread.sleep(100); + long throttledTimeMetric = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRows")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + Assert.assertTrue(throttledTimeMetric >= throttled); + } catch (Exception e) { + throw e; + } + } + @SuppressWarnings("unchecked") private static StreamObserver anyObserver(Class returnType) { return (StreamObserver) any(returnType);