From 0d197a5ab9b7ccd20091c1c6c4a794586d6a51dc Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 10 Dec 2021 10:13:41 -0500 Subject: [PATCH] feat: add batch throttled ms metric (#888) * feat: add throttled time to ApiTracer * fix abstract class * update based on review * fix format * updates on comments * make tests more readable * update year * fix test * make the test more readable --- .../data/v2/stub/EnhancedBigtableStub.java | 69 +++++++-- .../data/v2/stub/metrics/BigtableTracer.java | 27 +++- .../data/v2/stub/metrics/CompositeTracer.java | 7 + .../data/v2/stub/metrics/MetricsTracer.java | 13 +- .../v2/stub/metrics/RpcMeasureConstants.java | 7 + .../v2/stub/metrics/RpcViewConstants.java | 11 ++ .../data/v2/stub/metrics/RpcViews.java | 3 +- .../metrics/TracedBatcherUnaryCallable.java | 50 +++++++ .../v2/stub/metrics/CompositeTracerTest.java | 20 +++ .../v2/stub/metrics/MetricsTracerTest.java | 132 +++++++++++++++++- .../misc_utilities/MethodComparator.java | 39 ++++++ 11 files changed, 356 insertions(+), 22 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/misc_utilities/MethodComparator.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index a1cb1ff58..d8daaa80e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -77,6 +77,7 @@ import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants; import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable; +import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable; @@ -88,6 +89,7 @@ import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable; import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -132,6 +134,7 @@ public class EnhancedBigtableStub implements AutoCloseable { private final ServerStreamingCallable readRowsCallable; private final UnaryCallable readRowCallable; + private final UnaryCallable> bulkReadRowsCallable; private final UnaryCallable> sampleRowKeysCallable; private final UnaryCallable mutateRowCallable; private final UnaryCallable bulkMutateRowsCallable; @@ -267,6 +270,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext readRowsCallable = createReadRowsCallable(new DefaultRowAdapter()); readRowCallable = createReadRowCallable(new DefaultRowAdapter()); + bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter()); sampleRowKeysCallable = createSampleRowKeysCallable(); mutateRowCallable = createMutateRowCallable(); bulkMutateRowsCallable = createBulkMutateRowsCallable(); @@ -430,6 +434,46 @@ public Map extract(ReadRowsRequest readRowsRequest) { return new FilterMarkerRowsCallable<>(retrying2, rowAdapter); } + /** + * Creates a callable chain to handle bulk ReadRows RPCs. This is meant to be used in ReadRows + * batcher. The chain will: + * + *
    + *
  • Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest}. + *
  • Upon receiving the response stream, it will merge the {@link + * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row + * implementation can be configured in by the {@code rowAdapter} parameter. + *
  • Retry/resume on failure. + *
  • Filter out marker rows. + *
  • Construct a {@link UnaryCallable} that will buffer the entire stream into memory before + * completing. If the stream is empty, then the list will be empty. + *
  • Add tracing & metrics. + *
+ */ + private UnaryCallable> createBulkReadRowsCallable( + RowAdapter rowAdapter) { + ServerStreamingCallable readRowsCallable = + createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter); + + ServerStreamingCallable readRowsUserCallable = + new ReadRowsUserCallable<>(readRowsCallable, requestContext); + + SpanName span = getSpanName("ReadRows"); + + // The TracedBatcherUnaryCallable has to be wrapped by the TracedUnaryCallable, so that + // TracedUnaryCallable can inject a tracer for the TracedBatcherUnaryCallable to interact with + UnaryCallable> tracedBatcher = + new TracedBatcherUnaryCallable<>(readRowsUserCallable.all()); + + UnaryCallable> withHeaderTracer = + new HeaderTracerUnaryCallable(tracedBatcher); + + UnaryCallable> traced = + new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span); + + return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + } + /** * Creates a callable chain to handle SampleRowKeys RPcs. The chain will: * @@ -549,8 +593,12 @@ private UnaryCallable createBulkMutateRowsCallable() { flowControlCallable != null ? flowControlCallable : baseCallable, requestContext); SpanName spanName = getSpanName("MutateRows"); + + UnaryCallable tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing); + UnaryCallable withHeaderTracer = - new HeaderTracerUnaryCallable<>(userFacing); + new HeaderTracerUnaryCallable<>(tracedBatcher); + UnaryCallable traced = new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName); @@ -578,17 +626,14 @@ private UnaryCallable createBulkMutateRowsCallable() { */ public Batcher newMutateRowsBatcher( @Nonnull String tableId, @Nullable GrpcCallContext ctx) { - UnaryCallable callable = this.bulkMutateRowsCallable; - if (ctx != null) { - callable = callable.withDefaultCallContext(ctx); - } return new BatcherImpl<>( settings.bulkMutateRowsSettings().getBatchingDescriptor(), - callable, + bulkMutateRowsCallable, BulkMutation.create(tableId), settings.bulkMutateRowsSettings().getBatchingSettings(), clientContext.getExecutor(), - bulkMutationFlowController); + bulkMutationFlowController, + MoreObjects.firstNonNull(ctx, clientContext.getDefaultCallContext())); } /** @@ -609,16 +654,14 @@ public Batcher newMutateRowsBatcher( public Batcher newBulkReadRowsBatcher( @Nonnull Query query, @Nullable GrpcCallContext ctx) { Preconditions.checkNotNull(query, "query cannot be null"); - UnaryCallable> callable = readRowsCallable().all(); - if (ctx != null) { - callable = callable.withDefaultCallContext(ctx); - } return new BatcherImpl<>( settings.bulkReadRowsSettings().getBatchingDescriptor(), - callable, + bulkReadRowsCallable, query, settings.bulkReadRowsSettings().getBatchingSettings(), - clientContext.getExecutor()); + clientContext.getExecutor(), + null, + MoreObjects.firstNonNull(ctx, clientContext.getDefaultCallContext())); } /** diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java index 844bb8d09..3d7707cc4 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java @@ -21,21 +21,40 @@ import com.google.api.gax.tracing.BaseApiTracer; import javax.annotation.Nullable; -/** A Bigtable specific {@link ApiTracer} that includes additional contexts. */ +/** + * A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base + * implementation that does nothing. + */ @BetaApi("This surface is stable yet it might be removed in the future.") -public abstract class BigtableTracer extends BaseApiTracer { +public class BigtableTracer extends BaseApiTracer { + + private volatile int attempt = 0; + + @Override + public void attemptStarted(int attemptNumber) { + this.attempt = attemptNumber; + } /** * Get the attempt number of the current call. Attempt number for the current call is passed in * and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from * {@link ApiCallContext}. Attempt number starts from 0. */ - public abstract int getAttempt(); + public int getAttempt() { + return attempt; + } /** * Record the latency between Google's network receives the RPC and reads back the first byte of * the response from server-timing header. If server-timing header is missing, increment the * missing header count. */ - public abstract void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable); + public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) { + // noop + } + + /** Adds an annotation of the total throttled time of a batch. */ + public void batchRequestThrottled(long throttledTimeMs) { + // noop + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java index 38f9da732..5f4580743 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java @@ -178,4 +178,11 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa tracer.recordGfeMetadata(latency, throwable); } } + + @Override + public void batchRequestThrottled(long throttledTimeMs) { + for (BigtableTracer tracer : bigtableTracers) { + tracer.batchRequestThrottled(throttledTimeMs); + } + } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java index af220aee7..f28b07c0c 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java @@ -129,8 +129,8 @@ public void connectionSelected(String s) { } @Override - public void attemptStarted(int i) { - attempt = i; + public void attemptStarted(int attemptNumber) { + attempt = attemptNumber; attemptCount++; attemptTimer = Stopwatch.createStarted(); attemptResponseCount = 0; @@ -226,6 +226,15 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa .build()); } + @Override + public void batchRequestThrottled(long totalThrottledMs) { + MeasureMap measures = + stats + .newMeasureMap() + .put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, totalThrottledMs); + measures.record(newTagCtxBuilder().build()); + } + private TagContextBuilder newTagCtxBuilder() { TagContextBuilder tagCtx = tagger diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java index e6e5c70db..edd73fc81 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java @@ -88,4 +88,11 @@ public class RpcMeasureConstants { "cloud.google.com/java/bigtable/gfe_header_missing_count", "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network", COUNT); + + /** Total throttled time of a batch in msecs. */ + public static final MeasureLong BIGTABLE_BATCH_THROTTLED_TIME = + MeasureLong.create( + "cloud.google.com/java/bigtable/batch_throttled_time", + "Total throttled time of a batch in msecs", + MILLISECOND); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java index a4acf9ea6..0d85c75e9 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java @@ -17,6 +17,7 @@ import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_LATENCY; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_INSTANCE_ID; @@ -154,4 +155,14 @@ class RpcViewConstants { BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP, BIGTABLE_STATUS)); + + // use distribution so we can correlate batch throttled time with op_latency + static final View BIGTABLE_BATCH_THROTTLED_TIME_VIEW = + View.create( + View.Name.create("cloud.google.com/java/bigtable/batch_throttled_time"), + "Total throttled time of a batch in msecs", + BIGTABLE_BATCH_THROTTLED_TIME, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + ImmutableList.of( + BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP)); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java index 9e8f6084a..8b8296b05 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java @@ -31,7 +31,8 @@ public class RpcViews { RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW, RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW, RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW, - RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW); + RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW, + RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW); private static final ImmutableSet GFE_VIEW_SET = ImmutableSet.of( diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java new file mode 100644 index 000000000..b7140f015 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/TracedBatcherUnaryCallable.java @@ -0,0 +1,50 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.core.ApiFuture; +import com.google.api.core.InternalApi; +import com.google.api.gax.batching.Batcher; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.tracing.ApiTracer; + +/** + * This callable will extract total throttled time from {@link ApiCallContext} and add it to {@link + * ApiTracer}. This class needs to be wrapped by a callable that injects the {@link ApiTracer}. + */ +@InternalApi +public final class TracedBatcherUnaryCallable + extends UnaryCallable { + private final UnaryCallable innerCallable; + + public TracedBatcherUnaryCallable(UnaryCallable innerCallable) { + this.innerCallable = innerCallable; + } + + @Override + public ApiFuture futureCall(RequestT request, ApiCallContext context) { + if (context.getOption(Batcher.THROTTLED_TIME_KEY) != null) { + ApiTracer tracer = context.getTracer(); + // this should always be true + if (tracer instanceof BigtableTracer) { + ((BigtableTracer) tracer) + .batchRequestThrottled(context.getOption(Batcher.THROTTLED_TIME_KEY)); + } + } + return innerCallable.futureCall(request, context); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java index bed0921fe..69a741d0e 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.data.v2.stub.metrics; +import static com.google.common.truth.Truth.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -22,9 +23,12 @@ import com.google.api.gax.tracing.ApiTracer; import com.google.api.gax.tracing.ApiTracer.Scope; +import com.google.cloud.bigtable.misc_utilities.MethodComparator; import com.google.common.collect.ImmutableList; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import java.lang.reflect.Method; +import java.util.Arrays; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -229,4 +233,20 @@ public void testRecordGfeLatency() { verify(child3, times(1)).recordGfeMetadata(20L, t); verify(child4, times(1)).recordGfeMetadata(20L, t); } + + @Test + public void testBatchRequestThrottled() { + compositeTracer.batchRequestThrottled(5L); + verify(child3, times(1)).batchRequestThrottled(5L); + verify(child4, times(1)).batchRequestThrottled(5L); + } + + @Test + public void testMethodsOverride() { + Method[] baseMethods = BigtableTracer.class.getDeclaredMethods(); + Method[] compositeTracerMethods = CompositeTracer.class.getDeclaredMethods(); + assertThat(Arrays.asList(compositeTracerMethods)) + .comparingElementsUsing(MethodComparator.METHOD_CORRESPONDENCE) + .containsAtLeastElementsIn(baseMethods); + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java index 735629977..69183f137 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java @@ -18,9 +18,18 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; - +import static org.mockito.Mockito.when; + +import com.google.api.gax.batching.Batcher; +import com.google.api.gax.batching.BatcherImpl; +import com.google.api.gax.batching.BatchingDescriptor; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ClientContext; import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.MutateRowsResponse; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.ReadRowsResponse.CellChunk; @@ -28,8 +37,11 @@ import com.google.cloud.bigtable.data.v2.FakeServiceHelper; import com.google.cloud.bigtable.data.v2.models.BulkMutation; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; +import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; +import com.google.cloud.bigtable.misc_utilities.MethodComparator; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -44,6 +56,9 @@ import io.opencensus.tags.TagKey; import io.opencensus.tags.TagValue; import io.opencensus.tags.Tags; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; @@ -55,6 +70,7 @@ import org.junit.runners.JUnit4; import org.mockito.Answers; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -88,6 +104,7 @@ public class MetricsTracerTest { private StatsComponentImpl localStats = new StatsComponentImpl(); private EnhancedBigtableStub stub; + private BigtableDataSettings settings; @Before public void setUp() throws Exception { @@ -96,7 +113,7 @@ public void setUp() throws Exception { RpcViews.registerBigtableClientViews(localStats.getViewManager()); - BigtableDataSettings settings = + settings = BigtableDataSettings.newBuilderForEmulator(serviceHelper.getPort()) .setProjectId(PROJECT_ID) .setInstanceId(INSTANCE_ID) @@ -351,6 +368,117 @@ public void testInvalidRequest() throws InterruptedException { } } + @Test + public void testBatchReadRowsThrottledTime() throws Exception { + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(DEFAULT_READ_ROWS_RESPONSES); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .readRows(any(ReadRowsRequest.class), any()); + + try (Batcher batcher = + stub.newBulkReadRowsBatcher(Query.create(TABLE_ID), GrpcCallContext.createDefault())) { + batcher.add(ByteString.copyFromUtf8("row1")); + batcher.sendOutstanding(); + + // Give OpenCensus a chance to update the views asynchronously. + Thread.sleep(100); + + long throttledTimeMetric = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(throttledTimeMetric).isEqualTo(0); + } + } + + @Test + public void testBatchMutateRowsThrottledTime() throws Exception { + FlowController flowController = Mockito.mock(FlowController.class); + BatchingDescriptor batchingDescriptor = Mockito.mock(MutateRowsBatchingDescriptor.class); + // Mock throttling + final long throttled = 50; + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(throttled); + return null; + } + }) + .when(flowController) + .reserve(any(Long.class), any(Long.class)); + when(flowController.getMaxElementCountLimit()).thenReturn(null); + when(flowController.getMaxRequestBytesLimit()).thenReturn(null); + when(batchingDescriptor.countBytes(any())).thenReturn(1l); + when(batchingDescriptor.newRequestBuilder(any())).thenCallRealMethod(); + + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(MutateRowsResponse.getDefaultInstance()); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .mutateRows(any(MutateRowsRequest.class), any()); + + ApiCallContext defaultContext = GrpcCallContext.createDefault(); + + Batcher batcher = + new BatcherImpl( + batchingDescriptor, + stub.bulkMutateRowsCallable().withDefaultCallContext(defaultContext), + BulkMutation.create(TABLE_ID), + settings.getStubSettings().bulkMutateRowsSettings().getBatchingSettings(), + Executors.newSingleThreadScheduledExecutor(), + flowController, + defaultContext); + + batcher.add(RowMutationEntry.create("key")); + batcher.sendOutstanding(); + + Thread.sleep(100); + long throttledTimeMetric = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRows")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(throttledTimeMetric).isAtLeast(throttled); + } + + @Test + public void testMethodsOverride() { + Method[] baseMethods = BigtableTracer.class.getDeclaredMethods(); + Method[] metricsTracerMethods = MetricsTracer.class.getDeclaredMethods(); + assertThat(Arrays.asList(metricsTracerMethods)) + .comparingElementsUsing(MethodComparator.METHOD_CORRESPONDENCE) + .containsAtLeastElementsIn(baseMethods); + } + @SuppressWarnings("unchecked") private static StreamObserver anyObserver(Class returnType) { return (StreamObserver) any(returnType); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/misc_utilities/MethodComparator.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/misc_utilities/MethodComparator.java new file mode 100644 index 000000000..4c3ecd274 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/misc_utilities/MethodComparator.java @@ -0,0 +1,39 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.misc_utilities; + +import com.google.common.truth.Correspondence; +import java.lang.reflect.Method; +import java.util.Arrays; + +/** + * A {@link Correspondence} to compare methods names, parameters and return types in different + * classes. An example usage is to make sure a child class is implementing all the methods in the + * non-abstract parent class. + */ +public class MethodComparator { + + public static final Correspondence METHOD_CORRESPONDENCE = + Correspondence.from( + MethodComparator::compareMethods, "compare method names, parameters and return types"); + + private static boolean compareMethods(Method actual, Method expected) { + return actual.getName().equals(expected.getName()) + && Arrays.equals(actual.getParameterTypes(), expected.getParameterTypes()) + && actual.getModifiers() == expected.getModifiers() + && actual.getReturnType().equals(expected.getReturnType()); + } +}