From 7214ef6853fc6892401b55bd1beeccbe896e4f33 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Mon, 18 May 2020 09:57:22 -0400 Subject: [PATCH] feat: Update opencensus metrics to include bigtable resource ids and rpc level metrics (#214) * feat: Update opencensus metrics to include bigtable resource ids and rpc level metrics This PR refactors opencensus metrics integration to use gax's ApiTracers. Which allows this client to instrument individual attempts and tag everything with bigtable resource ids * fix deps * typo * remove unused param * add clirr * review feedback * code formatting * fix deps Co-authored-by: Kristen O'Leary --- README.md | 57 +-- google-cloud-bigtable-deps-bom/pom.xml | 5 + .../clirr-ignored-differences.xml | 16 + google-cloud-bigtable/pom.xml | 15 +- .../data/v2/stub/EnhancedBigtableStub.java | 109 +++-- .../v2/stub/EnhancedBigtableStubSettings.java | 11 - .../data/v2/stub/metrics/CompositeTracer.java | 154 +++++++ .../stub/metrics/CompositeTracerFactory.java} | 36 +- .../metrics/MeasuredMutateRowsCallable.java | 112 ----- .../metrics/MeasuredReadRowsCallable.java | 143 ------- .../stub/metrics/MeasuredUnaryCallable.java | 104 ----- .../data/v2/stub/metrics/MetricsTracer.java | 218 ++++++++++ .../v2/stub/metrics/MetricsTracerFactory.java | 54 +++ .../v2/stub/metrics/RpcMeasureConstants.java | 68 +-- .../v2/stub/metrics/RpcViewConstants.java | 71 +++- .../data/v2/stub/metrics/RpcViews.java | 16 +- .../v2/stub/metrics/CompositeTracerTest.java | 174 ++++++++ .../MeasureMutateRowsCallableTest.java | 141 ------ .../metrics/MeasuredReadRowsCallableTest.java | 225 ---------- .../metrics/MeasuredUnaryCallableTest.java | 127 ------ .../v2/stub/metrics/MetricsTracerTest.java | 401 ++++++++++++++++++ 21 files changed, 1236 insertions(+), 1021 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java rename google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/{gaxx/tracing/WrappedTracerFactory.java => data/v2/stub/metrics/CompositeTracerFactory.java} (50%) delete mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredMutateRowsCallable.java delete mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallable.java delete mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallable.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerFactory.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java delete mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasureMutateRowsCallableTest.java delete mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallableTest.java delete mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallableTest.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java diff --git a/README.md b/README.md index 4cfbec85c..443edeb0d 100644 --- a/README.md +++ b/README.md @@ -176,13 +176,13 @@ If you are using Maven, add this to your pom.xml file io.opencensus opencensus-impl - 0.24.0 + 0.26.0 runtime io.opencensus opencensus-exporter-trace-stackdriver - 0.24.0 + 0.26.0 io.grpc @@ -197,13 +197,13 @@ If you are using Maven, add this to your pom.xml file ``` If you are using Gradle, add this to your dependencies ```Groovy -compile 'io.opencensus:opencensus-impl:0.24.0' -compile 'io.opencensus:opencensus-exporter-trace-stackdriver:0.24.0' +compile 'io.opencensus:opencensus-impl:0.26.0' +compile 'io.opencensus:opencensus-exporter-trace-stackdriver:0.26.0' ``` If you are using SBT, add this to your dependencies ```Scala -libraryDependencies += "io.opencensus" % "opencensus-impl" % "0.24.0" -libraryDependencies += "io.opencensus" % "opencensus-exporter-trace-stackdriver" % "0.24.0" +libraryDependencies += "io.opencensus" % "opencensus-impl" % "0.26.0" +libraryDependencies += "io.opencensus" % "opencensus-exporter-trace-stackdriver" % "0.26.0" ``` At the start of your application configure the exporter: @@ -236,30 +236,37 @@ Tracing.getTraceConfig().updateActiveTraceParams( Cloud Bigtable client supports [Opencensus Metrics](https://opencensus.io/stats/), which gives insight into the client internals and aids in debugging production issues. -Metrics prefixed with `cloud.google.com/java/bigtable/` focus on operation level -metrics across all of the retry attempts that occurred during that operation. RPC -level metrics can be gleaned from gRPC's metrics, which are prefixed with -`grpc.io/client/`. +All Cloud Bigtable Metrics are prefixed with `cloud.google.com/java/bigtable/`. The +metrics will be tagged with: + * `bigtable_project_id`: the project that contains the target Bigtable instance. + Please note that this id could be different from project that the client is running + in and different from the project where the metrics are exported to. +* `bigtable_instance_id`: the instance id of the target Bigtable instance +* `bigtable_app_profile_id`: the app profile id that is being used to access the target + Bigtable instance ### Available operation level metric views: -* `cloud.google.com/java/bigtable/op_latency`: A distribution latency of +* `cloud.google.com/java/bigtable/op_latency`: A distribution of latency of each client method call, across all of it's RPC attempts. Tagged by - method name and final response status. + operation name and final response status. * `cloud.google.com/java/bigtable/completed_ops`: The total count of - method invocations. Tagged by method name. Can be compared to - `grpc.io/client/completed_rpcs` to visualize retry attempts. + method invocations. Tagged by operation name and final response status. * `cloud.google.com/java/bigtable/read_rows_first_row_latency`: A distribution of the latency of receiving the first row in a ReadRows operation. -* `cloud.google.com/java/bigtable/rows_per_op`: A distribution of rows - read per ReadRows operation across all retry attempts. +* `cloud.google.com/java/bigtable/attempt_latency`: A distribution of latency of + each client RPC, tagged by operation name and the attempt status. Under normal + circumstances, this will be identical to op_latency. However, when the client + receives transient errors, op_latency will be the sum of all attempt_latencies + and the exponential delays -* `cloud.google.com/java/bigtable/mutations_per_batch`: A distribution - of mutations per BulkMutation. +* `cloud.google.com/java/bigtable/attempts_per_op`: A distribution of attempts that + each operation required, tagged by operation name and final operation status. + Under normal circumstances, this will be 1. By default, the functionality is disabled. For example to enable metrics using @@ -273,13 +280,13 @@ If you are using Maven, add this to your pom.xml file io.opencensus opencensus-impl - 0.24.0 + 0.26.0 runtime io.opencensus opencensus-exporter-stats-stackdriver - 0.24.0 + 0.26.0 io.grpc @@ -294,13 +301,13 @@ If you are using Maven, add this to your pom.xml file ``` If you are using Gradle, add this to your dependencies ```Groovy -compile 'io.opencensus:opencensus-impl:0.24.0' -compile 'io.opencensus:opencensus-exporter-stats-stackdriver:0.24.0' +compile 'io.opencensus:opencensus-impl:0.26.0' +compile 'io.opencensus:opencensus-exporter-stats-stackdriver:0.26.0' ``` If you are using SBT, add this to your dependencies ```Scala -libraryDependencies += "io.opencensus" % "opencensus-impl" % "0.24.0" -libraryDependencies += "io.opencensus" % "opencensus-exporter-stats-stackdriver" % "0.24.0" +libraryDependencies += "io.opencensus" % "opencensus-impl" % "0.26.0" +libraryDependencies += "io.opencensus" % "opencensus-exporter-stats-stackdriver" % "0.26.0" ``` At the start of your application configure the exporter and enable the Bigtable stats views: @@ -337,7 +344,7 @@ Add the following to your project's pom.xml. io.grpc grpc-bom - 1.27.0 + 1.28.0 pom import diff --git a/google-cloud-bigtable-deps-bom/pom.xml b/google-cloud-bigtable-deps-bom/pom.xml index 60f332659..4657ead1f 100644 --- a/google-cloud-bigtable-deps-bom/pom.xml +++ b/google-cloud-bigtable-deps-bom/pom.xml @@ -190,6 +190,11 @@ opencensus-api ${opencensus.version} + + io.opencensus + opencensus-impl-core + ${opencensus.version} + io.opencensus opencensus-contrib-grpc-util diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml index 9dec1aa01..ab921a973 100644 --- a/google-cloud-bigtable/clirr-ignored-differences.xml +++ b/google-cloud-bigtable/clirr-ignored-differences.xml @@ -7,4 +7,20 @@ *snapshot* *snapshot* + + 8001 + com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredMutateRowsCallable* + + + 8001 + com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallable* + + + 8001 + com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallable* + + + 8001 + com/google/cloud/bigtable/gaxx/tracing/WrappedTracerFactory* + \ No newline at end of file diff --git a/google-cloud-bigtable/pom.xml b/google-cloud-bigtable/pom.xml index 254d19faf..6988e5965 100644 --- a/google-cloud-bigtable/pom.xml +++ b/google-cloud-bigtable/pom.xml @@ -172,12 +172,6 @@ org.threeten threetenbp - - - - - - @@ -218,6 +212,11 @@ grpc-testing test + + io.opencensus + opencensus-impl + test + junit junit @@ -385,8 +384,12 @@ io.grpc:grpc-auth,io.grpc:grpc-grpclb,com.google.auto.value:auto-value + + io.opencensus:opencensus-impl-core + diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 58c0130e6..8d9d2fc70 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -20,6 +20,8 @@ import com.google.api.gax.batching.Batcher; import com.google.api.gax.batching.BatcherImpl; import com.google.api.gax.core.BackgroundResource; +import com.google.api.gax.core.GaxProperties; +import com.google.api.gax.grpc.GaxGrpcProperties; import com.google.api.gax.grpc.GrpcCallSettings; import com.google.api.gax.grpc.GrpcRawCallableFactory; import com.google.api.gax.retrying.ExponentialRetryAlgorithm; @@ -32,6 +34,7 @@ import com.google.api.gax.rpc.ServerStreamingCallSettings; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.tracing.OpencensusTracerFactory; import com.google.api.gax.tracing.SpanName; import com.google.api.gax.tracing.TracedServerStreamingCallable; import com.google.api.gax.tracing.TracedUnaryCallable; @@ -59,9 +62,9 @@ import com.google.cloud.bigtable.data.v2.models.RowAdapter; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; -import com.google.cloud.bigtable.data.v2.stub.metrics.MeasuredMutateRowsCallable; -import com.google.cloud.bigtable.data.v2.stub.metrics.MeasuredReadRowsCallable; -import com.google.cloud.bigtable.data.v2.stub.metrics.MeasuredUnaryCallable; +import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory; +import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory; +import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants; import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable; @@ -73,10 +76,13 @@ import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable; import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; import io.opencensus.stats.Stats; import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; import io.opencensus.tags.Tagger; import io.opencensus.tags.Tags; import java.io.IOException; @@ -98,16 +104,12 @@ */ @InternalApi public class EnhancedBigtableStub implements AutoCloseable { - private static final String TRACING_OUTER_CLIENT_NAME = "Bigtable"; + private static final String CLIENT_NAME = "Bigtable"; private final EnhancedBigtableStubSettings settings; private final ClientContext clientContext; private final RequestContext requestContext; - // TODO: This should probably move to ClientContext - private final Tagger tagger; - private final StatsRecorder statsRecorder; - private final ServerStreamingCallable readRowsCallable; private final UnaryCallable readRowCallable; private final UnaryCallable> sampleRowKeysCallable; @@ -125,15 +127,58 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) } @InternalApi("Visible for testing") - private EnhancedBigtableStub( + public EnhancedBigtableStub( EnhancedBigtableStubSettings settings, ClientContext clientContext, Tagger tagger, StatsRecorder statsRecorder) { this.settings = settings; - this.clientContext = clientContext; - this.tagger = tagger; - this.statsRecorder = statsRecorder; + + this.clientContext = + clientContext + .toBuilder() + .setTracerFactory( + new CompositeTracerFactory( + ImmutableList.of( + // Add OpenCensus Tracing + new OpencensusTracerFactory( + ImmutableMap.builder() + // Annotate traces with the same tags as metrics + .put( + RpcMeasureConstants.BIGTABLE_PROJECT_ID.getName(), + settings.getProjectId()) + .put( + RpcMeasureConstants.BIGTABLE_INSTANCE_ID.getName(), + settings.getInstanceId()) + .put( + RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID.getName(), + settings.getAppProfileId()) + // Also annotate traces with library versions + .put("gax", GaxGrpcProperties.getGaxGrpcVersion()) + .put("grpc", GaxGrpcProperties.getGrpcVersion()) + .put( + "gapic", + GaxProperties.getLibraryVersion( + EnhancedBigtableStubSettings.class)) + .build()), + // Add OpenCensus Metrics + MetricsTracerFactory.create( + tagger, + statsRecorder, + ImmutableMap.builder() + .put( + RpcMeasureConstants.BIGTABLE_PROJECT_ID, + TagValue.create(settings.getProjectId())) + .put( + RpcMeasureConstants.BIGTABLE_INSTANCE_ID, + TagValue.create(settings.getInstanceId())) + .put( + RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, + TagValue.create(settings.getAppProfileId())) + .build()), + // Add user configured tracer + clientContext.getTracerFactory()))) + .build(); this.requestContext = RequestContext.create( settings.getProjectId(), settings.getInstanceId(), settings.getAppProfileId()); @@ -196,17 +241,9 @@ public ServerStreamingCallable createReadRowsCallable( new TracedServerStreamingCallable<>( readRowsUserCallable, clientContext.getTracerFactory(), - SpanName.of(TRACING_OUTER_CLIENT_NAME, "ReadRows")); - - ServerStreamingCallable measured = - new MeasuredReadRowsCallable<>( - traced, - TRACING_OUTER_CLIENT_NAME + ".ReadRows", - tagger, - statsRecorder, - clientContext.getClock()); + SpanName.of(CLIENT_NAME, "ReadRows")); - return measured.withDefaultCallContext(clientContext.getDefaultCallContext()); + return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -393,19 +430,9 @@ private UnaryCallable createBulkMutateRowsCallable() { UnaryCallable traced = new TracedUnaryCallable<>( - userFacing, - clientContext.getTracerFactory(), - SpanName.of(TRACING_OUTER_CLIENT_NAME, "MutateRows")); - - UnaryCallable measured = - new MeasuredMutateRowsCallable( - traced, - TRACING_OUTER_CLIENT_NAME + ".MutateRows", - tagger, - statsRecorder, - clientContext.getClock()); + userFacing, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, "MutateRows")); - return measured.withDefaultCallContext(clientContext.getDefaultCallContext()); + return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -578,19 +605,9 @@ private UnaryCallable createUserFacin UnaryCallable traced = new TracedUnaryCallable<>( - inner, - clientContext.getTracerFactory(), - SpanName.of(TRACING_OUTER_CLIENT_NAME, methodName)); - - UnaryCallable measured = - new MeasuredUnaryCallable<>( - traced, - TRACING_OUTER_CLIENT_NAME + "." + methodName, - tagger, - statsRecorder, - clientContext.getClock()); + inner, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, methodName)); - return measured.withDefaultCallContext(clientContext.getDefaultCallContext()); + return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } // diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index ce8b4bb2b..139e7c223 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -20,9 +20,7 @@ import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; -import com.google.api.gax.core.GaxProperties; import com.google.api.gax.core.GoogleCredentialsProvider; -import com.google.api.gax.grpc.GaxGrpcProperties; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ServerStreamingCallSettings; @@ -30,7 +28,6 @@ import com.google.api.gax.rpc.StubSettings; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; -import com.google.api.gax.tracing.OpencensusTracerFactory; import com.google.cloud.bigtable.data.v2.internal.RefreshChannel; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.KeyOffset; @@ -42,7 +39,6 @@ import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import java.util.List; import java.util.Set; @@ -522,13 +518,6 @@ private Builder() { setInternalHeaderProvider( BigtableStubSettings.defaultApiClientHeaderProviderBuilder().build()); - setTracerFactory( - new OpencensusTracerFactory( - ImmutableMap.of( - "gax", GaxGrpcProperties.getGaxGrpcVersion(), - "grpc", GaxGrpcProperties.getGrpcVersion(), - "gapic", GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class)))); - // Per-method settings using baseSettings for defaults. readRowsSettings = ServerStreamingCallSettings.newBuilder(); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java new file mode 100644 index 000000000..33bf9c42b --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java @@ -0,0 +1,154 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.gax.tracing.ApiTracer; +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.List; +import org.threeten.bp.Duration; + +/** Combines multiple {@link ApiTracer}s into a single {@link ApiTracer}. */ +class CompositeTracer implements ApiTracer { + private final List children; + + CompositeTracer(List children) { + this.children = ImmutableList.copyOf(children); + } + + @Override + public Scope inScope() { + final List childScopes = new ArrayList<>(children.size()); + + for (ApiTracer child : children) { + childScopes.add(child.inScope()); + } + + return new Scope() { + @Override + public void close() { + for (Scope childScope : childScopes) { + childScope.close(); + } + } + }; + } + + @Override + public void operationSucceeded() { + for (ApiTracer child : children) { + child.operationSucceeded(); + } + } + + @Override + public void operationCancelled() { + for (ApiTracer child : children) { + child.operationCancelled(); + } + } + + @Override + public void operationFailed(Throwable error) { + for (ApiTracer child : children) { + child.operationFailed(error); + } + } + + @Override + public void connectionSelected(String id) { + for (ApiTracer child : children) { + child.connectionSelected(id); + } + } + + @Override + public void attemptStarted(int attemptNumber) { + for (ApiTracer child : children) { + child.attemptStarted(attemptNumber); + } + } + + @Override + public void attemptSucceeded() { + for (ApiTracer child : children) { + child.attemptSucceeded(); + } + } + + @Override + public void attemptCancelled() { + for (ApiTracer child : children) { + child.attemptCancelled(); + } + } + + @Override + public void attemptFailed(Throwable error, Duration delay) { + for (ApiTracer child : children) { + child.attemptFailed(error, delay); + } + } + + @Override + public void attemptFailedRetriesExhausted(Throwable error) { + for (ApiTracer child : children) { + child.attemptFailedRetriesExhausted(error); + } + } + + @Override + public void attemptPermanentFailure(Throwable error) { + for (ApiTracer child : children) { + child.attemptPermanentFailure(error); + } + } + + @Override + public void lroStartFailed(Throwable error) { + for (ApiTracer child : children) { + child.lroStartFailed(error); + } + } + + @Override + public void lroStartSucceeded() { + for (ApiTracer child : children) { + child.lroStartSucceeded(); + } + } + + @Override + public void responseReceived() { + for (ApiTracer child : children) { + child.responseReceived(); + } + } + + @Override + public void requestSent() { + for (ApiTracer child : children) { + child.requestSent(); + } + } + + @Override + public void batchRequestSent(long elementCount, long requestSize) { + for (ApiTracer child : children) { + child.batchRequestSent(elementCount, requestSize); + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/tracing/WrappedTracerFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory.java similarity index 50% rename from google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/tracing/WrappedTracerFactory.java rename to google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory.java index 253d7a207..e2e399ae3 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/tracing/WrappedTracerFactory.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Google LLC + * Copyright 2020 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,36 +13,32 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.google.cloud.bigtable.gaxx.tracing; +package com.google.cloud.bigtable.data.v2.stub.metrics; import com.google.api.core.InternalApi; import com.google.api.gax.tracing.ApiTracer; import com.google.api.gax.tracing.ApiTracerFactory; import com.google.api.gax.tracing.SpanName; +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.List; -/** - * Simple wrapper around {@link ApiTracerFactory} to augment the client name of the generated - * traces. - * - *

This is used to disambiguate traces in underlying GAPIC client from the manually written - * overlay. - * - *

For internal use, public for technical reasons. - */ -@InternalApi -public class WrappedTracerFactory implements ApiTracerFactory { - private final ApiTracerFactory innerFactory; - private final String clientName; +/** Combines multiple {@link ApiTracerFactory} into a single {@link ApiTracerFactory}. */ +@InternalApi("For internal use only") +public class CompositeTracerFactory implements ApiTracerFactory { + private final List apiTracerFactories; - public WrappedTracerFactory(ApiTracerFactory tracerFactory, String clientName) { - this.innerFactory = tracerFactory; - this.clientName = clientName; + public CompositeTracerFactory(List apiTracerFactories) { + this.apiTracerFactories = ImmutableList.copyOf(apiTracerFactories); } @Override public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) { - spanName = SpanName.of(clientName, spanName.getMethodName()); + List children = new ArrayList<>(apiTracerFactories.size()); - return innerFactory.newTracer(parent, spanName, operationType); + for (ApiTracerFactory factory : apiTracerFactories) { + children.add(factory.newTracer(parent, spanName, operationType)); + } + return new CompositeTracer(children); } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredMutateRowsCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredMutateRowsCallable.java deleted file mode 100644 index 11878635f..000000000 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredMutateRowsCallable.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright 2019 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.stub.metrics; - -import com.google.api.core.ApiClock; -import com.google.api.core.ApiFuture; -import com.google.api.core.InternalApi; -import com.google.api.gax.rpc.ApiCallContext; -import com.google.api.gax.rpc.UnaryCallable; -import com.google.cloud.bigtable.data.v2.models.BulkMutation; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.MoreExecutors; -import io.opencensus.stats.StatsRecorder; -import io.opencensus.tags.TagContext; -import io.opencensus.tags.TagValue; -import io.opencensus.tags.Tagger; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; - -/** - * This callable will instrument MutateRows invocations using OpenCensus stats. - * - *

Recorded stats: - * - *

- *
{@link RpcMeasureConstants#BIGTABLE_OP_LATENCY} - *
the total time it took the operation across all of its retry attempts to complete. - *
{@link RpcMeasureConstants#BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH} - *
the number of mutations sent per batch operation. Retry attempts might have few entries. - *
- * - *

For internal use only. - */ -@InternalApi -public class MeasuredMutateRowsCallable extends UnaryCallable { - private final UnaryCallable innerCallable; - private final TagValue methodName; - private final TagContext parentCtx; - private final Tagger tagger; - private final StatsRecorder stats; - private final ApiClock clock; - - @InternalApi - public MeasuredMutateRowsCallable( - @Nonnull UnaryCallable innerCallable, - @Nonnull String methodName, - @Nonnull Tagger tagger, - @Nonnull StatsRecorder stats, - @Nonnull ApiClock clock) { - this.innerCallable = Preconditions.checkNotNull(innerCallable, "innerCallable"); - this.methodName = TagValue.create(Preconditions.checkNotNull(methodName, "methodName")); - this.tagger = Preconditions.checkNotNull(tagger, "tagger"); - this.parentCtx = tagger.getCurrentTagContext(); - this.stats = Preconditions.checkNotNull(stats, "stats"); - this.clock = Preconditions.checkNotNull(clock, "clock"); - } - - @Override - public ApiFuture futureCall(BulkMutation request, ApiCallContext context) { - long operationStartTime = clock.nanoTime(); - - final ApiFuture future = innerCallable.futureCall(request, context); - future.addListener( - new StatsRecordingRunnable(future, operationStartTime, request.getEntryCount()), - MoreExecutors.directExecutor()); - return future; - } - - private class StatsRecordingRunnable implements Runnable { - private final Future operationFuture; - private final long operationStart; - private final long numEntries; - - private StatsRecordingRunnable( - @Nonnull Future operationFuture, long operationStartTime, long numEntries) { - this.operationFuture = Preconditions.checkNotNull(operationFuture, "operationFuture"); - this.operationStart = operationStartTime; - this.numEntries = numEntries; - } - - @Override - public void run() { - long elapsed = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - operationStart); - - stats - .newMeasureMap() - .put(RpcMeasureConstants.BIGTABLE_OP_LATENCY, elapsed) - .put(RpcMeasureConstants.BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH, numEntries) - .record( - tagger - .toBuilder(parentCtx) - .putLocal(RpcMeasureConstants.BIGTABLE_OP, methodName) - .putLocal( - RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(operationFuture)) - .build()); - } - } -} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallable.java deleted file mode 100644 index 0ac3777ea..000000000 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallable.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Copyright 2019 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.stub.metrics; - -import com.google.api.core.ApiClock; -import com.google.api.core.InternalApi; -import com.google.api.gax.rpc.ApiCallContext; -import com.google.api.gax.rpc.ResponseObserver; -import com.google.api.gax.rpc.ServerStreamingCallable; -import com.google.api.gax.rpc.StreamController; -import com.google.cloud.bigtable.data.v2.models.Query; -import com.google.common.base.Preconditions; -import io.opencensus.stats.MeasureMap; -import io.opencensus.stats.StatsRecorder; -import io.opencensus.tags.TagContext; -import io.opencensus.tags.TagValue; -import io.opencensus.tags.Tagger; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -/** - * This callable will instrument ReadRows invocations using OpenCensus stats. - * - *

Recorded stats: - * - *

- *
{@link RpcMeasureConstants#BIGTABLE_OP_LATENCY} - *
the total time it took the operation across all of its retry attempts to complete. - *
{@link RpcMeasureConstants#BIGTABLE_ROWS_READ_PER_OP} - *
the number of rows received across all of the retries for each invocation. - *
{@link RpcMeasureConstants#BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY} - *
the amount of time it took the caller to receive the first row. - *
- * - *

For internal use only. - */ -@InternalApi -public class MeasuredReadRowsCallable extends ServerStreamingCallable { - private final ServerStreamingCallable innerCallable; - - private final TagValue methodName; - private final TagContext parentCtx; - - private final Tagger tagger; - private final StatsRecorder stats; - private final ApiClock clock; - - public MeasuredReadRowsCallable( - @Nonnull ServerStreamingCallable innerCallable, - @Nonnull String methodName, - @Nonnull Tagger tagger, - @Nonnull StatsRecorder stats, - @Nonnull ApiClock clock) { - this.innerCallable = Preconditions.checkNotNull(innerCallable, "innerCallable"); - this.methodName = TagValue.create(Preconditions.checkNotNull(methodName, "methodName")); - this.tagger = Preconditions.checkNotNull(tagger, "tagger"); - this.parentCtx = tagger.getCurrentTagContext(); - this.stats = Preconditions.checkNotNull(stats, "stats"); - this.clock = Preconditions.checkNotNull(clock, "clock"); - } - - @Override - public void call(Query request, ResponseObserver outerObserver, ApiCallContext context) { - innerCallable.call(request, new MeasuredResponseObserver(outerObserver), context); - } - - private class MeasuredResponseObserver implements ResponseObserver { - private final ResponseObserver outerResponseObserver; - - private final long operationStart; - private Long firstRowReceivedAt = null; - private long rowsRead = 0; - - private MeasuredResponseObserver(@Nonnull ResponseObserver outerResponseObserver) { - this.outerResponseObserver = - Preconditions.checkNotNull(outerResponseObserver, "outerResponseObserver"); - this.operationStart = clock.nanoTime(); - } - - @Override - public void onStart(StreamController controller) { - outerResponseObserver.onStart(controller); - } - - @Override - public void onResponse(RowT row) { - if (firstRowReceivedAt == null) { - firstRowReceivedAt = clock.nanoTime(); - } - rowsRead++; - outerResponseObserver.onResponse(row); - } - - @Override - public void onError(Throwable t) { - recordStats(t); - outerResponseObserver.onError(t); - } - - @Override - public void onComplete() { - recordStats(null); - outerResponseObserver.onComplete(); - } - - private void recordStats(@Nullable Throwable error) { - long now = clock.nanoTime(); - long elapsed = TimeUnit.NANOSECONDS.toMillis(now - operationStart); - - MeasureMap measures = - stats - .newMeasureMap() - .put(RpcMeasureConstants.BIGTABLE_OP_LATENCY, elapsed) - .put(RpcMeasureConstants.BIGTABLE_ROWS_READ_PER_OP, rowsRead); - - if (firstRowReceivedAt != null) { - long firstRowLatency = TimeUnit.NANOSECONDS.toMillis(firstRowReceivedAt - operationStart); - measures.put(RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY, firstRowLatency); - } - - measures.record( - tagger - .toBuilder(parentCtx) - .putLocal(RpcMeasureConstants.BIGTABLE_OP, methodName) - .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(error)) - .build()); - } - } -} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallable.java deleted file mode 100644 index aade43fd2..000000000 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallable.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright 2019 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.stub.metrics; - -import com.google.api.core.ApiClock; -import com.google.api.core.ApiFuture; -import com.google.api.core.InternalApi; -import com.google.api.gax.rpc.ApiCallContext; -import com.google.api.gax.rpc.UnaryCallable; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.MoreExecutors; -import io.opencensus.stats.StatsRecorder; -import io.opencensus.tags.TagContext; -import io.opencensus.tags.TagValue; -import io.opencensus.tags.Tagger; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; - -/** - * This callable will instrument callable invocations using OpenCensus stats. - * - *

Recorded stats: - * - *

- *
{@link RpcMeasureConstants#BIGTABLE_OP_LATENCY} - *
the total time it took the operation across all of its retry attempts to complete - *
- * - *

For internal use only. - */ -@InternalApi -public class MeasuredUnaryCallable extends UnaryCallable { - private final UnaryCallable innerCallable; - - private final TagValue methodName; - private final TagContext parentCtx; - - private final Tagger tagger; - private final StatsRecorder stats; - private final ApiClock clock; - - public MeasuredUnaryCallable( - @Nonnull UnaryCallable innerCallable, - @Nonnull String methodName, - @Nonnull Tagger tagger, - @Nonnull StatsRecorder stats, - @Nonnull ApiClock clock) { - this.innerCallable = Preconditions.checkNotNull(innerCallable, "innerCallable"); - this.methodName = TagValue.create(Preconditions.checkNotNull(methodName, "methodName")); - this.tagger = Preconditions.checkNotNull(tagger, "tagger"); - this.parentCtx = tagger.getCurrentTagContext(); - this.stats = Preconditions.checkNotNull(stats, "stats"); - this.clock = Preconditions.checkNotNull(clock, "clock"); - } - - @Override - public ApiFuture futureCall(RequestT request, ApiCallContext context) { - long startTime = clock.nanoTime(); - ApiFuture future = innerCallable.futureCall(request, context); - future.addListener( - new StatsRecordingRunnable(future, startTime), MoreExecutors.directExecutor()); - return future; - } - - private class StatsRecordingRunnable implements Runnable { - private final Future operationFuture; - private final long operationStart; - - private StatsRecordingRunnable(@Nonnull Future operationFuture, long startTime) { - this.operationFuture = Preconditions.checkNotNull(operationFuture, "operationFuture"); - this.operationStart = startTime; - } - - @Override - public void run() { - long elapsed = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - operationStart); - - stats - .newMeasureMap() - .put(RpcMeasureConstants.BIGTABLE_OP_LATENCY, elapsed) - .record( - tagger - .toBuilder(parentCtx) - .putLocal(RpcMeasureConstants.BIGTABLE_OP, methodName) - .putLocal( - RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(operationFuture)) - .build()); - } - } -} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java new file mode 100644 index 000000000..864ba7502 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java @@ -0,0 +1,218 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.ApiTracerFactory.OperationType; +import com.google.api.gax.tracing.SpanName; +import com.google.common.base.Stopwatch; +import io.opencensus.stats.MeasureMap; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; +import org.threeten.bp.Duration; + +class MetricsTracer implements ApiTracer { + private final OperationType operationType; + + private final Tagger tagger; + private final StatsRecorder stats; + + // Tags + private final TagContext parentContext; + private final SpanName spanName; + private final Map statsAttributes; + + // Operation level metrics + private final AtomicBoolean opFinished = new AtomicBoolean(); + private final Stopwatch operationTimer = Stopwatch.createStarted(); + private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted(); + private long operationResponseCount = 0; + + // Attempt level metrics + private int attemptCount = 0; + private Stopwatch attemptTimer; + private long attemptResponseCount = 0; + + MetricsTracer( + OperationType operationType, + Tagger tagger, + StatsRecorder stats, + SpanName spanName, + Map statsAttributes) { + this.operationType = operationType; + this.tagger = tagger; + this.stats = stats; + this.parentContext = tagger.getCurrentTagContext(); + this.spanName = spanName; + this.statsAttributes = statsAttributes; + } + + @Override + public Scope inScope() { + return new Scope() { + @Override + public void close() {} + }; + } + + @Override + public void operationSucceeded() { + recordOperationCompletion(null); + } + + @Override + public void operationCancelled() { + recordOperationCompletion(new CancellationException()); + } + + @Override + public void operationFailed(Throwable throwable) { + recordOperationCompletion(throwable); + } + + private void recordOperationCompletion(@Nullable Throwable throwable) { + if (!opFinished.compareAndSet(false, true)) { + return; + } + operationTimer.stop(); + + long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS); + + MeasureMap measures = + stats + .newMeasureMap() + .put(RpcMeasureConstants.BIGTABLE_OP_LATENCY, elapsed) + .put(RpcMeasureConstants.BIGTABLE_OP_ATTEMPT_COUNT, attemptCount); + + if (operationType == OperationType.ServerStreaming + && spanName.getMethodName().equals("ReadRows")) { + measures.put( + RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY, + firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS)); + } + + TagContextBuilder tagCtx = + newTagCtxBuilder() + .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable)); + + measures.record(tagCtx.build()); + } + + @Override + public void connectionSelected(String s) { + // noop: cardinality for connection ids is too high to use as tags + } + + @Override + public void attemptStarted(int i) { + attemptCount++; + attemptTimer = Stopwatch.createStarted(); + attemptResponseCount = 0; + } + + @Override + public void attemptSucceeded() { + recordAttemptCompletion(null); + } + + @Override + public void attemptCancelled() { + recordAttemptCompletion(new CancellationException()); + } + + @Override + public void attemptFailed(Throwable throwable, Duration duration) { + recordAttemptCompletion(throwable); + } + + @Override + public void attemptFailedRetriesExhausted(Throwable throwable) { + recordAttemptCompletion(throwable); + } + + @Override + public void attemptPermanentFailure(Throwable throwable) { + recordAttemptCompletion(throwable); + } + + private void recordAttemptCompletion(@Nullable Throwable throwable) { + MeasureMap measures = + stats + .newMeasureMap() + .put( + RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY, + attemptTimer.elapsed(TimeUnit.MILLISECONDS)); + + TagContextBuilder tagCtx = + newTagCtxBuilder() + .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable)); + + measures.record(tagCtx.build()); + } + + @Override + public void lroStartFailed(Throwable throwable) { + // noop + } + + @Override + public void lroStartSucceeded() { + // noop + } + + @Override + public void responseReceived() { + if (firstResponsePerOpTimer.isRunning()) { + firstResponsePerOpTimer.stop(); + } + attemptResponseCount++; + operationResponseCount++; + } + + @Override + public void requestSent() { + // noop: no operations are client streaming + } + + @Override + public void batchRequestSent(long elementCount, long requestSize) { + // noop + } + + private TagContextBuilder newTagCtxBuilder() { + TagContextBuilder tagCtx = + tagger + .toBuilder(parentContext) + .putLocal(RpcMeasureConstants.BIGTABLE_OP, TagValue.create(spanName.toString())); + + // Copy client level tags in + for (Entry entry : statsAttributes.entrySet()) { + tagCtx.putLocal(entry.getKey(), entry.getValue()); + } + + return tagCtx; + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerFactory.java new file mode 100644 index 000000000..24b22d353 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerFactory.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.core.InternalApi; +import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.ApiTracerFactory; +import com.google.api.gax.tracing.SpanName; +import com.google.common.collect.ImmutableMap; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; + +/** + * {@link ApiTracerFactory} that will generate OpenCensus metrics by using the {@link ApiTracer} + * api. + */ +@InternalApi("For internal use only") +public class MetricsTracerFactory implements ApiTracerFactory { + private final Tagger tagger; + private final StatsRecorder stats; + private final ImmutableMap statsAttributes; + + public static MetricsTracerFactory create( + Tagger tagger, StatsRecorder stats, ImmutableMap statsAttributes) { + return new MetricsTracerFactory(tagger, stats, statsAttributes); + } + + private MetricsTracerFactory( + Tagger tagger, StatsRecorder stats, ImmutableMap statsAttributes) { + this.tagger = tagger; + this.stats = stats; + this.statsAttributes = statsAttributes; + } + + @Override + public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) { + return new MetricsTracer(operationType, tagger, stats, spanName, statsAttributes); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java index f5830d05d..8c6e347a0 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java @@ -15,53 +15,63 @@ */ package com.google.cloud.bigtable.data.v2.stub.metrics; -import io.opencensus.stats.Measure; -import io.opencensus.stats.Measure.MeasureDouble; +import com.google.api.core.InternalApi; import io.opencensus.stats.Measure.MeasureLong; import io.opencensus.tags.TagKey; -class RpcMeasureConstants { - /** - * Tag key that represents a Bigtable operation name. - * - *

A Bigtable operation consists of 1 or more RPCs. By comparing metrics tagged with {@link - * io.opencensus.contrib.grpc.metrics.RpcMeasureConstants#GRPC_CLIENT_METHOD} to methods tagged - * with {@link RpcMeasureConstants#BIGTABLE_OP}, the end user can get a sense how many attempts an - * operation took. - */ - public static final TagKey BIGTABLE_OP = TagKey.create("bigtable_op"); +@InternalApi("For internal use only") +public class RpcMeasureConstants { + // TagKeys + public static final TagKey BIGTABLE_PROJECT_ID = TagKey.create("bigtable_project_id"); + public static final TagKey BIGTABLE_INSTANCE_ID = TagKey.create("bigtable_instance_id"); + public static final TagKey BIGTABLE_APP_PROFILE_ID = TagKey.create("bigtable_app_profile_id"); + + /** Tag key that represents a Bigtable operation name. */ + static final TagKey BIGTABLE_OP = TagKey.create("bigtable_op"); /** Tag key that represents the final status of the Bigtable operation. */ - public static final TagKey BIGTABLE_STATUS = TagKey.create("bigtable_status"); + static final TagKey BIGTABLE_STATUS = TagKey.create("bigtable_status"); + // Units /** Unit to represent counts. */ private static final String COUNT = "1"; /** Unit to represent milliseconds. */ private static final String MILLISECOND = "ms"; - static final MeasureDouble BIGTABLE_OP_LATENCY = - Measure.MeasureDouble.create( + // Measurements + /** + * Latency for a logic operation, which will include latencies for each attempt and exponential + * backoff delays. + */ + static final MeasureLong BIGTABLE_OP_LATENCY = + MeasureLong.create( "cloud.google.com/java/bigtable/op_latency", "Time between request being sent to last row received, " + "or terminal error of the last retry attempt.", MILLISECOND); - static final MeasureDouble BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY = - MeasureDouble.create( + /** + * Number of attempts a logical operation took to complete. Under normal circumstances should be + * 1. + */ + static final MeasureLong BIGTABLE_OP_ATTEMPT_COUNT = + MeasureLong.MeasureLong.create( + "cloud.google.com/java/bigtable/op_attempt_count", + "Number of attempts per operation", + COUNT); + + /** Latency that a single attempt (RPC) took to complete. */ + static final MeasureLong BIGTABLE_ATTEMPT_LATENCY = + MeasureLong.create( + "cloud.google.com/java/bigtable/attempt_latency", + "Duration of an individual operation attempt", + MILLISECOND); + + /** Latency for the caller to see the first row in a ReadRows stream. */ + static final MeasureLong BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY = + MeasureLong.create( "cloud.google.com/java/bigtable/read_rows_first_row_latency", "Time between request being sent to the first row received", MILLISECOND); - - static final MeasureLong BIGTABLE_ROWS_READ_PER_OP = - Measure.MeasureLong.create( - "cloud.google.com/java/bigtable/rows_read_per_op", - "Number of rows received per ReadRows operation", - COUNT); - - static final MeasureLong BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH = - Measure.MeasureLong.create( - "cloud.google.com/java/bigtable/mutations_per_batch", - "Number of mutations per MutateRows request", - COUNT); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java index 9ee673900..d21060c4a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java @@ -15,11 +15,14 @@ */ package com.google.cloud.bigtable.data.v2.stub.metrics; -import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_INSTANCE_ID; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP_ATTEMPT_COUNT; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP_LATENCY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_PROJECT_ID; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY; -import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ROWS_READ_PER_OP; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_STATUS; import com.google.common.collect.ImmutableList; @@ -28,7 +31,6 @@ import io.opencensus.stats.Aggregation.Distribution; import io.opencensus.stats.BucketBoundaries; import io.opencensus.stats.View; -import io.opencensus.tags.TagKey; import java.util.Arrays; class RpcViewConstants { @@ -44,6 +46,13 @@ class RpcViewConstants { 250.0, 300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0))); + private static final Aggregation AGGREGATION_ATTEMPT_COUNT = + Distribution.create( + BucketBoundaries.create( + ImmutableList.of( + 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 15.0, 20.0, 30.0, 40.0, 50.0, + 100.0))); + private static final Aggregation AGGREGATION_WITH_POWERS_OF_2 = Distribution.create( BucketBoundaries.create( @@ -56,21 +65,31 @@ class RpcViewConstants { * {@link View} for Bigtable client roundtrip latency in milliseconds including all retry * attempts. */ - public static final View BIGTABLE_OP_LATENCY_VIEW = + static final View BIGTABLE_OP_LATENCY_VIEW = View.create( View.Name.create("cloud.google.com/java/bigtable/op_latency"), - "Latency in msecs", + "Operation latency in msecs", BIGTABLE_OP_LATENCY, AGGREGATION_WITH_MILLIS_HISTOGRAM, - ImmutableList.of(BIGTABLE_OP)); + ImmutableList.of( + BIGTABLE_PROJECT_ID, + BIGTABLE_INSTANCE_ID, + BIGTABLE_APP_PROFILE_ID, + BIGTABLE_OP, + BIGTABLE_STATUS)); - static final View BIGTABLE_CLIENT_COMPLETED_OP_VIEW = + static final View BIGTABLE_COMPLETED_OP_VIEW = View.create( View.Name.create("cloud.google.com/java/bigtable/completed_ops"), "Number of completed Bigtable client operations", BIGTABLE_OP_LATENCY, COUNT, - Arrays.asList(BIGTABLE_OP, BIGTABLE_STATUS)); + Arrays.asList( + BIGTABLE_PROJECT_ID, + BIGTABLE_INSTANCE_ID, + BIGTABLE_APP_PROFILE_ID, + BIGTABLE_OP, + BIGTABLE_STATUS)); static final View BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW = View.create( @@ -78,21 +97,31 @@ class RpcViewConstants { "Latency to receive the first row in a ReadRows stream", BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY, AGGREGATION_WITH_MILLIS_HISTOGRAM, - ImmutableList.of()); + ImmutableList.of(BIGTABLE_PROJECT_ID, BIGTABLE_INSTANCE_ID, BIGTABLE_APP_PROFILE_ID)); - static final View BIGTABLE_ROWS_READ_PER_OP_VIEW = + static final View BIGTABLE_ATTEMPT_LATENCY_VIEW = View.create( - View.Name.create("cloud.google.com/java/bigtable/rows_per_op"), - "Rows scanned per operation", - BIGTABLE_ROWS_READ_PER_OP, - AGGREGATION_WITH_POWERS_OF_2, - ImmutableList.of()); + View.Name.create("cloud.google.com/java/bigtable/attempt_latency"), + "Attempt latency in msecs", + BIGTABLE_ATTEMPT_LATENCY, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + ImmutableList.of( + BIGTABLE_PROJECT_ID, + BIGTABLE_INSTANCE_ID, + BIGTABLE_APP_PROFILE_ID, + BIGTABLE_OP, + BIGTABLE_STATUS)); - static final View BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH_VIEW = + static final View BIGTABLE_ATTEMPTS_PER_OP_VIEW = View.create( - View.Name.create("cloud.google.com/java/bigtable/mutations_per_batch"), - "Number of mutations sent in a single MutateRowsRequest", - BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH, - AGGREGATION_WITH_POWERS_OF_2, - ImmutableList.of()); + View.Name.create("cloud.google.com/java/bigtable/attempts_per_op"), + "Distribution of attempts per logical operation", + BIGTABLE_OP_ATTEMPT_COUNT, + AGGREGATION_ATTEMPT_COUNT, + ImmutableList.of( + BIGTABLE_PROJECT_ID, + BIGTABLE_INSTANCE_ID, + BIGTABLE_APP_PROFILE_ID, + BIGTABLE_OP, + BIGTABLE_STATUS)); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java index a8e772e3b..cc3153949 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java @@ -25,21 +25,15 @@ @BetaApi public class RpcViews { @VisibleForTesting - static final ImmutableSet BIGTABLE_CLIENT_VIEWS_SET = + private static final ImmutableSet BIGTABLE_CLIENT_VIEWS_SET = ImmutableSet.of( RpcViewConstants.BIGTABLE_OP_LATENCY_VIEW, - RpcViewConstants.BIGTABLE_CLIENT_COMPLETED_OP_VIEW, + RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW, RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW, - RpcViewConstants.BIGTABLE_ROWS_READ_PER_OP_VIEW, - RpcViewConstants.BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH_VIEW); + RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW, + RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW); - /** - * Registers all Bigtable specific views. - * - *

It is recommended to call this method and {@link - * io.opencensus.contrib.grpc.metrics.RpcViews#registerClientGrpcViews()} before doing any RPC - * call to avoid missing stats. - */ + /** Registers all Bigtable specific views. */ public static void registerBigtableClientViews() { registerBigtableClientViews(Stats.getViewManager()); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java new file mode 100644 index 000000000..cedb227ba --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java @@ -0,0 +1,174 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.ApiTracer.Scope; +import com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class CompositeTracerTest { + @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Mock private ApiTracer child1; + @Mock private ApiTracer child2; + + private CompositeTracer compositeTracer; + + @Before + public void setup() { + compositeTracer = new CompositeTracer(ImmutableList.of(child1, child2)); + } + + @Test + public void testInScope() { + Scope scope1 = mock(Scope.class); + when(child1.inScope()).thenReturn(scope1); + + Scope scope2 = mock(Scope.class); + when(child2.inScope()).thenReturn(scope2); + + Scope parentScope = compositeTracer.inScope(); + + parentScope.close(); + verify(scope1, times(1)).close(); + } + + @Test + public void testOperationSucceeded() { + compositeTracer.operationSucceeded(); + verify(child1, times(1)).operationSucceeded(); + verify(child2, times(1)).operationSucceeded(); + } + + @Test + public void testOperationCancelled() { + compositeTracer.operationCancelled(); + verify(child1, times(1)).operationCancelled(); + verify(child2, times(1)).operationCancelled(); + } + + @Test + public void testOperationFailed() { + RuntimeException error = new RuntimeException(); + compositeTracer.operationFailed(error); + verify(child1, times(1)).operationFailed(error); + verify(child2, times(1)).operationFailed(error); + } + + @Test + public void testConnectionSelected() { + compositeTracer.connectionSelected("connection-one"); + verify(child1, times(1)).connectionSelected("connection-one"); + verify(child2, times(1)).connectionSelected("connection-one"); + } + + @Test + public void testAttemptStarted() { + compositeTracer.attemptStarted(3); + verify(child1, times(1)).attemptStarted(3); + verify(child2, times(1)).attemptStarted(3); + } + + @Test + public void testAttemptSucceeded() { + compositeTracer.attemptSucceeded(); + verify(child1, times(1)).attemptSucceeded(); + verify(child2, times(1)).attemptSucceeded(); + } + + @Test + public void testAttemptCancelled() { + compositeTracer.attemptCancelled(); + verify(child1, times(1)).attemptCancelled(); + verify(child2, times(1)).attemptCancelled(); + } + + @Test + public void testAttemptFailed() { + RuntimeException error = new RuntimeException(); + Duration delay = Duration.ofMillis(10); + compositeTracer.attemptFailed(error, delay); + verify(child1, times(1)).attemptFailed(error, delay); + verify(child2, times(1)).attemptFailed(error, delay); + } + + @Test + public void testAttemptFailedRetriesExhausted() { + RuntimeException error = new RuntimeException(); + compositeTracer.attemptFailedRetriesExhausted(error); + verify(child1, times(1)).attemptFailedRetriesExhausted(error); + verify(child2, times(1)).attemptFailedRetriesExhausted(error); + } + + @Test + public void testAttemptPermanentFailure() { + RuntimeException error = new RuntimeException(); + compositeTracer.attemptPermanentFailure(error); + verify(child1, times(1)).attemptPermanentFailure(error); + verify(child2, times(1)).attemptPermanentFailure(error); + } + + @Test + public void testLroStartFailed() { + RuntimeException error = new RuntimeException(); + compositeTracer.lroStartFailed(error); + verify(child1, times(1)).lroStartFailed(error); + verify(child2, times(1)).lroStartFailed(error); + } + + @Test + public void testLroStartSucceeded() { + compositeTracer.lroStartSucceeded(); + verify(child1, times(1)).lroStartSucceeded(); + verify(child2, times(1)).lroStartSucceeded(); + } + + @Test + public void testResponseReceived() { + compositeTracer.responseReceived(); + verify(child1, times(1)).responseReceived(); + verify(child2, times(1)).responseReceived(); + } + + @Test + public void testRequestSent() { + compositeTracer.requestSent(); + verify(child1, times(1)).requestSent(); + verify(child2, times(1)).requestSent(); + } + + @Test + public void testBatchRequestSent() { + compositeTracer.batchRequestSent(2, 20); + verify(child1, times(1)).batchRequestSent(2, 20); + verify(child2, times(1)).batchRequestSent(2, 20); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasureMutateRowsCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasureMutateRowsCallableTest.java deleted file mode 100644 index 2b47282da..000000000 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasureMutateRowsCallableTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright 2019 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.stub.metrics; - -import static com.google.common.truth.Truth.assertThat; - -import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutures; -import com.google.api.gax.core.FakeApiClock; -import com.google.api.gax.grpc.GrpcStatusCode; -import com.google.api.gax.rpc.ApiCallContext; -import com.google.api.gax.rpc.DeadlineExceededException; -import com.google.api.gax.rpc.UnaryCallable; -import com.google.cloud.bigtable.data.v2.models.BulkMutation; -import com.google.cloud.bigtable.data.v2.models.Mutation; -import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.FakeStatsRecorder; -import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.FakeTagger; -import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.MetricsRecord; -import io.grpc.Status.Code; -import io.opencensus.tags.TagValue; -import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; -import org.mockito.stubbing.Answer; - -@RunWith(JUnit4.class) -public class MeasureMutateRowsCallableTest { - private static final String METHOD_NAME = "Bigtable.MutateRows"; - @Rule public final MockitoRule rule = MockitoJUnit.rule(); - - private FakeTagger tagger; - - private FakeStatsRecorder statsRecorder; - - private FakeApiClock clock; - - @Mock private UnaryCallable innerCallable; - - private MeasuredMutateRowsCallable callable; - - @Before - public void setUp() { - tagger = new FakeTagger(); - statsRecorder = new FakeStatsRecorder(); - clock = new FakeApiClock(0); - - callable = - new MeasuredMutateRowsCallable(innerCallable, METHOD_NAME, tagger, statsRecorder, clock); - } - - @Test - public void testOk() { - Mockito.when( - innerCallable.futureCall( - Mockito.any(BulkMutation.class), Mockito.any(ApiCallContext.class))) - .thenAnswer( - new Answer>() { - @Override - public ApiFuture answer(InvocationOnMock invocationOnMock) { - clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(3)); - return ApiFutures.immediateFuture(null); - } - }); - - callable.call( - BulkMutation.create("tableID") - .add("rowKey", Mutation.create()) - .add("rowKey2", Mutation.create())); - - MetricsRecord metricsRecord = statsRecorder.pollRecord(); - - assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 3.0); - assertThat(metricsRecord.metrics) - .containsEntry(RpcMeasureConstants.BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH, 2L); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create(METHOD_NAME)); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")); - } - - @Test - public void testFailure() { - Mockito.when( - innerCallable.futureCall( - Mockito.any(BulkMutation.class), Mockito.any(ApiCallContext.class))) - .thenAnswer( - new Answer>() { - @Override - public ApiFuture answer(InvocationOnMock invocationOnMock) { - clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(3)); - return ApiFutures.immediateFailedFuture( - new DeadlineExceededException( - "timeout!", null, GrpcStatusCode.of(Code.DEADLINE_EXCEEDED), true)); - } - }); - - Throwable actualError = null; - - try { - callable.call( - BulkMutation.create("tableID") - .add("rowKey", Mutation.create()) - .add("rowKey2", Mutation.create())); - } catch (Throwable e) { - actualError = e; - } - - assertThat(actualError).isInstanceOf(DeadlineExceededException.class); - - MetricsRecord metricsRecord = statsRecorder.pollRecord(); - - assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 3.0); - assertThat(metricsRecord.metrics) - .containsEntry(RpcMeasureConstants.BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH, 2L); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create(METHOD_NAME)); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("DEADLINE_EXCEEDED")); - } -} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallableTest.java deleted file mode 100644 index 2c0863895..000000000 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallableTest.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Copyright 2019 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.stub.metrics; - -import static com.google.common.truth.Truth.assertThat; - -import com.google.api.gax.core.FakeApiClock; -import com.google.api.gax.grpc.GrpcStatusCode; -import com.google.api.gax.rpc.DeadlineExceededException; -import com.google.cloud.bigtable.data.v2.models.Query; -import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.FakeStatsRecorder; -import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.FakeTagger; -import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.MetricsRecord; -import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCall; -import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCallable; -import io.grpc.Status.Code; -import io.opencensus.tags.TagValue; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; - -@RunWith(JUnit4.class) -public class MeasuredReadRowsCallableTest { - @Rule public final MockitoRule rule = MockitoJUnit.rule(); - - private FakeTagger tagger; - - private FakeStatsRecorder statsRecorder; - - private FakeApiClock clock; - - private MockServerStreamingCallable innerCallable; - - private MeasuredReadRowsCallable callable; - - @Before - public void setUp() { - innerCallable = new MockServerStreamingCallable<>(); - - tagger = new FakeTagger(); - statsRecorder = new FakeStatsRecorder(); - clock = new FakeApiClock(0); - - callable = - new MeasuredReadRowsCallable<>( - innerCallable, "Bigtable.ReadRows", tagger, statsRecorder, clock); - } - - @Test - public void testOk() { - new Thread() { - @Override - public void run() { - MockServerStreamingCall lastCall = null; - - for (int i = 0; i < 10 && lastCall == null; i++) { - lastCall = innerCallable.popLastCall(); - } - clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(2)); - lastCall.getController().popLastPull(); - lastCall.getController().getObserver().onResponse("row0"); - - clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(3)); - lastCall.getController().popLastPull(); - lastCall.getController().getObserver().onResponse("row1"); - lastCall.getController().getObserver().onComplete(); - } - }.start(); - - List results = callable.all().call(Query.create("fake-table")); - - assertThat(results).containsExactly("row0", "row1"); - - MetricsRecord metricsRecord = statsRecorder.pollRecord(); - - assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 5.0); - assertThat(metricsRecord.metrics) - .containsEntry(RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY, 2.0); - assertThat(metricsRecord.metrics) - .containsEntry(RpcMeasureConstants.BIGTABLE_ROWS_READ_PER_OP, 2L); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")); - } - - @Test - public void testEmpty() { - new Thread() { - @Override - public void run() { - MockServerStreamingCall lastCall = null; - - for (int i = 0; i < 10 && lastCall == null; i++) { - lastCall = innerCallable.popLastCall(); - } - clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(2)); - lastCall.getController().getObserver().onComplete(); - } - }.start(); - - List results = callable.all().call(Query.create("fake-table")); - - assertThat(results).isEmpty(); - - MetricsRecord metricsRecord = statsRecorder.pollRecord(); - - assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 2.0); - assertThat(metricsRecord.metrics) - .doesNotContainKey(RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY); - assertThat(metricsRecord.metrics) - .containsEntry(RpcMeasureConstants.BIGTABLE_ROWS_READ_PER_OP, 0L); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")); - } - - @Test - public void testFailure() { - new Thread() { - @Override - public void run() { - MockServerStreamingCall lastCall = null; - - for (int i = 0; i < 10 && lastCall == null; i++) { - lastCall = innerCallable.popLastCall(); - } - clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(2)); - lastCall - .getController() - .getObserver() - .onError( - new DeadlineExceededException( - "timeout!", null, GrpcStatusCode.of(Code.DEADLINE_EXCEEDED), true)); - } - }.start(); - - Throwable actualError = null; - try { - callable.all().call(Query.create("fake-table")); - } catch (Throwable e) { - actualError = e; - } - - assertThat(actualError).isInstanceOf(DeadlineExceededException.class); - - MetricsRecord metricsRecord = statsRecorder.pollRecord(); - - assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 2.0); - assertThat(metricsRecord.metrics) - .doesNotContainKey(RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY); - assertThat(metricsRecord.metrics) - .containsEntry(RpcMeasureConstants.BIGTABLE_ROWS_READ_PER_OP, 0L); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("DEADLINE_EXCEEDED")); - } - - @Test - public void testFailureAfterData() { - new Thread() { - @Override - public void run() { - MockServerStreamingCall lastCall = null; - - for (int i = 0; i < 10 && lastCall == null; i++) { - lastCall = innerCallable.popLastCall(); - } - clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(2)); - lastCall.getController().popLastPull(); - lastCall.getController().getObserver().onResponse("row0"); - - clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(3)); - lastCall - .getController() - .getObserver() - .onError( - new DeadlineExceededException( - "timeout!", null, GrpcStatusCode.of(Code.DEADLINE_EXCEEDED), true)); - } - }.start(); - - Throwable actualError = null; - try { - callable.all().call(Query.create("fake-table")); - } catch (Throwable e) { - actualError = e; - } - - assertThat(actualError).isInstanceOf(DeadlineExceededException.class); - - MetricsRecord metricsRecord = statsRecorder.pollRecord(); - - assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 5.0); - assertThat(metricsRecord.metrics) - .containsEntry(RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY, 2.0); - assertThat(metricsRecord.metrics) - .containsEntry(RpcMeasureConstants.BIGTABLE_ROWS_READ_PER_OP, 1L); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("DEADLINE_EXCEEDED")); - } -} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallableTest.java deleted file mode 100644 index 41006148f..000000000 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallableTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright 2019 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.stub.metrics; - -import static com.google.common.truth.Truth.assertThat; - -import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutures; -import com.google.api.gax.core.FakeApiClock; -import com.google.api.gax.grpc.GrpcStatusCode; -import com.google.api.gax.rpc.ApiCallContext; -import com.google.api.gax.rpc.DeadlineExceededException; -import com.google.api.gax.rpc.UnaryCallable; -import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.FakeStatsRecorder; -import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.FakeTagger; -import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.MetricsRecord; -import io.grpc.Status.Code; -import io.opencensus.tags.TagValue; -import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; -import org.mockito.stubbing.Answer; - -@RunWith(JUnit4.class) -public class MeasuredUnaryCallableTest { - private static final String FAKE_METHOD = "Bigtable.FakeMethod"; - - @Rule public final MockitoRule rule = MockitoJUnit.rule(); - - private FakeTagger tagger; - - private FakeStatsRecorder statsRecorder; - - private FakeApiClock clock; - - @Mock private UnaryCallable innerCallable; - - private MeasuredUnaryCallable callable; - - @Before - public void setUp() { - tagger = new FakeTagger(); - statsRecorder = new FakeStatsRecorder(); - clock = new FakeApiClock(0); - - callable = - new MeasuredUnaryCallable<>(innerCallable, FAKE_METHOD, tagger, statsRecorder, clock); - } - - @Test - public void testOk() { - Mockito.when(innerCallable.futureCall(Mockito.anyString(), Mockito.any(ApiCallContext.class))) - .thenAnswer( - new Answer>() { - @Override - public ApiFuture answer(InvocationOnMock invocationOnMock) { - clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(2)); - return ApiFutures.immediateFuture("response"); - } - }); - - String response = callable.call("request"); - - assertThat(response).isEqualTo("response"); - - MetricsRecord metricsRecord = statsRecorder.pollRecord(); - - assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 2.0); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create(FAKE_METHOD)); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")); - } - - @Test - public void testFailure() { - Mockito.when(innerCallable.futureCall(Mockito.anyString(), Mockito.any(ApiCallContext.class))) - .thenAnswer( - new Answer>() { - @Override - public ApiFuture answer(InvocationOnMock invocationOnMock) { - clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(2)); - return ApiFutures.immediateFailedFuture( - new DeadlineExceededException( - "timeout!", null, GrpcStatusCode.of(Code.DEADLINE_EXCEEDED), true)); - } - }); - - Throwable actualError = null; - try { - callable.call("request"); - } catch (Throwable e) { - actualError = e; - } - - assertThat(actualError).isInstanceOf(DeadlineExceededException.class); - - MetricsRecord metricsRecord = statsRecorder.pollRecord(); - - assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 2.0); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create(FAKE_METHOD)); - assertThat(metricsRecord.tags) - .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("DEADLINE_EXCEEDED")); - } -} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java new file mode 100644 index 000000000..9314391af --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java @@ -0,0 +1,401 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; + +import com.google.api.gax.rpc.ClientContext; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.ReadRowsResponse.CellChunk; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Range; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.StringValue; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import io.opencensus.common.Function; +import io.opencensus.impl.stats.StatsComponentImpl; +import io.opencensus.stats.AggregationData; +import io.opencensus.stats.AggregationData.CountData; +import io.opencensus.stats.AggregationData.DistributionData; +import io.opencensus.stats.AggregationData.LastValueDataDouble; +import io.opencensus.stats.AggregationData.LastValueDataLong; +import io.opencensus.stats.AggregationData.SumDataDouble; +import io.opencensus.stats.AggregationData.SumDataLong; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tags; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.mockito.stubbing.Answer; + +@RunWith(JUnit4.class) +public class MetricsTracerTest { + private static final String PROJECT_ID = "fake-project"; + private static final String INSTANCE_ID = "fake-instance"; + private static final String APP_PROFILE_ID = "default"; + private static final String TABLE_ID = "fake-table"; + + private static final ReadRowsResponse DEFAULT_READ_ROWS_RESPONSES = + ReadRowsResponse.newBuilder() + .addChunks( + CellChunk.newBuilder() + .setRowKey(ByteString.copyFromUtf8("fake-key")) + .setFamilyName(StringValue.of("cf")) + .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q"))) + .setTimestampMicros(1_000) + .setValue(ByteString.copyFromUtf8("value")) + .setCommitRow(true)) + .build(); + + @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + + private Server server; + + @Mock(answer = Answers.CALLS_REAL_METHODS) + private BigtableGrpc.BigtableImplBase mockService; + + private StatsComponentImpl localStats = new StatsComponentImpl(); + private EnhancedBigtableStub stub; + + @Before + public void setUp() throws Exception { + int port; + try (ServerSocket ss = new ServerSocket(0)) { + port = ss.getLocalPort(); + } + server = ServerBuilder.forPort(port).addService(mockService).build(); + server.start(); + + RpcViews.registerBigtableClientViews(localStats.getViewManager()); + + BigtableDataSettings settings = + BigtableDataSettings.newBuilderForEmulator(port) + .setProjectId(PROJECT_ID) + .setInstanceId(INSTANCE_ID) + .setAppProfileId(APP_PROFILE_ID) + .build(); + EnhancedBigtableStubSettings stubSettings = settings.getStubSettings(); + + stub = + new EnhancedBigtableStub( + stubSettings, + ClientContext.create(stubSettings), + Tags.getTagger(), + localStats.getStatsRecorder()); + } + + @After + public void tearDown() { + stub.close(); + server.shutdown(); + } + + @Test + public void testReadRowsLatency() throws InterruptedException { + final long sleepTime = 50; + + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + Thread.sleep(sleepTime); + observer.onNext(DEFAULT_READ_ROWS_RESPONSES); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .readRows(any(ReadRowsRequest.class), anyObserver(ReadRowsResponse.class)); + + Stopwatch stopwatch = Stopwatch.createStarted(); + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID))); + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + // Give OpenCensus a chance to update the views asynchronously. + Thread.sleep(100); + + long opLatency = + getAggregationValueAsLong( + RpcViewConstants.BIGTABLE_OP_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK"))); + assertThat(opLatency).isIn(Range.closed(sleepTime, elapsed)); + } + + @Test + public void testReadRowsOpCount() throws InterruptedException { + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(DEFAULT_READ_ROWS_RESPONSES); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .readRows(any(ReadRowsRequest.class), anyObserver(ReadRowsResponse.class)); + + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID))); + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID))); + + // Give OpenCensus a chance to update the views asynchronously. + Thread.sleep(100); + + long opLatency = + getAggregationValueAsLong( + RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK"))); + assertThat(opLatency).isEqualTo(2); + } + + @Test + public void testReadRowsFirstRow() throws InterruptedException { + final long beforeSleep = 50; + final long afterSleep = 50; + + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + Thread.sleep(beforeSleep); + observer.onNext(DEFAULT_READ_ROWS_RESPONSES); + Thread.sleep(afterSleep); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .readRows(any(ReadRowsRequest.class), anyObserver(ReadRowsResponse.class)); + + Stopwatch stopwatch = Stopwatch.createStarted(); + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID))); + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + // Give OpenCensus a chance to update the views asynchronously. + Thread.sleep(100); + + long firstRowLatency = + getAggregationValueAsLong( + RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW, + ImmutableMap.of()); + assertThat(firstRowLatency).isIn(Range.closed(beforeSleep, elapsed - afterSleep)); + } + + @Test + public void testReadRowsAttemptsPerOp() throws InterruptedException { + final AtomicInteger callCount = new AtomicInteger(0); + + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + + // First call will trigger a transient error + if (callCount.getAndIncrement() == 0) { + observer.onError(new StatusRuntimeException(Status.UNAVAILABLE)); + return null; + } + + // Next attempt will return a row + observer.onNext(DEFAULT_READ_ROWS_RESPONSES); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .readRows(any(ReadRowsRequest.class), anyObserver(ReadRowsResponse.class)); + + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID))); + + // Give OpenCensus a chance to update the views asynchronously. + Thread.sleep(100); + + long opLatency = + getAggregationValueAsLong( + RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK"))); + assertThat(opLatency).isEqualTo(2); + } + + @Test + public void testReadRowsAttemptLatency() throws InterruptedException { + final long sleepTime = 50; + final AtomicInteger callCount = new AtomicInteger(0); + + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + @SuppressWarnings("unchecked") + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + + Thread.sleep(sleepTime); + + // First attempt will return a transient error + if (callCount.getAndIncrement() == 0) { + observer.onError(new StatusRuntimeException(Status.UNAVAILABLE)); + return null; + } + // Next attempt will be ok + observer.onNext(DEFAULT_READ_ROWS_RESPONSES); + observer.onCompleted(); + return null; + } + }) + .when(mockService) + .readRows(any(ReadRowsRequest.class), anyObserver(ReadRowsResponse.class)); + + Stopwatch stopwatch = Stopwatch.createStarted(); + Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID))); + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + // Give OpenCensus a chance to update the views asynchronously. + Thread.sleep(100); + + long attemptLatency = + getAggregationValueAsLong( + RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"), + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK"))); + // Average attempt latency will be just a single wait (as opposed to op latency which will be 2x + // sleeptime) + assertThat(attemptLatency).isIn(Range.closed(sleepTime, elapsed - sleepTime)); + } + + @SuppressWarnings("unchecked") + private static StreamObserver anyObserver(Class returnType) { + return (StreamObserver) any(returnType); + } + + private long getAggregationValueAsLong(View view, ImmutableMap tags) { + ViewData viewData = localStats.getViewManager().getView(view.getName()); + Map, AggregationData> aggregationMap = + Objects.requireNonNull(viewData).getAggregationMap(); + + List tagValues = new ArrayList<>(); + + for (TagKey column : view.getColumns()) { + if (RpcMeasureConstants.BIGTABLE_PROJECT_ID == column) { + tagValues.add(TagValue.create(PROJECT_ID)); + } else if (RpcMeasureConstants.BIGTABLE_INSTANCE_ID == column) { + tagValues.add(TagValue.create(INSTANCE_ID)); + } else if (RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID == column) { + tagValues.add(TagValue.create(APP_PROFILE_ID)); + } else { + tagValues.add(tags.get(column)); + } + } + + AggregationData aggregationData = aggregationMap.get(tagValues); + + return aggregationData.match( + new Function() { + @Override + public Long apply(SumDataDouble arg) { + return (long) arg.getSum(); + } + }, + new Function() { + @Override + public Long apply(SumDataLong arg) { + return arg.getSum(); + } + }, + new Function() { + @Override + public Long apply(CountData arg) { + return arg.getCount(); + } + }, + new Function() { + @Override + public Long apply(DistributionData arg) { + return (long) arg.getMean(); + } + }, + new Function() { + @Override + public Long apply(LastValueDataDouble arg) { + return (long) arg.getLastValue(); + } + }, + new Function() { + @Override + public Long apply(LastValueDataLong arg) { + return arg.getLastValue(); + } + }, + new Function() { + @Override + public Long apply(AggregationData arg) { + throw new UnsupportedOperationException(); + } + }); + } +}