childScopes = new ArrayList<>(children.size());
+
+ for (ApiTracer child : children) {
+ childScopes.add(child.inScope());
+ }
+
+ return new Scope() {
+ @Override
+ public void close() {
+ for (Scope childScope : childScopes) {
+ childScope.close();
+ }
+ }
+ };
+ }
+
+ @Override
+ public void operationSucceeded() {
+ for (ApiTracer child : children) {
+ child.operationSucceeded();
+ }
+ }
+
+ @Override
+ public void operationCancelled() {
+ for (ApiTracer child : children) {
+ child.operationCancelled();
+ }
+ }
+
+ @Override
+ public void operationFailed(Throwable error) {
+ for (ApiTracer child : children) {
+ child.operationFailed(error);
+ }
+ }
+
+ @Override
+ public void connectionSelected(String id) {
+ for (ApiTracer child : children) {
+ child.connectionSelected(id);
+ }
+ }
+
+ @Override
+ public void attemptStarted(int attemptNumber) {
+ for (ApiTracer child : children) {
+ child.attemptStarted(attemptNumber);
+ }
+ }
+
+ @Override
+ public void attemptSucceeded() {
+ for (ApiTracer child : children) {
+ child.attemptSucceeded();
+ }
+ }
+
+ @Override
+ public void attemptCancelled() {
+ for (ApiTracer child : children) {
+ child.attemptCancelled();
+ }
+ }
+
+ @Override
+ public void attemptFailed(Throwable error, Duration delay) {
+ for (ApiTracer child : children) {
+ child.attemptFailed(error, delay);
+ }
+ }
+
+ @Override
+ public void attemptFailedRetriesExhausted(Throwable error) {
+ for (ApiTracer child : children) {
+ child.attemptFailedRetriesExhausted(error);
+ }
+ }
+
+ @Override
+ public void attemptPermanentFailure(Throwable error) {
+ for (ApiTracer child : children) {
+ child.attemptPermanentFailure(error);
+ }
+ }
+
+ @Override
+ public void lroStartFailed(Throwable error) {
+ for (ApiTracer child : children) {
+ child.lroStartFailed(error);
+ }
+ }
+
+ @Override
+ public void lroStartSucceeded() {
+ for (ApiTracer child : children) {
+ child.lroStartSucceeded();
+ }
+ }
+
+ @Override
+ public void responseReceived() {
+ for (ApiTracer child : children) {
+ child.responseReceived();
+ }
+ }
+
+ @Override
+ public void requestSent() {
+ for (ApiTracer child : children) {
+ child.requestSent();
+ }
+ }
+
+ @Override
+ public void batchRequestSent(long elementCount, long requestSize) {
+ for (ApiTracer child : children) {
+ child.batchRequestSent(elementCount, requestSize);
+ }
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/tracing/WrappedTracerFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory.java
similarity index 50%
rename from google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/tracing/WrappedTracerFactory.java
rename to google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory.java
index 253d7a207..e2e399ae3 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/tracing/WrappedTracerFactory.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2019 Google LLC
+ * Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,36 +13,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.google.cloud.bigtable.gaxx.tracing;
+package com.google.cloud.bigtable.data.v2.stub.metrics;
import com.google.api.core.InternalApi;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.api.gax.tracing.SpanName;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
-/**
- * Simple wrapper around {@link ApiTracerFactory} to augment the client name of the generated
- * traces.
- *
- * This is used to disambiguate traces in underlying GAPIC client from the manually written
- * overlay.
- *
- *
For internal use, public for technical reasons.
- */
-@InternalApi
-public class WrappedTracerFactory implements ApiTracerFactory {
- private final ApiTracerFactory innerFactory;
- private final String clientName;
+/** Combines multiple {@link ApiTracerFactory} into a single {@link ApiTracerFactory}. */
+@InternalApi("For internal use only")
+public class CompositeTracerFactory implements ApiTracerFactory {
+ private final List apiTracerFactories;
- public WrappedTracerFactory(ApiTracerFactory tracerFactory, String clientName) {
- this.innerFactory = tracerFactory;
- this.clientName = clientName;
+ public CompositeTracerFactory(List apiTracerFactories) {
+ this.apiTracerFactories = ImmutableList.copyOf(apiTracerFactories);
}
@Override
public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) {
- spanName = SpanName.of(clientName, spanName.getMethodName());
+ List children = new ArrayList<>(apiTracerFactories.size());
- return innerFactory.newTracer(parent, spanName, operationType);
+ for (ApiTracerFactory factory : apiTracerFactories) {
+ children.add(factory.newTracer(parent, spanName, operationType));
+ }
+ return new CompositeTracer(children);
}
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredMutateRowsCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredMutateRowsCallable.java
deleted file mode 100644
index 11878635f..000000000
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredMutateRowsCallable.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright 2019 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub.metrics;
-
-import com.google.api.core.ApiClock;
-import com.google.api.core.ApiFuture;
-import com.google.api.core.InternalApi;
-import com.google.api.gax.rpc.ApiCallContext;
-import com.google.api.gax.rpc.UnaryCallable;
-import com.google.cloud.bigtable.data.v2.models.BulkMutation;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.opencensus.stats.StatsRecorder;
-import io.opencensus.tags.TagContext;
-import io.opencensus.tags.TagValue;
-import io.opencensus.tags.Tagger;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-
-/**
- * This callable will instrument MutateRows invocations using OpenCensus stats.
- *
- * Recorded stats:
- *
- *
- * - {@link RpcMeasureConstants#BIGTABLE_OP_LATENCY}
- *
- the total time it took the operation across all of its retry attempts to complete.
- *
- {@link RpcMeasureConstants#BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH}
- *
- the number of mutations sent per batch operation. Retry attempts might have few entries.
- *
- *
- * For internal use only.
- */
-@InternalApi
-public class MeasuredMutateRowsCallable extends UnaryCallable {
- private final UnaryCallable innerCallable;
- private final TagValue methodName;
- private final TagContext parentCtx;
- private final Tagger tagger;
- private final StatsRecorder stats;
- private final ApiClock clock;
-
- @InternalApi
- public MeasuredMutateRowsCallable(
- @Nonnull UnaryCallable innerCallable,
- @Nonnull String methodName,
- @Nonnull Tagger tagger,
- @Nonnull StatsRecorder stats,
- @Nonnull ApiClock clock) {
- this.innerCallable = Preconditions.checkNotNull(innerCallable, "innerCallable");
- this.methodName = TagValue.create(Preconditions.checkNotNull(methodName, "methodName"));
- this.tagger = Preconditions.checkNotNull(tagger, "tagger");
- this.parentCtx = tagger.getCurrentTagContext();
- this.stats = Preconditions.checkNotNull(stats, "stats");
- this.clock = Preconditions.checkNotNull(clock, "clock");
- }
-
- @Override
- public ApiFuture futureCall(BulkMutation request, ApiCallContext context) {
- long operationStartTime = clock.nanoTime();
-
- final ApiFuture future = innerCallable.futureCall(request, context);
- future.addListener(
- new StatsRecordingRunnable(future, operationStartTime, request.getEntryCount()),
- MoreExecutors.directExecutor());
- return future;
- }
-
- private class StatsRecordingRunnable implements Runnable {
- private final Future> operationFuture;
- private final long operationStart;
- private final long numEntries;
-
- private StatsRecordingRunnable(
- @Nonnull Future> operationFuture, long operationStartTime, long numEntries) {
- this.operationFuture = Preconditions.checkNotNull(operationFuture, "operationFuture");
- this.operationStart = operationStartTime;
- this.numEntries = numEntries;
- }
-
- @Override
- public void run() {
- long elapsed = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - operationStart);
-
- stats
- .newMeasureMap()
- .put(RpcMeasureConstants.BIGTABLE_OP_LATENCY, elapsed)
- .put(RpcMeasureConstants.BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH, numEntries)
- .record(
- tagger
- .toBuilder(parentCtx)
- .putLocal(RpcMeasureConstants.BIGTABLE_OP, methodName)
- .putLocal(
- RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(operationFuture))
- .build());
- }
- }
-}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallable.java
deleted file mode 100644
index 0ac3777ea..000000000
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallable.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Copyright 2019 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub.metrics;
-
-import com.google.api.core.ApiClock;
-import com.google.api.core.InternalApi;
-import com.google.api.gax.rpc.ApiCallContext;
-import com.google.api.gax.rpc.ResponseObserver;
-import com.google.api.gax.rpc.ServerStreamingCallable;
-import com.google.api.gax.rpc.StreamController;
-import com.google.cloud.bigtable.data.v2.models.Query;
-import com.google.common.base.Preconditions;
-import io.opencensus.stats.MeasureMap;
-import io.opencensus.stats.StatsRecorder;
-import io.opencensus.tags.TagContext;
-import io.opencensus.tags.TagValue;
-import io.opencensus.tags.Tagger;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-/**
- * This callable will instrument ReadRows invocations using OpenCensus stats.
- *
- * Recorded stats:
- *
- *
- * - {@link RpcMeasureConstants#BIGTABLE_OP_LATENCY}
- *
- the total time it took the operation across all of its retry attempts to complete.
- *
- {@link RpcMeasureConstants#BIGTABLE_ROWS_READ_PER_OP}
- *
- the number of rows received across all of the retries for each invocation.
- *
- {@link RpcMeasureConstants#BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY}
- *
- the amount of time it took the caller to receive the first row.
- *
- *
- * For internal use only.
- */
-@InternalApi
-public class MeasuredReadRowsCallable extends ServerStreamingCallable {
- private final ServerStreamingCallable innerCallable;
-
- private final TagValue methodName;
- private final TagContext parentCtx;
-
- private final Tagger tagger;
- private final StatsRecorder stats;
- private final ApiClock clock;
-
- public MeasuredReadRowsCallable(
- @Nonnull ServerStreamingCallable innerCallable,
- @Nonnull String methodName,
- @Nonnull Tagger tagger,
- @Nonnull StatsRecorder stats,
- @Nonnull ApiClock clock) {
- this.innerCallable = Preconditions.checkNotNull(innerCallable, "innerCallable");
- this.methodName = TagValue.create(Preconditions.checkNotNull(methodName, "methodName"));
- this.tagger = Preconditions.checkNotNull(tagger, "tagger");
- this.parentCtx = tagger.getCurrentTagContext();
- this.stats = Preconditions.checkNotNull(stats, "stats");
- this.clock = Preconditions.checkNotNull(clock, "clock");
- }
-
- @Override
- public void call(Query request, ResponseObserver outerObserver, ApiCallContext context) {
- innerCallable.call(request, new MeasuredResponseObserver(outerObserver), context);
- }
-
- private class MeasuredResponseObserver implements ResponseObserver {
- private final ResponseObserver outerResponseObserver;
-
- private final long operationStart;
- private Long firstRowReceivedAt = null;
- private long rowsRead = 0;
-
- private MeasuredResponseObserver(@Nonnull ResponseObserver outerResponseObserver) {
- this.outerResponseObserver =
- Preconditions.checkNotNull(outerResponseObserver, "outerResponseObserver");
- this.operationStart = clock.nanoTime();
- }
-
- @Override
- public void onStart(StreamController controller) {
- outerResponseObserver.onStart(controller);
- }
-
- @Override
- public void onResponse(RowT row) {
- if (firstRowReceivedAt == null) {
- firstRowReceivedAt = clock.nanoTime();
- }
- rowsRead++;
- outerResponseObserver.onResponse(row);
- }
-
- @Override
- public void onError(Throwable t) {
- recordStats(t);
- outerResponseObserver.onError(t);
- }
-
- @Override
- public void onComplete() {
- recordStats(null);
- outerResponseObserver.onComplete();
- }
-
- private void recordStats(@Nullable Throwable error) {
- long now = clock.nanoTime();
- long elapsed = TimeUnit.NANOSECONDS.toMillis(now - operationStart);
-
- MeasureMap measures =
- stats
- .newMeasureMap()
- .put(RpcMeasureConstants.BIGTABLE_OP_LATENCY, elapsed)
- .put(RpcMeasureConstants.BIGTABLE_ROWS_READ_PER_OP, rowsRead);
-
- if (firstRowReceivedAt != null) {
- long firstRowLatency = TimeUnit.NANOSECONDS.toMillis(firstRowReceivedAt - operationStart);
- measures.put(RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY, firstRowLatency);
- }
-
- measures.record(
- tagger
- .toBuilder(parentCtx)
- .putLocal(RpcMeasureConstants.BIGTABLE_OP, methodName)
- .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(error))
- .build());
- }
- }
-}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallable.java
deleted file mode 100644
index aade43fd2..000000000
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallable.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright 2019 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub.metrics;
-
-import com.google.api.core.ApiClock;
-import com.google.api.core.ApiFuture;
-import com.google.api.core.InternalApi;
-import com.google.api.gax.rpc.ApiCallContext;
-import com.google.api.gax.rpc.UnaryCallable;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.opencensus.stats.StatsRecorder;
-import io.opencensus.tags.TagContext;
-import io.opencensus.tags.TagValue;
-import io.opencensus.tags.Tagger;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-
-/**
- * This callable will instrument callable invocations using OpenCensus stats.
- *
- * Recorded stats:
- *
- *
- * - {@link RpcMeasureConstants#BIGTABLE_OP_LATENCY}
- *
- the total time it took the operation across all of its retry attempts to complete
- *
- *
- * For internal use only.
- */
-@InternalApi
-public class MeasuredUnaryCallable extends UnaryCallable {
- private final UnaryCallable innerCallable;
-
- private final TagValue methodName;
- private final TagContext parentCtx;
-
- private final Tagger tagger;
- private final StatsRecorder stats;
- private final ApiClock clock;
-
- public MeasuredUnaryCallable(
- @Nonnull UnaryCallable innerCallable,
- @Nonnull String methodName,
- @Nonnull Tagger tagger,
- @Nonnull StatsRecorder stats,
- @Nonnull ApiClock clock) {
- this.innerCallable = Preconditions.checkNotNull(innerCallable, "innerCallable");
- this.methodName = TagValue.create(Preconditions.checkNotNull(methodName, "methodName"));
- this.tagger = Preconditions.checkNotNull(tagger, "tagger");
- this.parentCtx = tagger.getCurrentTagContext();
- this.stats = Preconditions.checkNotNull(stats, "stats");
- this.clock = Preconditions.checkNotNull(clock, "clock");
- }
-
- @Override
- public ApiFuture futureCall(RequestT request, ApiCallContext context) {
- long startTime = clock.nanoTime();
- ApiFuture future = innerCallable.futureCall(request, context);
- future.addListener(
- new StatsRecordingRunnable(future, startTime), MoreExecutors.directExecutor());
- return future;
- }
-
- private class StatsRecordingRunnable implements Runnable {
- private final Future> operationFuture;
- private final long operationStart;
-
- private StatsRecordingRunnable(@Nonnull Future> operationFuture, long startTime) {
- this.operationFuture = Preconditions.checkNotNull(operationFuture, "operationFuture");
- this.operationStart = startTime;
- }
-
- @Override
- public void run() {
- long elapsed = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - operationStart);
-
- stats
- .newMeasureMap()
- .put(RpcMeasureConstants.BIGTABLE_OP_LATENCY, elapsed)
- .record(
- tagger
- .toBuilder(parentCtx)
- .putLocal(RpcMeasureConstants.BIGTABLE_OP, methodName)
- .putLocal(
- RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(operationFuture))
- .build());
- }
- }
-}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java
new file mode 100644
index 000000000..864ba7502
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.api.gax.tracing.ApiTracer;
+import com.google.api.gax.tracing.ApiTracerFactory.OperationType;
+import com.google.api.gax.tracing.SpanName;
+import com.google.common.base.Stopwatch;
+import io.opencensus.stats.MeasureMap;
+import io.opencensus.stats.StatsRecorder;
+import io.opencensus.tags.TagContext;
+import io.opencensus.tags.TagContextBuilder;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import io.opencensus.tags.Tagger;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.threeten.bp.Duration;
+
+class MetricsTracer implements ApiTracer {
+ private final OperationType operationType;
+
+ private final Tagger tagger;
+ private final StatsRecorder stats;
+
+ // Tags
+ private final TagContext parentContext;
+ private final SpanName spanName;
+ private final Map statsAttributes;
+
+ // Operation level metrics
+ private final AtomicBoolean opFinished = new AtomicBoolean();
+ private final Stopwatch operationTimer = Stopwatch.createStarted();
+ private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted();
+ private long operationResponseCount = 0;
+
+ // Attempt level metrics
+ private int attemptCount = 0;
+ private Stopwatch attemptTimer;
+ private long attemptResponseCount = 0;
+
+ MetricsTracer(
+ OperationType operationType,
+ Tagger tagger,
+ StatsRecorder stats,
+ SpanName spanName,
+ Map statsAttributes) {
+ this.operationType = operationType;
+ this.tagger = tagger;
+ this.stats = stats;
+ this.parentContext = tagger.getCurrentTagContext();
+ this.spanName = spanName;
+ this.statsAttributes = statsAttributes;
+ }
+
+ @Override
+ public Scope inScope() {
+ return new Scope() {
+ @Override
+ public void close() {}
+ };
+ }
+
+ @Override
+ public void operationSucceeded() {
+ recordOperationCompletion(null);
+ }
+
+ @Override
+ public void operationCancelled() {
+ recordOperationCompletion(new CancellationException());
+ }
+
+ @Override
+ public void operationFailed(Throwable throwable) {
+ recordOperationCompletion(throwable);
+ }
+
+ private void recordOperationCompletion(@Nullable Throwable throwable) {
+ if (!opFinished.compareAndSet(false, true)) {
+ return;
+ }
+ operationTimer.stop();
+
+ long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS);
+
+ MeasureMap measures =
+ stats
+ .newMeasureMap()
+ .put(RpcMeasureConstants.BIGTABLE_OP_LATENCY, elapsed)
+ .put(RpcMeasureConstants.BIGTABLE_OP_ATTEMPT_COUNT, attemptCount);
+
+ if (operationType == OperationType.ServerStreaming
+ && spanName.getMethodName().equals("ReadRows")) {
+ measures.put(
+ RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY,
+ firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS));
+ }
+
+ TagContextBuilder tagCtx =
+ newTagCtxBuilder()
+ .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable));
+
+ measures.record(tagCtx.build());
+ }
+
+ @Override
+ public void connectionSelected(String s) {
+ // noop: cardinality for connection ids is too high to use as tags
+ }
+
+ @Override
+ public void attemptStarted(int i) {
+ attemptCount++;
+ attemptTimer = Stopwatch.createStarted();
+ attemptResponseCount = 0;
+ }
+
+ @Override
+ public void attemptSucceeded() {
+ recordAttemptCompletion(null);
+ }
+
+ @Override
+ public void attemptCancelled() {
+ recordAttemptCompletion(new CancellationException());
+ }
+
+ @Override
+ public void attemptFailed(Throwable throwable, Duration duration) {
+ recordAttemptCompletion(throwable);
+ }
+
+ @Override
+ public void attemptFailedRetriesExhausted(Throwable throwable) {
+ recordAttemptCompletion(throwable);
+ }
+
+ @Override
+ public void attemptPermanentFailure(Throwable throwable) {
+ recordAttemptCompletion(throwable);
+ }
+
+ private void recordAttemptCompletion(@Nullable Throwable throwable) {
+ MeasureMap measures =
+ stats
+ .newMeasureMap()
+ .put(
+ RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY,
+ attemptTimer.elapsed(TimeUnit.MILLISECONDS));
+
+ TagContextBuilder tagCtx =
+ newTagCtxBuilder()
+ .putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable));
+
+ measures.record(tagCtx.build());
+ }
+
+ @Override
+ public void lroStartFailed(Throwable throwable) {
+ // noop
+ }
+
+ @Override
+ public void lroStartSucceeded() {
+ // noop
+ }
+
+ @Override
+ public void responseReceived() {
+ if (firstResponsePerOpTimer.isRunning()) {
+ firstResponsePerOpTimer.stop();
+ }
+ attemptResponseCount++;
+ operationResponseCount++;
+ }
+
+ @Override
+ public void requestSent() {
+ // noop: no operations are client streaming
+ }
+
+ @Override
+ public void batchRequestSent(long elementCount, long requestSize) {
+ // noop
+ }
+
+ private TagContextBuilder newTagCtxBuilder() {
+ TagContextBuilder tagCtx =
+ tagger
+ .toBuilder(parentContext)
+ .putLocal(RpcMeasureConstants.BIGTABLE_OP, TagValue.create(spanName.toString()));
+
+ // Copy client level tags in
+ for (Entry entry : statsAttributes.entrySet()) {
+ tagCtx.putLocal(entry.getKey(), entry.getValue());
+ }
+
+ return tagCtx;
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerFactory.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerFactory.java
new file mode 100644
index 000000000..24b22d353
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.api.core.InternalApi;
+import com.google.api.gax.tracing.ApiTracer;
+import com.google.api.gax.tracing.ApiTracerFactory;
+import com.google.api.gax.tracing.SpanName;
+import com.google.common.collect.ImmutableMap;
+import io.opencensus.stats.StatsRecorder;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import io.opencensus.tags.Tagger;
+
+/**
+ * {@link ApiTracerFactory} that will generate OpenCensus metrics by using the {@link ApiTracer}
+ * api.
+ */
+@InternalApi("For internal use only")
+public class MetricsTracerFactory implements ApiTracerFactory {
+ private final Tagger tagger;
+ private final StatsRecorder stats;
+ private final ImmutableMap statsAttributes;
+
+ public static MetricsTracerFactory create(
+ Tagger tagger, StatsRecorder stats, ImmutableMap statsAttributes) {
+ return new MetricsTracerFactory(tagger, stats, statsAttributes);
+ }
+
+ private MetricsTracerFactory(
+ Tagger tagger, StatsRecorder stats, ImmutableMap statsAttributes) {
+ this.tagger = tagger;
+ this.stats = stats;
+ this.statsAttributes = statsAttributes;
+ }
+
+ @Override
+ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) {
+ return new MetricsTracer(operationType, tagger, stats, spanName, statsAttributes);
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java
index f5830d05d..8c6e347a0 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java
@@ -15,53 +15,63 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;
-import io.opencensus.stats.Measure;
-import io.opencensus.stats.Measure.MeasureDouble;
+import com.google.api.core.InternalApi;
import io.opencensus.stats.Measure.MeasureLong;
import io.opencensus.tags.TagKey;
-class RpcMeasureConstants {
- /**
- * Tag key that represents a Bigtable operation name.
- *
- * A Bigtable operation consists of 1 or more RPCs. By comparing metrics tagged with {@link
- * io.opencensus.contrib.grpc.metrics.RpcMeasureConstants#GRPC_CLIENT_METHOD} to methods tagged
- * with {@link RpcMeasureConstants#BIGTABLE_OP}, the end user can get a sense how many attempts an
- * operation took.
- */
- public static final TagKey BIGTABLE_OP = TagKey.create("bigtable_op");
+@InternalApi("For internal use only")
+public class RpcMeasureConstants {
+ // TagKeys
+ public static final TagKey BIGTABLE_PROJECT_ID = TagKey.create("bigtable_project_id");
+ public static final TagKey BIGTABLE_INSTANCE_ID = TagKey.create("bigtable_instance_id");
+ public static final TagKey BIGTABLE_APP_PROFILE_ID = TagKey.create("bigtable_app_profile_id");
+
+ /** Tag key that represents a Bigtable operation name. */
+ static final TagKey BIGTABLE_OP = TagKey.create("bigtable_op");
/** Tag key that represents the final status of the Bigtable operation. */
- public static final TagKey BIGTABLE_STATUS = TagKey.create("bigtable_status");
+ static final TagKey BIGTABLE_STATUS = TagKey.create("bigtable_status");
+ // Units
/** Unit to represent counts. */
private static final String COUNT = "1";
/** Unit to represent milliseconds. */
private static final String MILLISECOND = "ms";
- static final MeasureDouble BIGTABLE_OP_LATENCY =
- Measure.MeasureDouble.create(
+ // Measurements
+ /**
+ * Latency for a logic operation, which will include latencies for each attempt and exponential
+ * backoff delays.
+ */
+ static final MeasureLong BIGTABLE_OP_LATENCY =
+ MeasureLong.create(
"cloud.google.com/java/bigtable/op_latency",
"Time between request being sent to last row received, "
+ "or terminal error of the last retry attempt.",
MILLISECOND);
- static final MeasureDouble BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY =
- MeasureDouble.create(
+ /**
+ * Number of attempts a logical operation took to complete. Under normal circumstances should be
+ * 1.
+ */
+ static final MeasureLong BIGTABLE_OP_ATTEMPT_COUNT =
+ MeasureLong.MeasureLong.create(
+ "cloud.google.com/java/bigtable/op_attempt_count",
+ "Number of attempts per operation",
+ COUNT);
+
+ /** Latency that a single attempt (RPC) took to complete. */
+ static final MeasureLong BIGTABLE_ATTEMPT_LATENCY =
+ MeasureLong.create(
+ "cloud.google.com/java/bigtable/attempt_latency",
+ "Duration of an individual operation attempt",
+ MILLISECOND);
+
+ /** Latency for the caller to see the first row in a ReadRows stream. */
+ static final MeasureLong BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY =
+ MeasureLong.create(
"cloud.google.com/java/bigtable/read_rows_first_row_latency",
"Time between request being sent to the first row received",
MILLISECOND);
-
- static final MeasureLong BIGTABLE_ROWS_READ_PER_OP =
- Measure.MeasureLong.create(
- "cloud.google.com/java/bigtable/rows_read_per_op",
- "Number of rows received per ReadRows operation",
- COUNT);
-
- static final MeasureLong BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH =
- Measure.MeasureLong.create(
- "cloud.google.com/java/bigtable/mutations_per_batch",
- "Number of mutations per MutateRows request",
- COUNT);
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java
index 9ee673900..d21060c4a 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java
@@ -15,11 +15,14 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;
-import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_INSTANCE_ID;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP_ATTEMPT_COUNT;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP_LATENCY;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_PROJECT_ID;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY;
-import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ROWS_READ_PER_OP;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_STATUS;
import com.google.common.collect.ImmutableList;
@@ -28,7 +31,6 @@
import io.opencensus.stats.Aggregation.Distribution;
import io.opencensus.stats.BucketBoundaries;
import io.opencensus.stats.View;
-import io.opencensus.tags.TagKey;
import java.util.Arrays;
class RpcViewConstants {
@@ -44,6 +46,13 @@ class RpcViewConstants {
250.0, 300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0,
20000.0, 50000.0, 100000.0)));
+ private static final Aggregation AGGREGATION_ATTEMPT_COUNT =
+ Distribution.create(
+ BucketBoundaries.create(
+ ImmutableList.of(
+ 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 15.0, 20.0, 30.0, 40.0, 50.0,
+ 100.0)));
+
private static final Aggregation AGGREGATION_WITH_POWERS_OF_2 =
Distribution.create(
BucketBoundaries.create(
@@ -56,21 +65,31 @@ class RpcViewConstants {
* {@link View} for Bigtable client roundtrip latency in milliseconds including all retry
* attempts.
*/
- public static final View BIGTABLE_OP_LATENCY_VIEW =
+ static final View BIGTABLE_OP_LATENCY_VIEW =
View.create(
View.Name.create("cloud.google.com/java/bigtable/op_latency"),
- "Latency in msecs",
+ "Operation latency in msecs",
BIGTABLE_OP_LATENCY,
AGGREGATION_WITH_MILLIS_HISTOGRAM,
- ImmutableList.of(BIGTABLE_OP));
+ ImmutableList.of(
+ BIGTABLE_PROJECT_ID,
+ BIGTABLE_INSTANCE_ID,
+ BIGTABLE_APP_PROFILE_ID,
+ BIGTABLE_OP,
+ BIGTABLE_STATUS));
- static final View BIGTABLE_CLIENT_COMPLETED_OP_VIEW =
+ static final View BIGTABLE_COMPLETED_OP_VIEW =
View.create(
View.Name.create("cloud.google.com/java/bigtable/completed_ops"),
"Number of completed Bigtable client operations",
BIGTABLE_OP_LATENCY,
COUNT,
- Arrays.asList(BIGTABLE_OP, BIGTABLE_STATUS));
+ Arrays.asList(
+ BIGTABLE_PROJECT_ID,
+ BIGTABLE_INSTANCE_ID,
+ BIGTABLE_APP_PROFILE_ID,
+ BIGTABLE_OP,
+ BIGTABLE_STATUS));
static final View BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW =
View.create(
@@ -78,21 +97,31 @@ class RpcViewConstants {
"Latency to receive the first row in a ReadRows stream",
BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY,
AGGREGATION_WITH_MILLIS_HISTOGRAM,
- ImmutableList.of());
+ ImmutableList.of(BIGTABLE_PROJECT_ID, BIGTABLE_INSTANCE_ID, BIGTABLE_APP_PROFILE_ID));
- static final View BIGTABLE_ROWS_READ_PER_OP_VIEW =
+ static final View BIGTABLE_ATTEMPT_LATENCY_VIEW =
View.create(
- View.Name.create("cloud.google.com/java/bigtable/rows_per_op"),
- "Rows scanned per operation",
- BIGTABLE_ROWS_READ_PER_OP,
- AGGREGATION_WITH_POWERS_OF_2,
- ImmutableList.of());
+ View.Name.create("cloud.google.com/java/bigtable/attempt_latency"),
+ "Attempt latency in msecs",
+ BIGTABLE_ATTEMPT_LATENCY,
+ AGGREGATION_WITH_MILLIS_HISTOGRAM,
+ ImmutableList.of(
+ BIGTABLE_PROJECT_ID,
+ BIGTABLE_INSTANCE_ID,
+ BIGTABLE_APP_PROFILE_ID,
+ BIGTABLE_OP,
+ BIGTABLE_STATUS));
- static final View BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH_VIEW =
+ static final View BIGTABLE_ATTEMPTS_PER_OP_VIEW =
View.create(
- View.Name.create("cloud.google.com/java/bigtable/mutations_per_batch"),
- "Number of mutations sent in a single MutateRowsRequest",
- BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH,
- AGGREGATION_WITH_POWERS_OF_2,
- ImmutableList.of());
+ View.Name.create("cloud.google.com/java/bigtable/attempts_per_op"),
+ "Distribution of attempts per logical operation",
+ BIGTABLE_OP_ATTEMPT_COUNT,
+ AGGREGATION_ATTEMPT_COUNT,
+ ImmutableList.of(
+ BIGTABLE_PROJECT_ID,
+ BIGTABLE_INSTANCE_ID,
+ BIGTABLE_APP_PROFILE_ID,
+ BIGTABLE_OP,
+ BIGTABLE_STATUS));
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java
index a8e772e3b..cc3153949 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java
@@ -25,21 +25,15 @@
@BetaApi
public class RpcViews {
@VisibleForTesting
- static final ImmutableSet BIGTABLE_CLIENT_VIEWS_SET =
+ private static final ImmutableSet BIGTABLE_CLIENT_VIEWS_SET =
ImmutableSet.of(
RpcViewConstants.BIGTABLE_OP_LATENCY_VIEW,
- RpcViewConstants.BIGTABLE_CLIENT_COMPLETED_OP_VIEW,
+ RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW,
RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW,
- RpcViewConstants.BIGTABLE_ROWS_READ_PER_OP_VIEW,
- RpcViewConstants.BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH_VIEW);
+ RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW,
+ RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW);
- /**
- * Registers all Bigtable specific views.
- *
- * It is recommended to call this method and {@link
- * io.opencensus.contrib.grpc.metrics.RpcViews#registerClientGrpcViews()} before doing any RPC
- * call to avoid missing stats.
- */
+ /** Registers all Bigtable specific views. */
public static void registerBigtableClientViews() {
registerBigtableClientViews(Stats.getViewManager());
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java
new file mode 100644
index 000000000..cedb227ba
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerTest.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.gax.tracing.ApiTracer;
+import com.google.api.gax.tracing.ApiTracer.Scope;
+import com.google.common.collect.ImmutableList;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.threeten.bp.Duration;
+
+@RunWith(JUnit4.class)
+public class CompositeTracerTest {
+ @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Mock private ApiTracer child1;
+ @Mock private ApiTracer child2;
+
+ private CompositeTracer compositeTracer;
+
+ @Before
+ public void setup() {
+ compositeTracer = new CompositeTracer(ImmutableList.of(child1, child2));
+ }
+
+ @Test
+ public void testInScope() {
+ Scope scope1 = mock(Scope.class);
+ when(child1.inScope()).thenReturn(scope1);
+
+ Scope scope2 = mock(Scope.class);
+ when(child2.inScope()).thenReturn(scope2);
+
+ Scope parentScope = compositeTracer.inScope();
+
+ parentScope.close();
+ verify(scope1, times(1)).close();
+ }
+
+ @Test
+ public void testOperationSucceeded() {
+ compositeTracer.operationSucceeded();
+ verify(child1, times(1)).operationSucceeded();
+ verify(child2, times(1)).operationSucceeded();
+ }
+
+ @Test
+ public void testOperationCancelled() {
+ compositeTracer.operationCancelled();
+ verify(child1, times(1)).operationCancelled();
+ verify(child2, times(1)).operationCancelled();
+ }
+
+ @Test
+ public void testOperationFailed() {
+ RuntimeException error = new RuntimeException();
+ compositeTracer.operationFailed(error);
+ verify(child1, times(1)).operationFailed(error);
+ verify(child2, times(1)).operationFailed(error);
+ }
+
+ @Test
+ public void testConnectionSelected() {
+ compositeTracer.connectionSelected("connection-one");
+ verify(child1, times(1)).connectionSelected("connection-one");
+ verify(child2, times(1)).connectionSelected("connection-one");
+ }
+
+ @Test
+ public void testAttemptStarted() {
+ compositeTracer.attemptStarted(3);
+ verify(child1, times(1)).attemptStarted(3);
+ verify(child2, times(1)).attemptStarted(3);
+ }
+
+ @Test
+ public void testAttemptSucceeded() {
+ compositeTracer.attemptSucceeded();
+ verify(child1, times(1)).attemptSucceeded();
+ verify(child2, times(1)).attemptSucceeded();
+ }
+
+ @Test
+ public void testAttemptCancelled() {
+ compositeTracer.attemptCancelled();
+ verify(child1, times(1)).attemptCancelled();
+ verify(child2, times(1)).attemptCancelled();
+ }
+
+ @Test
+ public void testAttemptFailed() {
+ RuntimeException error = new RuntimeException();
+ Duration delay = Duration.ofMillis(10);
+ compositeTracer.attemptFailed(error, delay);
+ verify(child1, times(1)).attemptFailed(error, delay);
+ verify(child2, times(1)).attemptFailed(error, delay);
+ }
+
+ @Test
+ public void testAttemptFailedRetriesExhausted() {
+ RuntimeException error = new RuntimeException();
+ compositeTracer.attemptFailedRetriesExhausted(error);
+ verify(child1, times(1)).attemptFailedRetriesExhausted(error);
+ verify(child2, times(1)).attemptFailedRetriesExhausted(error);
+ }
+
+ @Test
+ public void testAttemptPermanentFailure() {
+ RuntimeException error = new RuntimeException();
+ compositeTracer.attemptPermanentFailure(error);
+ verify(child1, times(1)).attemptPermanentFailure(error);
+ verify(child2, times(1)).attemptPermanentFailure(error);
+ }
+
+ @Test
+ public void testLroStartFailed() {
+ RuntimeException error = new RuntimeException();
+ compositeTracer.lroStartFailed(error);
+ verify(child1, times(1)).lroStartFailed(error);
+ verify(child2, times(1)).lroStartFailed(error);
+ }
+
+ @Test
+ public void testLroStartSucceeded() {
+ compositeTracer.lroStartSucceeded();
+ verify(child1, times(1)).lroStartSucceeded();
+ verify(child2, times(1)).lroStartSucceeded();
+ }
+
+ @Test
+ public void testResponseReceived() {
+ compositeTracer.responseReceived();
+ verify(child1, times(1)).responseReceived();
+ verify(child2, times(1)).responseReceived();
+ }
+
+ @Test
+ public void testRequestSent() {
+ compositeTracer.requestSent();
+ verify(child1, times(1)).requestSent();
+ verify(child2, times(1)).requestSent();
+ }
+
+ @Test
+ public void testBatchRequestSent() {
+ compositeTracer.batchRequestSent(2, 20);
+ verify(child1, times(1)).batchRequestSent(2, 20);
+ verify(child2, times(1)).batchRequestSent(2, 20);
+ }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasureMutateRowsCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasureMutateRowsCallableTest.java
deleted file mode 100644
index 2b47282da..000000000
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasureMutateRowsCallableTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Copyright 2019 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub.metrics;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutures;
-import com.google.api.gax.core.FakeApiClock;
-import com.google.api.gax.grpc.GrpcStatusCode;
-import com.google.api.gax.rpc.ApiCallContext;
-import com.google.api.gax.rpc.DeadlineExceededException;
-import com.google.api.gax.rpc.UnaryCallable;
-import com.google.cloud.bigtable.data.v2.models.BulkMutation;
-import com.google.cloud.bigtable.data.v2.models.Mutation;
-import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.FakeStatsRecorder;
-import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.FakeTagger;
-import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.MetricsRecord;
-import io.grpc.Status.Code;
-import io.opencensus.tags.TagValue;
-import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
-import org.mockito.stubbing.Answer;
-
-@RunWith(JUnit4.class)
-public class MeasureMutateRowsCallableTest {
- private static final String METHOD_NAME = "Bigtable.MutateRows";
- @Rule public final MockitoRule rule = MockitoJUnit.rule();
-
- private FakeTagger tagger;
-
- private FakeStatsRecorder statsRecorder;
-
- private FakeApiClock clock;
-
- @Mock private UnaryCallable innerCallable;
-
- private MeasuredMutateRowsCallable callable;
-
- @Before
- public void setUp() {
- tagger = new FakeTagger();
- statsRecorder = new FakeStatsRecorder();
- clock = new FakeApiClock(0);
-
- callable =
- new MeasuredMutateRowsCallable(innerCallable, METHOD_NAME, tagger, statsRecorder, clock);
- }
-
- @Test
- public void testOk() {
- Mockito.when(
- innerCallable.futureCall(
- Mockito.any(BulkMutation.class), Mockito.any(ApiCallContext.class)))
- .thenAnswer(
- new Answer>() {
- @Override
- public ApiFuture answer(InvocationOnMock invocationOnMock) {
- clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(3));
- return ApiFutures.immediateFuture(null);
- }
- });
-
- callable.call(
- BulkMutation.create("tableID")
- .add("rowKey", Mutation.create())
- .add("rowKey2", Mutation.create()));
-
- MetricsRecord metricsRecord = statsRecorder.pollRecord();
-
- assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 3.0);
- assertThat(metricsRecord.metrics)
- .containsEntry(RpcMeasureConstants.BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH, 2L);
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create(METHOD_NAME));
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK"));
- }
-
- @Test
- public void testFailure() {
- Mockito.when(
- innerCallable.futureCall(
- Mockito.any(BulkMutation.class), Mockito.any(ApiCallContext.class)))
- .thenAnswer(
- new Answer>() {
- @Override
- public ApiFuture answer(InvocationOnMock invocationOnMock) {
- clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(3));
- return ApiFutures.immediateFailedFuture(
- new DeadlineExceededException(
- "timeout!", null, GrpcStatusCode.of(Code.DEADLINE_EXCEEDED), true));
- }
- });
-
- Throwable actualError = null;
-
- try {
- callable.call(
- BulkMutation.create("tableID")
- .add("rowKey", Mutation.create())
- .add("rowKey2", Mutation.create()));
- } catch (Throwable e) {
- actualError = e;
- }
-
- assertThat(actualError).isInstanceOf(DeadlineExceededException.class);
-
- MetricsRecord metricsRecord = statsRecorder.pollRecord();
-
- assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 3.0);
- assertThat(metricsRecord.metrics)
- .containsEntry(RpcMeasureConstants.BIGTABLE_MUTATE_ROWS_ENTRIES_PER_BATCH, 2L);
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create(METHOD_NAME));
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("DEADLINE_EXCEEDED"));
- }
-}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallableTest.java
deleted file mode 100644
index 2c0863895..000000000
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredReadRowsCallableTest.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Copyright 2019 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub.metrics;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.api.gax.core.FakeApiClock;
-import com.google.api.gax.grpc.GrpcStatusCode;
-import com.google.api.gax.rpc.DeadlineExceededException;
-import com.google.cloud.bigtable.data.v2.models.Query;
-import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.FakeStatsRecorder;
-import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.FakeTagger;
-import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.MetricsRecord;
-import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCall;
-import com.google.cloud.bigtable.gaxx.testing.MockStreamingApi.MockServerStreamingCallable;
-import io.grpc.Status.Code;
-import io.opencensus.tags.TagValue;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
-
-@RunWith(JUnit4.class)
-public class MeasuredReadRowsCallableTest {
- @Rule public final MockitoRule rule = MockitoJUnit.rule();
-
- private FakeTagger tagger;
-
- private FakeStatsRecorder statsRecorder;
-
- private FakeApiClock clock;
-
- private MockServerStreamingCallable innerCallable;
-
- private MeasuredReadRowsCallable callable;
-
- @Before
- public void setUp() {
- innerCallable = new MockServerStreamingCallable<>();
-
- tagger = new FakeTagger();
- statsRecorder = new FakeStatsRecorder();
- clock = new FakeApiClock(0);
-
- callable =
- new MeasuredReadRowsCallable<>(
- innerCallable, "Bigtable.ReadRows", tagger, statsRecorder, clock);
- }
-
- @Test
- public void testOk() {
- new Thread() {
- @Override
- public void run() {
- MockServerStreamingCall lastCall = null;
-
- for (int i = 0; i < 10 && lastCall == null; i++) {
- lastCall = innerCallable.popLastCall();
- }
- clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(2));
- lastCall.getController().popLastPull();
- lastCall.getController().getObserver().onResponse("row0");
-
- clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(3));
- lastCall.getController().popLastPull();
- lastCall.getController().getObserver().onResponse("row1");
- lastCall.getController().getObserver().onComplete();
- }
- }.start();
-
- List results = callable.all().call(Query.create("fake-table"));
-
- assertThat(results).containsExactly("row0", "row1");
-
- MetricsRecord metricsRecord = statsRecorder.pollRecord();
-
- assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 5.0);
- assertThat(metricsRecord.metrics)
- .containsEntry(RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY, 2.0);
- assertThat(metricsRecord.metrics)
- .containsEntry(RpcMeasureConstants.BIGTABLE_ROWS_READ_PER_OP, 2L);
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"));
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK"));
- }
-
- @Test
- public void testEmpty() {
- new Thread() {
- @Override
- public void run() {
- MockServerStreamingCall lastCall = null;
-
- for (int i = 0; i < 10 && lastCall == null; i++) {
- lastCall = innerCallable.popLastCall();
- }
- clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(2));
- lastCall.getController().getObserver().onComplete();
- }
- }.start();
-
- List results = callable.all().call(Query.create("fake-table"));
-
- assertThat(results).isEmpty();
-
- MetricsRecord metricsRecord = statsRecorder.pollRecord();
-
- assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 2.0);
- assertThat(metricsRecord.metrics)
- .doesNotContainKey(RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY);
- assertThat(metricsRecord.metrics)
- .containsEntry(RpcMeasureConstants.BIGTABLE_ROWS_READ_PER_OP, 0L);
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"));
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK"));
- }
-
- @Test
- public void testFailure() {
- new Thread() {
- @Override
- public void run() {
- MockServerStreamingCall lastCall = null;
-
- for (int i = 0; i < 10 && lastCall == null; i++) {
- lastCall = innerCallable.popLastCall();
- }
- clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(2));
- lastCall
- .getController()
- .getObserver()
- .onError(
- new DeadlineExceededException(
- "timeout!", null, GrpcStatusCode.of(Code.DEADLINE_EXCEEDED), true));
- }
- }.start();
-
- Throwable actualError = null;
- try {
- callable.all().call(Query.create("fake-table"));
- } catch (Throwable e) {
- actualError = e;
- }
-
- assertThat(actualError).isInstanceOf(DeadlineExceededException.class);
-
- MetricsRecord metricsRecord = statsRecorder.pollRecord();
-
- assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 2.0);
- assertThat(metricsRecord.metrics)
- .doesNotContainKey(RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY);
- assertThat(metricsRecord.metrics)
- .containsEntry(RpcMeasureConstants.BIGTABLE_ROWS_READ_PER_OP, 0L);
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"));
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("DEADLINE_EXCEEDED"));
- }
-
- @Test
- public void testFailureAfterData() {
- new Thread() {
- @Override
- public void run() {
- MockServerStreamingCall lastCall = null;
-
- for (int i = 0; i < 10 && lastCall == null; i++) {
- lastCall = innerCallable.popLastCall();
- }
- clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(2));
- lastCall.getController().popLastPull();
- lastCall.getController().getObserver().onResponse("row0");
-
- clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(3));
- lastCall
- .getController()
- .getObserver()
- .onError(
- new DeadlineExceededException(
- "timeout!", null, GrpcStatusCode.of(Code.DEADLINE_EXCEEDED), true));
- }
- }.start();
-
- Throwable actualError = null;
- try {
- callable.all().call(Query.create("fake-table"));
- } catch (Throwable e) {
- actualError = e;
- }
-
- assertThat(actualError).isInstanceOf(DeadlineExceededException.class);
-
- MetricsRecord metricsRecord = statsRecorder.pollRecord();
-
- assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 5.0);
- assertThat(metricsRecord.metrics)
- .containsEntry(RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY, 2.0);
- assertThat(metricsRecord.metrics)
- .containsEntry(RpcMeasureConstants.BIGTABLE_ROWS_READ_PER_OP, 1L);
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"));
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("DEADLINE_EXCEEDED"));
- }
-}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallableTest.java
deleted file mode 100644
index 41006148f..000000000
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MeasuredUnaryCallableTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright 2019 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.bigtable.data.v2.stub.metrics;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutures;
-import com.google.api.gax.core.FakeApiClock;
-import com.google.api.gax.grpc.GrpcStatusCode;
-import com.google.api.gax.rpc.ApiCallContext;
-import com.google.api.gax.rpc.DeadlineExceededException;
-import com.google.api.gax.rpc.UnaryCallable;
-import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.FakeStatsRecorder;
-import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.FakeTagger;
-import com.google.cloud.bigtable.data.v2.stub.metrics.StatsTestUtils.MetricsRecord;
-import io.grpc.Status.Code;
-import io.opencensus.tags.TagValue;
-import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
-import org.mockito.stubbing.Answer;
-
-@RunWith(JUnit4.class)
-public class MeasuredUnaryCallableTest {
- private static final String FAKE_METHOD = "Bigtable.FakeMethod";
-
- @Rule public final MockitoRule rule = MockitoJUnit.rule();
-
- private FakeTagger tagger;
-
- private FakeStatsRecorder statsRecorder;
-
- private FakeApiClock clock;
-
- @Mock private UnaryCallable innerCallable;
-
- private MeasuredUnaryCallable callable;
-
- @Before
- public void setUp() {
- tagger = new FakeTagger();
- statsRecorder = new FakeStatsRecorder();
- clock = new FakeApiClock(0);
-
- callable =
- new MeasuredUnaryCallable<>(innerCallable, FAKE_METHOD, tagger, statsRecorder, clock);
- }
-
- @Test
- public void testOk() {
- Mockito.when(innerCallable.futureCall(Mockito.anyString(), Mockito.any(ApiCallContext.class)))
- .thenAnswer(
- new Answer>() {
- @Override
- public ApiFuture answer(InvocationOnMock invocationOnMock) {
- clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(2));
- return ApiFutures.immediateFuture("response");
- }
- });
-
- String response = callable.call("request");
-
- assertThat(response).isEqualTo("response");
-
- MetricsRecord metricsRecord = statsRecorder.pollRecord();
-
- assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 2.0);
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create(FAKE_METHOD));
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK"));
- }
-
- @Test
- public void testFailure() {
- Mockito.when(innerCallable.futureCall(Mockito.anyString(), Mockito.any(ApiCallContext.class)))
- .thenAnswer(
- new Answer>() {
- @Override
- public ApiFuture answer(InvocationOnMock invocationOnMock) {
- clock.incrementNanoTime(TimeUnit.MILLISECONDS.toNanos(2));
- return ApiFutures.immediateFailedFuture(
- new DeadlineExceededException(
- "timeout!", null, GrpcStatusCode.of(Code.DEADLINE_EXCEEDED), true));
- }
- });
-
- Throwable actualError = null;
- try {
- callable.call("request");
- } catch (Throwable e) {
- actualError = e;
- }
-
- assertThat(actualError).isInstanceOf(DeadlineExceededException.class);
-
- MetricsRecord metricsRecord = statsRecorder.pollRecord();
-
- assertThat(metricsRecord.metrics).containsEntry(RpcMeasureConstants.BIGTABLE_OP_LATENCY, 2.0);
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_OP, TagValue.create(FAKE_METHOD));
- assertThat(metricsRecord.tags)
- .containsEntry(RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("DEADLINE_EXCEEDED"));
- }
-}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java
new file mode 100644
index 000000000..9314391af
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java
@@ -0,0 +1,401 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+
+import com.google.api.gax.rpc.ClientContext;
+import com.google.bigtable.v2.BigtableGrpc;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.ReadRowsResponse;
+import com.google.bigtable.v2.ReadRowsResponse.CellChunk;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
+import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.StringValue;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import io.opencensus.common.Function;
+import io.opencensus.impl.stats.StatsComponentImpl;
+import io.opencensus.stats.AggregationData;
+import io.opencensus.stats.AggregationData.CountData;
+import io.opencensus.stats.AggregationData.DistributionData;
+import io.opencensus.stats.AggregationData.LastValueDataDouble;
+import io.opencensus.stats.AggregationData.LastValueDataLong;
+import io.opencensus.stats.AggregationData.SumDataDouble;
+import io.opencensus.stats.AggregationData.SumDataLong;
+import io.opencensus.stats.View;
+import io.opencensus.stats.ViewData;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import io.opencensus.tags.Tags;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.stubbing.Answer;
+
+@RunWith(JUnit4.class)
+public class MetricsTracerTest {
+ private static final String PROJECT_ID = "fake-project";
+ private static final String INSTANCE_ID = "fake-instance";
+ private static final String APP_PROFILE_ID = "default";
+ private static final String TABLE_ID = "fake-table";
+
+ private static final ReadRowsResponse DEFAULT_READ_ROWS_RESPONSES =
+ ReadRowsResponse.newBuilder()
+ .addChunks(
+ CellChunk.newBuilder()
+ .setRowKey(ByteString.copyFromUtf8("fake-key"))
+ .setFamilyName(StringValue.of("cf"))
+ .setQualifier(BytesValue.newBuilder().setValue(ByteString.copyFromUtf8("q")))
+ .setTimestampMicros(1_000)
+ .setValue(ByteString.copyFromUtf8("value"))
+ .setCommitRow(true))
+ .build();
+
+ @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ private Server server;
+
+ @Mock(answer = Answers.CALLS_REAL_METHODS)
+ private BigtableGrpc.BigtableImplBase mockService;
+
+ private StatsComponentImpl localStats = new StatsComponentImpl();
+ private EnhancedBigtableStub stub;
+
+ @Before
+ public void setUp() throws Exception {
+ int port;
+ try (ServerSocket ss = new ServerSocket(0)) {
+ port = ss.getLocalPort();
+ }
+ server = ServerBuilder.forPort(port).addService(mockService).build();
+ server.start();
+
+ RpcViews.registerBigtableClientViews(localStats.getViewManager());
+
+ BigtableDataSettings settings =
+ BigtableDataSettings.newBuilderForEmulator(port)
+ .setProjectId(PROJECT_ID)
+ .setInstanceId(INSTANCE_ID)
+ .setAppProfileId(APP_PROFILE_ID)
+ .build();
+ EnhancedBigtableStubSettings stubSettings = settings.getStubSettings();
+
+ stub =
+ new EnhancedBigtableStub(
+ stubSettings,
+ ClientContext.create(stubSettings),
+ Tags.getTagger(),
+ localStats.getStatsRecorder());
+ }
+
+ @After
+ public void tearDown() {
+ stub.close();
+ server.shutdown();
+ }
+
+ @Test
+ public void testReadRowsLatency() throws InterruptedException {
+ final long sleepTime = 50;
+
+ doAnswer(
+ new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ @SuppressWarnings("unchecked")
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+ Thread.sleep(sleepTime);
+ observer.onNext(DEFAULT_READ_ROWS_RESPONSES);
+ observer.onCompleted();
+ return null;
+ }
+ })
+ .when(mockService)
+ .readRows(any(ReadRowsRequest.class), anyObserver(ReadRowsResponse.class));
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ // Give OpenCensus a chance to update the views asynchronously.
+ Thread.sleep(100);
+
+ long opLatency =
+ getAggregationValueAsLong(
+ RpcViewConstants.BIGTABLE_OP_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")));
+ assertThat(opLatency).isIn(Range.closed(sleepTime, elapsed));
+ }
+
+ @Test
+ public void testReadRowsOpCount() throws InterruptedException {
+ doAnswer(
+ new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) {
+ @SuppressWarnings("unchecked")
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+ observer.onNext(DEFAULT_READ_ROWS_RESPONSES);
+ observer.onCompleted();
+ return null;
+ }
+ })
+ .when(mockService)
+ .readRows(any(ReadRowsRequest.class), anyObserver(ReadRowsResponse.class));
+
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));
+
+ // Give OpenCensus a chance to update the views asynchronously.
+ Thread.sleep(100);
+
+ long opLatency =
+ getAggregationValueAsLong(
+ RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")));
+ assertThat(opLatency).isEqualTo(2);
+ }
+
+ @Test
+ public void testReadRowsFirstRow() throws InterruptedException {
+ final long beforeSleep = 50;
+ final long afterSleep = 50;
+
+ doAnswer(
+ new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ @SuppressWarnings("unchecked")
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+ Thread.sleep(beforeSleep);
+ observer.onNext(DEFAULT_READ_ROWS_RESPONSES);
+ Thread.sleep(afterSleep);
+ observer.onCompleted();
+ return null;
+ }
+ })
+ .when(mockService)
+ .readRows(any(ReadRowsRequest.class), anyObserver(ReadRowsResponse.class));
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ // Give OpenCensus a chance to update the views asynchronously.
+ Thread.sleep(100);
+
+ long firstRowLatency =
+ getAggregationValueAsLong(
+ RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW,
+ ImmutableMap.of());
+ assertThat(firstRowLatency).isIn(Range.closed(beforeSleep, elapsed - afterSleep));
+ }
+
+ @Test
+ public void testReadRowsAttemptsPerOp() throws InterruptedException {
+ final AtomicInteger callCount = new AtomicInteger(0);
+
+ doAnswer(
+ new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) {
+ @SuppressWarnings("unchecked")
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+
+ // First call will trigger a transient error
+ if (callCount.getAndIncrement() == 0) {
+ observer.onError(new StatusRuntimeException(Status.UNAVAILABLE));
+ return null;
+ }
+
+ // Next attempt will return a row
+ observer.onNext(DEFAULT_READ_ROWS_RESPONSES);
+ observer.onCompleted();
+ return null;
+ }
+ })
+ .when(mockService)
+ .readRows(any(ReadRowsRequest.class), anyObserver(ReadRowsResponse.class));
+
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));
+
+ // Give OpenCensus a chance to update the views asynchronously.
+ Thread.sleep(100);
+
+ long opLatency =
+ getAggregationValueAsLong(
+ RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")));
+ assertThat(opLatency).isEqualTo(2);
+ }
+
+ @Test
+ public void testReadRowsAttemptLatency() throws InterruptedException {
+ final long sleepTime = 50;
+ final AtomicInteger callCount = new AtomicInteger(0);
+
+ doAnswer(
+ new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ @SuppressWarnings("unchecked")
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+
+ Thread.sleep(sleepTime);
+
+ // First attempt will return a transient error
+ if (callCount.getAndIncrement() == 0) {
+ observer.onError(new StatusRuntimeException(Status.UNAVAILABLE));
+ return null;
+ }
+ // Next attempt will be ok
+ observer.onNext(DEFAULT_READ_ROWS_RESPONSES);
+ observer.onCompleted();
+ return null;
+ }
+ })
+ .when(mockService)
+ .readRows(any(ReadRowsRequest.class), anyObserver(ReadRowsResponse.class));
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ Lists.newArrayList(stub.readRowsCallable().call(Query.create(TABLE_ID)));
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ // Give OpenCensus a chance to update the views asynchronously.
+ Thread.sleep(100);
+
+ long attemptLatency =
+ getAggregationValueAsLong(
+ RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")));
+ // Average attempt latency will be just a single wait (as opposed to op latency which will be 2x
+ // sleeptime)
+ assertThat(attemptLatency).isIn(Range.closed(sleepTime, elapsed - sleepTime));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static StreamObserver anyObserver(Class returnType) {
+ return (StreamObserver) any(returnType);
+ }
+
+ private long getAggregationValueAsLong(View view, ImmutableMap tags) {
+ ViewData viewData = localStats.getViewManager().getView(view.getName());
+ Map, AggregationData> aggregationMap =
+ Objects.requireNonNull(viewData).getAggregationMap();
+
+ List tagValues = new ArrayList<>();
+
+ for (TagKey column : view.getColumns()) {
+ if (RpcMeasureConstants.BIGTABLE_PROJECT_ID == column) {
+ tagValues.add(TagValue.create(PROJECT_ID));
+ } else if (RpcMeasureConstants.BIGTABLE_INSTANCE_ID == column) {
+ tagValues.add(TagValue.create(INSTANCE_ID));
+ } else if (RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID == column) {
+ tagValues.add(TagValue.create(APP_PROFILE_ID));
+ } else {
+ tagValues.add(tags.get(column));
+ }
+ }
+
+ AggregationData aggregationData = aggregationMap.get(tagValues);
+
+ return aggregationData.match(
+ new Function() {
+ @Override
+ public Long apply(SumDataDouble arg) {
+ return (long) arg.getSum();
+ }
+ },
+ new Function() {
+ @Override
+ public Long apply(SumDataLong arg) {
+ return arg.getSum();
+ }
+ },
+ new Function() {
+ @Override
+ public Long apply(CountData arg) {
+ return arg.getCount();
+ }
+ },
+ new Function() {
+ @Override
+ public Long apply(DistributionData arg) {
+ return (long) arg.getMean();
+ }
+ },
+ new Function() {
+ @Override
+ public Long apply(LastValueDataDouble arg) {
+ return (long) arg.getLastValue();
+ }
+ },
+ new Function() {
+ @Override
+ public Long apply(LastValueDataLong arg) {
+ return arg.getLastValue();
+ }
+ },
+ new Function() {
+ @Override
+ public Long apply(AggregationData arg) {
+ throw new UnsupportedOperationException();
+ }
+ });
+ }
+}