diff --git a/README.md b/README.md
index e9bb4b0f59..bf339adc4a 100644
--- a/README.md
+++ b/README.md
@@ -296,12 +296,22 @@ metrics will be tagged with:
each client RPC, tagged by operation name and the attempt status. Under normal
circumstances, this will be identical to op_latency. However, when the client
receives transient errors, op_latency will be the sum of all attempt_latencies
- and the exponential delays
+ and the exponential delays.
* `cloud.google.com/java/bigtable/attempts_per_op`: A distribution of attempts that
each operation required, tagged by operation name and final operation status.
Under normal circumstances, this will be 1.
+### GFE metric views:
+
+* `cloud.google.com/java/bigtable/gfe_latency`: A distribution of the latency
+ between Google's network receives an RPC and reads back the first byte of
+ the response.
+
+* `cloud.google.com/java/bigtable/gfe_header_missing_count`: A counter of the
+ number of RPC responses received without the server-timing header, which
+ indicates that the request probably never reached Google's network.
+
By default, the functionality is disabled. For example to enable metrics using
[Google Stackdriver](https://cloud.google.com/monitoring/docs/):
@@ -357,6 +367,8 @@ StackdriverStatsExporter.createAndRegister(
);
BigtableDataSettings.enableOpenCensusStats();
+// Enable GFE metric views
+BigtableDataSettings.enableGfeOpenCensusStats();
```
## Version Conflicts
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java
index f4d63b5ec0..3002aa6113 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java
@@ -175,6 +175,20 @@ public static void enableOpenCensusStats() {
// io.opencensus.contrib.grpc.metrics.RpcViews.registerClientGrpcBasicViews();
}
+ /**
+ * Enables OpenCensus GFE metric aggregations.
+ *
+ *
This will register views for gfe_latency and gfe_header_missing_count metrics.
+ *
+ *
gfe_latency measures the latency between Google's network receives an RPC and reads back the
+ * first byte of the response. gfe_header_missing_count is a counter of the number of RPC
+ * responses received without the server-timing header.
+ */
+ @BetaApi("OpenCensus stats integration is currently unstable and may change in the future")
+ public static void enableGfeOpenCensusStats() {
+ com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientGfeViews();
+ }
+
/** Returns the target project id. */
public String getProjectId() {
return stubSettings.getProjectId();
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
index d729d6244d..448390396f 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
@@ -66,6 +66,8 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
+import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
+import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
@@ -162,6 +164,15 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build());
}
+ ImmutableMap attributes =
+ ImmutableMap.builder()
+ .put(RpcMeasureConstants.BIGTABLE_PROJECT_ID, TagValue.create(settings.getProjectId()))
+ .put(
+ RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(settings.getInstanceId()))
+ .put(
+ RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
+ TagValue.create(settings.getAppProfileId()))
+ .build();
// Inject Opencensus instrumentation
builder.setTracerFactory(
new CompositeTracerFactory(
@@ -187,23 +198,17 @@ public static EnhancedBigtableStubSettings finalizeSettings(
GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class))
.build()),
// Add OpenCensus Metrics
- MetricsTracerFactory.create(
- tagger,
- stats,
- ImmutableMap.builder()
- .put(
- RpcMeasureConstants.BIGTABLE_PROJECT_ID,
- TagValue.create(settings.getProjectId()))
- .put(
- RpcMeasureConstants.BIGTABLE_INSTANCE_ID,
- TagValue.create(settings.getInstanceId()))
- .put(
- RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
- TagValue.create(settings.getAppProfileId()))
- .build()),
+ MetricsTracerFactory.create(tagger, stats, attributes),
// Add user configured tracer
settings.getTracerFactory())));
-
+ builder.setHeaderTracer(
+ builder
+ .getHeaderTracer()
+ .toBuilder()
+ .setStats(stats)
+ .setTagger(tagger)
+ .setStatsAttributes(attributes)
+ .build());
return builder.build();
}
@@ -268,11 +273,10 @@ public ServerStreamingCallable createReadRowsCallable(
ServerStreamingCallable readRowsUserCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);
+ SpanName span = getSpanName("ReadRows");
ServerStreamingCallable traced =
new TracedServerStreamingCallable<>(
- readRowsUserCallable,
- clientContext.getTracerFactory(),
- SpanName.of(CLIENT_NAME, "ReadRows"));
+ readRowsUserCallable, clientContext.getTracerFactory(), span);
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
@@ -315,6 +319,7 @@ public UnaryCallable createReadRowCallable(RowAdapter
* Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured by the {@code rowAdapter} parameter.
+ * Add header tracer for tracking GFE metrics.
* Retry/resume on failure.
* Filter out marker rows.
*
@@ -356,10 +361,14 @@ public Map extract(ReadRowsRequest readRowsRequest) {
ServerStreamingCallable watched =
Callables.watched(merging, innerSettings, clientContext);
+ ServerStreamingCallable withHeaderTracer =
+ new HeaderTracerStreamingCallable<>(
+ watched, settings.getHeaderTracer(), getSpanName("ReadRows").toString());
+
// Retry logic is split into 2 parts to workaround a rare edge case described in
// ReadRowsRetryCompletedCallable
ServerStreamingCallable retrying1 =
- new ReadRowsRetryCompletedCallable<>(watched);
+ new ReadRowsRetryCompletedCallable<>(withHeaderTracer);
ServerStreamingCallable retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
@@ -380,6 +389,8 @@ public Map extract(ReadRowsRequest readRowsRequest) {
*
*/
private UnaryCallable> createSampleRowKeysCallable() {
+ String methodName = "SampleRowKeys";
+
ServerStreamingCallable base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.newBuilder()
@@ -399,11 +410,15 @@ public Map extract(
UnaryCallable> spoolable = base.all();
+ UnaryCallable> withHeaderTracer =
+ new HeaderTracerUnaryCallable<>(
+ spoolable, settings.getHeaderTracer(), getSpanName(methodName).toString());
+
UnaryCallable> retryable =
- Callables.retrying(spoolable, settings.sampleRowKeysSettings(), clientContext);
+ Callables.retrying(withHeaderTracer, settings.sampleRowKeysSettings(), clientContext);
return createUserFacingUnaryCallable(
- "SampleRowKeys", new SampleRowKeysCallable(retryable, requestContext));
+ methodName, new SampleRowKeysCallable(retryable, requestContext));
}
/**
@@ -415,6 +430,7 @@ public Map extract(
*
*/
private UnaryCallable createMutateRowCallable() {
+ String methodName = "MutateRow";
UnaryCallable base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.newBuilder()
@@ -431,11 +447,15 @@ public Map extract(MutateRowRequest mutateRowRequest) {
.build(),
settings.mutateRowSettings().getRetryableCodes());
+ UnaryCallable withHeaderTracer =
+ new HeaderTracerUnaryCallable<>(
+ base, settings.getHeaderTracer(), getSpanName(methodName).toString());
+
UnaryCallable retrying =
- Callables.retrying(base, settings.mutateRowSettings(), clientContext);
+ Callables.retrying(withHeaderTracer, settings.mutateRowSettings(), clientContext);
return createUserFacingUnaryCallable(
- "MutateRow", new MutateRowCallable(retrying, requestContext));
+ methodName, new MutateRowCallable(retrying, requestContext));
}
/**
@@ -459,11 +479,13 @@ private UnaryCallable createBulkMutateRowsCallable() {
UnaryCallable userFacing =
new BulkMutateRowsUserFacingCallable(baseCallable, requestContext);
+ SpanName spanName = getSpanName("MutateRows");
UnaryCallable traced =
- new TracedUnaryCallable<>(
- userFacing, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, "MutateRows"));
+ new TracedUnaryCallable<>(userFacing, clientContext.getTracerFactory(), spanName);
+ UnaryCallable withHeaderTracer =
+ new HeaderTracerUnaryCallable<>(traced, settings.getHeaderTracer(), spanName.toString());
- return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
+ return withHeaderTracer.withDefaultCallContext(clientContext.getDefaultCallContext());
}
/**
@@ -569,6 +591,7 @@ public Map extract(MutateRowsRequest mutateRowsRequest) {
*
*/
private UnaryCallable createCheckAndMutateRowCallable() {
+ String methodName = "CheckAndMutateRow";
UnaryCallable base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.newBuilder()
@@ -586,11 +609,15 @@ public Map extract(
.build(),
settings.checkAndMutateRowSettings().getRetryableCodes());
+ UnaryCallable withHeaderTracer =
+ new HeaderTracerUnaryCallable<>(
+ base, settings.getHeaderTracer(), getSpanName(methodName).toString());
+
UnaryCallable retrying =
- Callables.retrying(base, settings.checkAndMutateRowSettings(), clientContext);
+ Callables.retrying(withHeaderTracer, settings.checkAndMutateRowSettings(), clientContext);
return createUserFacingUnaryCallable(
- "CheckAndMutateRow", new CheckAndMutateRowCallable(retrying, requestContext));
+ methodName, new CheckAndMutateRowCallable(retrying, requestContext));
}
/**
@@ -619,12 +646,16 @@ public Map extract(ReadModifyWriteRowRequest request) {
})
.build(),
settings.readModifyWriteRowSettings().getRetryableCodes());
+ String methodName = "ReadModifyWriteRow";
+ UnaryCallable withHeaderTracer =
+ new HeaderTracerUnaryCallable<>(
+ base, settings.getHeaderTracer(), getSpanName(methodName).toString());
UnaryCallable retrying =
- Callables.retrying(base, settings.readModifyWriteRowSettings(), clientContext);
+ Callables.retrying(withHeaderTracer, settings.readModifyWriteRowSettings(), clientContext);
return createUserFacingUnaryCallable(
- "ReadModifyWriteRow", new ReadModifyWriteRowCallable(retrying, requestContext));
+ methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
}
/**
@@ -635,8 +666,7 @@ private UnaryCallable createUserFacin
String methodName, UnaryCallable inner) {
UnaryCallable traced =
- new TracedUnaryCallable<>(
- inner, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, methodName));
+ new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName));
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
@@ -686,6 +716,10 @@ public UnaryCallable readModifyWriteRowCallable() {
}
//
+ private SpanName getSpanName(String methodName) {
+ return SpanName.of(CLIENT_NAME, methodName);
+ }
+
@Override
public void close() {
for (BackgroundResource backgroundResource : clientContext.getBackgroundResources()) {
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
index 72b22c35e2..eaea47f4ef 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java
@@ -38,6 +38,7 @@
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracer;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.common.base.MoreObjects;
@@ -154,6 +155,7 @@ public class EnhancedBigtableStubSettings extends StubSettings primedTableIds;
+ private HeaderTracer headerTracer;
private final ServerStreamingCallSettings readRowsSettings;
private final UnaryCallSettings readRowSettings;
@@ -187,6 +189,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
appProfileId = builder.appProfileId;
isRefreshingChannel = builder.isRefreshingChannel;
primedTableIds = builder.primedTableIds;
+ headerTracer = builder.headerTracer;
// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
@@ -231,6 +234,11 @@ public List getPrimedTableIds() {
return primedTableIds;
}
+ /** Gets the tracer for capturing metrics in the header. */
+ HeaderTracer getHeaderTracer() {
+ return headerTracer;
+ }
+
/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
@@ -488,6 +496,7 @@ public static class Builder extends StubSettings.Builder primedTableIds;
+ private HeaderTracer headerTracer;
private final ServerStreamingCallSettings.Builder readRowsSettings;
private final UnaryCallSettings.Builder readRowSettings;
@@ -511,6 +520,7 @@ private Builder() {
this.appProfileId = SERVER_DEFAULT_APP_PROFILE_ID;
this.isRefreshingChannel = false;
primedTableIds = ImmutableList.of();
+ headerTracer = HeaderTracer.newBuilder().build();
setCredentialsProvider(defaultCredentialsProviderBuilder().build());
// Defaults provider
@@ -617,6 +627,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
appProfileId = settings.appProfileId;
isRefreshingChannel = settings.isRefreshingChannel;
primedTableIds = settings.primedTableIds;
+ headerTracer = settings.headerTracer;
// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
@@ -739,6 +750,17 @@ public List getPrimedTableIds() {
return primedTableIds;
}
+ /** Configure the header tracer for surfacing metrics in the header. */
+ Builder setHeaderTracer(HeaderTracer headerTracer) {
+ this.headerTracer = headerTracer;
+ return this;
+ }
+
+ /** Gets the header tracer that'll be used to surface metrics in the header. */
+ HeaderTracer getHeaderTracer() {
+ return headerTracer;
+ }
+
/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder readRowsSettings() {
return readRowsSettings;
@@ -818,6 +840,7 @@ public String toString() {
.add("appProfileId", appProfileId)
.add("isRefreshingChannel", isRefreshingChannel)
.add("primedTableIds", primedTableIds)
+ .add("headerTracer", headerTracer)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracer.java
new file mode 100644
index 0000000000..f3eb0ef1e2
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracer.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.api.core.InternalApi;
+import com.google.auto.value.AutoValue;
+import com.google.common.base.MoreObjects;
+import io.grpc.Metadata;
+import io.opencensus.stats.MeasureMap;
+import io.opencensus.stats.Stats;
+import io.opencensus.stats.StatsRecorder;
+import io.opencensus.tags.TagContextBuilder;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import io.opencensus.tags.Tagger;
+import io.opencensus.tags.Tags;
+import java.util.Collections;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
+
+@InternalApi
+@AutoValue
+public abstract class HeaderTracer {
+
+ private static final Metadata.Key SERVER_TIMING_HEADER_KEY =
+ Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);
+ private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?\\d+)");
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ //
+ public abstract Builder setTagger(@Nonnull Tagger tagger);
+
+ public abstract Builder setStats(@Nonnull StatsRecorder stats);
+
+ public abstract Builder setStatsAttributes(@Nonnull Map statsAttributes);
+
+ abstract HeaderTracer autoBuild();
+
+ public HeaderTracer build() {
+ HeaderTracer headerTracer = autoBuild();
+ return headerTracer;
+ }
+ //
+ }
+
+ public abstract Tagger getTagger();
+
+ public abstract StatsRecorder getStats();
+
+ public abstract Map getStatsAttributes();
+
+ /**
+ * If the header has a server-timing field, extract the metric and publish it to OpenCensus.
+ * Otherwise increment the gfe header missing counter by 1.
+ */
+ public void recordGfeMetadata(@Nonnull Metadata metadata, @Nonnull String spanName) {
+ MeasureMap measures = getStats().newMeasureMap();
+ if (metadata.get(SERVER_TIMING_HEADER_KEY) != null) {
+ String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY);
+ Matcher matcher = SERVER_TIMING_HEADER_PATTERN.matcher(serverTiming);
+ measures.put(RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT, 0L);
+ if (matcher.find()) {
+ long latency = Long.valueOf(matcher.group("dur"));
+ measures.put(RpcMeasureConstants.BIGTABLE_GFE_LATENCY, latency);
+ }
+ } else {
+ measures.put(RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT, 1L);
+ }
+ measures.record(newTagCtxBuilder(spanName).build());
+ }
+
+ public void recordGfeMissingHeader(@Nonnull String spanName) {
+ MeasureMap measures =
+ getStats().newMeasureMap().put(RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT, 1L);
+ measures.record(newTagCtxBuilder(spanName).build());
+ }
+
+ private TagContextBuilder newTagCtxBuilder(String span) {
+ TagContextBuilder tagContextBuilder = getTagger().currentBuilder();
+ if (span != null) {
+ tagContextBuilder.putLocal(RpcMeasureConstants.BIGTABLE_OP, TagValue.create(span));
+ }
+ // Copy client level tags in
+ for (Map.Entry entry : getStatsAttributes().entrySet()) {
+ tagContextBuilder.putLocal(entry.getKey(), entry.getValue());
+ }
+ return tagContextBuilder;
+ }
+
+ public static Builder newBuilder() {
+ return new AutoValue_HeaderTracer.Builder()
+ .setTagger(Tags.getTagger())
+ .setStats(Stats.getStatsRecorder())
+ .setStatsAttributes(Collections.emptyMap());
+ }
+
+ public abstract Builder toBuilder();
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("stats", getStats())
+ .add("tagger", getTagger())
+ .add("statsAttributes", getStatsAttributes())
+ .toString();
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java
new file mode 100644
index 0000000000..fdca9297aa
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.api.core.InternalApi;
+import com.google.api.gax.grpc.GrpcResponseMetadata;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.StreamController;
+import com.google.common.base.Preconditions;
+import io.grpc.Metadata;
+import javax.annotation.Nonnull;
+
+/**
+ * This callable will inject a {@link GrpcResponseMetadata} to access the headers and trailers
+ * returned by gRPC methods upon completion. The {@link HeaderTracer} will process metrics that were
+ * injected in the header/trailer and publish them to OpenCensus. If {@link
+ * GrpcResponseMetadata#getMetadata()} returned null, it probably means that the request has never
+ * reached GFE, and it'll increment the gfe_header_missing_counter in this case.
+ *
+ * If GFE metrics are not registered in {@link RpcViews}, skip injecting GrpcResponseMetadata.
+ * This is for the case where direct path is enabled, all the requests won't go through GFE and
+ * therefore won't have the server-timing header.
+ *
+ *
This class is considered an internal implementation detail and not meant to be used by
+ * applications.
+ */
+@InternalApi
+public class HeaderTracerStreamingCallable
+ extends ServerStreamingCallable {
+
+ private final ServerStreamingCallable innerCallable;
+ private final HeaderTracer headerTracer;
+ private final String spanName;
+
+ public HeaderTracerStreamingCallable(
+ @Nonnull ServerStreamingCallable callable,
+ @Nonnull HeaderTracer headerTracer,
+ @Nonnull String spanName) {
+ this.innerCallable = Preconditions.checkNotNull(callable, "Inner callable must be set");
+ this.headerTracer = Preconditions.checkNotNull(headerTracer, "HeaderTracer must be set");
+ this.spanName = Preconditions.checkNotNull(spanName, "Span name must be set");
+ }
+
+ @Override
+ public void call(
+ RequestT request, ResponseObserver responseObserver, ApiCallContext context) {
+ final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
+ if (RpcViews.isGfeMetricsRegistered()) {
+ HeaderTracerResponseObserver innerObserver =
+ new HeaderTracerResponseObserver<>(
+ responseObserver, headerTracer, responseMetadata, spanName);
+ innerCallable.call(request, innerObserver, responseMetadata.addHandlers(context));
+ } else {
+ innerCallable.call(request, responseObserver, context);
+ }
+ }
+
+ private class HeaderTracerResponseObserver implements ResponseObserver {
+
+ private ResponseObserver outerObserver;
+ private HeaderTracer headerTracer;
+ private GrpcResponseMetadata responseMetadata;
+ private String spanName;
+
+ HeaderTracerResponseObserver(
+ ResponseObserver observer,
+ HeaderTracer headerTracer,
+ GrpcResponseMetadata metadata,
+ String spanName) {
+ this.outerObserver = observer;
+ this.headerTracer = headerTracer;
+ this.responseMetadata = metadata;
+ this.spanName = spanName;
+ }
+
+ @Override
+ public void onStart(final StreamController controller) {
+ outerObserver.onStart(controller);
+ }
+
+ @Override
+ public void onResponse(ResponseT response) {
+ outerObserver.onResponse(response);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // server-timing metric will be added through GrpcResponseMetadata#onHeaders(Metadata),
+ // so it's not checking trailing metadata here.
+ Metadata metadata = responseMetadata.getMetadata();
+ if (metadata != null) {
+ headerTracer.recordGfeMetadata(metadata, spanName);
+ } else {
+ headerTracer.recordGfeMissingHeader(spanName);
+ }
+ outerObserver.onError(t);
+ }
+
+ @Override
+ public void onComplete() {
+ Metadata metadata = responseMetadata.getMetadata();
+ if (metadata != null) {
+ headerTracer.recordGfeMetadata(metadata, spanName);
+ } else {
+ headerTracer.recordGfeMissingHeader(spanName);
+ }
+ outerObserver.onComplete();
+ }
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable.java
new file mode 100644
index 0000000000..17d84b2a2e
--- /dev/null
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.InternalApi;
+import com.google.api.gax.grpc.GrpcResponseMetadata;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.grpc.Metadata;
+import javax.annotation.Nonnull;
+
+/**
+ * This callable will inject a {@link GrpcResponseMetadata} to access the headers and trailers
+ * returned by gRPC methods upon completion. The {@link HeaderTracer} will process metrics that were
+ * injected in the header/trailer and publish them to OpenCensus. If {@link
+ * GrpcResponseMetadata#getMetadata()} returned null, it probably means that the request has never
+ * reached GFE, and it'll increment the gfe_header_missing_counter in this case.
+ *
+ * If GFE metrics are not registered in {@link RpcViews}, skip injecting GrpcResponseMetadata.
+ * This is for the case where direct path is enabled, all the requests won't go through GFE and
+ * therefore won't have the server-timing header.
+ *
+ *
This class is considered an internal implementation detail and not meant to be used by
+ * applications.
+ */
+@InternalApi
+public class HeaderTracerUnaryCallable
+ extends UnaryCallable {
+
+ private final UnaryCallable innerCallable;
+ private final HeaderTracer headerTracer;
+ private final String spanName;
+
+ public HeaderTracerUnaryCallable(
+ @Nonnull UnaryCallable innerCallable,
+ @Nonnull HeaderTracer headerTracer,
+ @Nonnull String spanName) {
+ this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set");
+ this.headerTracer = Preconditions.checkNotNull(headerTracer, "HeaderTracer must be set");
+ this.spanName = Preconditions.checkNotNull(spanName, "Span name must be set");
+ }
+
+ @Override
+ public ApiFuture futureCall(RequestT request, ApiCallContext context) {
+ if (RpcViews.isGfeMetricsRegistered()) {
+ final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
+ ApiFuture future =
+ innerCallable.futureCall(request, responseMetadata.addHandlers(context));
+ future.addListener(
+ new Runnable() {
+ @Override
+ public void run() {
+ Metadata metadata = responseMetadata.getMetadata();
+ if (metadata != null) {
+ headerTracer.recordGfeMetadata(metadata, spanName);
+ } else {
+ headerTracer.recordGfeMissingHeader(spanName);
+ }
+ }
+ },
+ MoreExecutors.directExecutor());
+ return future;
+ } else {
+ return innerCallable.futureCall(request, context);
+ }
+ }
+}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java
index 8c6e347a0f..e6e5c70db1 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java
@@ -74,4 +74,18 @@ public class RpcMeasureConstants {
"cloud.google.com/java/bigtable/read_rows_first_row_latency",
"Time between request being sent to the first row received",
MILLISECOND);
+
+ /** GFE t4t7 latency extracted from server-timing header. */
+ public static final MeasureLong BIGTABLE_GFE_LATENCY =
+ MeasureLong.create(
+ "cloud.google.com/java/bigtable/gfe_latency",
+ "Latency between Google's network receives an RPC and reads back the first byte of the response",
+ MILLISECOND);
+
+ /** Number of responses without the server-timing header. */
+ public static final MeasureLong BIGTABLE_GFE_HEADER_MISSING_COUNT =
+ MeasureLong.create(
+ "cloud.google.com/java/bigtable/gfe_header_missing_count",
+ "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network",
+ COUNT);
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java
index d21060c4ac..8a14c01b13 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java
@@ -17,6 +17,8 @@
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT;
+import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_LATENCY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_INSTANCE_ID;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP_ATTEMPT_COUNT;
@@ -29,6 +31,7 @@
import io.opencensus.stats.Aggregation;
import io.opencensus.stats.Aggregation.Count;
import io.opencensus.stats.Aggregation.Distribution;
+import io.opencensus.stats.Aggregation.Sum;
import io.opencensus.stats.BucketBoundaries;
import io.opencensus.stats.View;
import java.util.Arrays;
@@ -36,6 +39,7 @@
class RpcViewConstants {
// Aggregations
private static final Aggregation COUNT = Count.create();
+ private static final Aggregation SUM = Sum.create();
private static final Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM =
Distribution.create(
@@ -124,4 +128,22 @@ class RpcViewConstants {
BIGTABLE_APP_PROFILE_ID,
BIGTABLE_OP,
BIGTABLE_STATUS));
+
+ static final View BIGTABLE_GFE_LATENCY_VIEW =
+ View.create(
+ View.Name.create("cloud.google.com/java/bigtable/gfe_latency"),
+ "Latency between Google's network receives an RPC and reads back the first byte of the response",
+ BIGTABLE_GFE_LATENCY,
+ AGGREGATION_WITH_MILLIS_HISTOGRAM,
+ ImmutableList.of(
+ BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP));
+
+ static final View BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW =
+ View.create(
+ View.Name.create("cloud.google.com/java/bigtable/gfe_header_missing_count"),
+ "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network",
+ BIGTABLE_GFE_HEADER_MISSING_COUNT,
+ SUM,
+ ImmutableList.of(
+ BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP));
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java
index cc31539496..9e8f6084a2 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java
@@ -33,15 +33,49 @@ public class RpcViews {
RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW,
RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW);
+ private static final ImmutableSet GFE_VIEW_SET =
+ ImmutableSet.of(
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW);
+
+ private static boolean gfeMetricsRegistered = false;
+
/** Registers all Bigtable specific views. */
public static void registerBigtableClientViews() {
registerBigtableClientViews(Stats.getViewManager());
}
+ /**
+ * Register views for GFE metrics, including gfe_latency and gfe_header_missing_count. gfe_latency
+ * measures the latency between Google's network receives an RPC and reads back the first byte of
+ * the response. gfe_header_missing_count is a counter of the number of RPC responses without a
+ * server-timing header.
+ */
+ public static void registerBigtableClientGfeViews() {
+ registerBigtableClientGfeViews(Stats.getViewManager());
+ }
+
@VisibleForTesting
static void registerBigtableClientViews(ViewManager viewManager) {
for (View view : BIGTABLE_CLIENT_VIEWS_SET) {
viewManager.registerView(view);
}
}
+
+ @VisibleForTesting
+ static void registerBigtableClientGfeViews(ViewManager viewManager) {
+ for (View view : GFE_VIEW_SET) {
+ viewManager.registerView(view);
+ }
+ gfeMetricsRegistered = true;
+ }
+
+ static boolean isGfeMetricsRegistered() {
+ return gfeMetricsRegistered;
+ }
+
+ @VisibleForTesting
+ static void setGfeMetricsRegistered(boolean gfeMetricsRegistered) {
+ RpcViews.gfeMetricsRegistered = gfeMetricsRegistered;
+ }
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java
index 2cd55a311c..10ba675826 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java
@@ -32,6 +32,7 @@
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracer;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Range;
@@ -75,6 +76,7 @@ public void settingsAreNotLostTest() {
CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class);
WatchdogProvider watchdogProvider = Mockito.mock(WatchdogProvider.class);
Duration watchdogInterval = Duration.ofSeconds(12);
+ HeaderTracer headerTracer = Mockito.mock(HeaderTracer.class);
EnhancedBigtableStubSettings.Builder builder =
EnhancedBigtableStubSettings.newBuilder()
@@ -85,7 +87,8 @@ public void settingsAreNotLostTest() {
.setEndpoint(endpoint)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
- .setStreamWatchdogCheckInterval(watchdogInterval);
+ .setStreamWatchdogCheckInterval(watchdogInterval)
+ .setHeaderTracer(headerTracer);
verifyBuilder(
builder,
@@ -96,7 +99,8 @@ public void settingsAreNotLostTest() {
endpoint,
credentialsProvider,
watchdogProvider,
- watchdogInterval);
+ watchdogInterval,
+ headerTracer);
verifySettings(
builder.build(),
projectId,
@@ -106,7 +110,8 @@ public void settingsAreNotLostTest() {
endpoint,
credentialsProvider,
watchdogProvider,
- watchdogInterval);
+ watchdogInterval,
+ headerTracer);
verifyBuilder(
builder.build().toBuilder(),
projectId,
@@ -116,7 +121,8 @@ public void settingsAreNotLostTest() {
endpoint,
credentialsProvider,
watchdogProvider,
- watchdogInterval);
+ watchdogInterval,
+ headerTracer);
}
private void verifyBuilder(
@@ -128,7 +134,8 @@ private void verifyBuilder(
String endpoint,
CredentialsProvider credentialsProvider,
WatchdogProvider watchdogProvider,
- Duration watchdogInterval) {
+ Duration watchdogInterval,
+ HeaderTracer headerTracer) {
assertThat(builder.getProjectId()).isEqualTo(projectId);
assertThat(builder.getInstanceId()).isEqualTo(instanceId);
assertThat(builder.getAppProfileId()).isEqualTo(appProfileId);
@@ -137,6 +144,7 @@ private void verifyBuilder(
assertThat(builder.getCredentialsProvider()).isEqualTo(credentialsProvider);
assertThat(builder.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider);
assertThat(builder.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval);
+ assertThat(builder.getHeaderTracer()).isEqualTo(headerTracer);
}
private void verifySettings(
@@ -148,7 +156,8 @@ private void verifySettings(
String endpoint,
CredentialsProvider credentialsProvider,
WatchdogProvider watchdogProvider,
- Duration watchdogInterval) {
+ Duration watchdogInterval,
+ HeaderTracer headerTracer) {
assertThat(settings.getProjectId()).isEqualTo(projectId);
assertThat(settings.getInstanceId()).isEqualTo(instanceId);
assertThat(settings.getAppProfileId()).isEqualTo(appProfileId);
@@ -157,6 +166,7 @@ private void verifySettings(
assertThat(settings.getCredentialsProvider()).isEqualTo(credentialsProvider);
assertThat(settings.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider);
assertThat(settings.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval);
+ assertThat(settings.getHeaderTracer()).isEqualTo(headerTracer);
}
@Test
@@ -622,12 +632,26 @@ public void isRefreshingChannelFalseValueTest() {
assertThat(builder.build().toBuilder().isRefreshingChannel()).isFalse();
}
+ @Test
+ public void verifyDefaultHeaderTracerNotNullTest() {
+ String dummyProjectId = "my-project";
+ String dummyInstanceId = "my-instance";
+ EnhancedBigtableStubSettings.Builder builder =
+ EnhancedBigtableStubSettings.newBuilder()
+ .setProjectId(dummyProjectId)
+ .setInstanceId(dummyInstanceId);
+ assertThat(builder.getHeaderTracer()).isNotNull();
+ assertThat(builder.build().getHeaderTracer()).isNotNull();
+ assertThat(builder.build().toBuilder().getHeaderTracer()).isNotNull();
+ }
+
static final String[] SETTINGS_LIST = {
"projectId",
"instanceId",
"appProfileId",
"isRefreshingChannel",
"primedTableIds",
+ "headerTracer",
"readRowsSettings",
"readRowSettings",
"sampleRowKeysSettings",
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerCallableTest.java
new file mode 100644
index 0000000000..9538b6a135
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerCallableTest.java
@@ -0,0 +1,428 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.api.gax.rpc.ClientContext;
+import com.google.api.gax.rpc.UnavailableException;
+import com.google.bigtable.v2.BigtableGrpc.BigtableImplBase;
+import com.google.bigtable.v2.CheckAndMutateRowRequest;
+import com.google.bigtable.v2.CheckAndMutateRowResponse;
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.MutateRowsRequest;
+import com.google.bigtable.v2.MutateRowsResponse;
+import com.google.bigtable.v2.ReadModifyWriteRowRequest;
+import com.google.bigtable.v2.ReadModifyWriteRowResponse;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.ReadRowsResponse;
+import com.google.bigtable.v2.SampleRowKeysRequest;
+import com.google.bigtable.v2.SampleRowKeysResponse;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.FakeServiceHelper;
+import com.google.cloud.bigtable.data.v2.internal.NameUtil;
+import com.google.cloud.bigtable.data.v2.models.BulkMutation;
+import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
+import com.google.cloud.bigtable.data.v2.models.Mutation;
+import com.google.cloud.bigtable.data.v2.models.Query;
+import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
+import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
+import com.google.common.collect.ImmutableMap;
+import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import io.opencensus.impl.stats.StatsComponentImpl;
+import io.opencensus.stats.StatsComponent;
+import io.opencensus.stats.ViewData;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import io.opencensus.tags.Tags;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class HeaderTracerCallableTest {
+ private FakeServiceHelper serviceHelper;
+ private FakeServiceHelper serviceHelperNoHeader;
+
+ private FakeService fakeService = new FakeService();
+
+ private final StatsComponent localStats = new StatsComponentImpl();
+ private EnhancedBigtableStub stub;
+ private EnhancedBigtableStub noHeaderStub;
+ private int attempts;
+
+ private static final String PROJECT_ID = "fake-project";
+ private static final String INSTANCE_ID = "fake-instance";
+ private static final String APP_PROFILE_ID = "default";
+ private static final String TABLE_ID = "fake-table";
+
+ private static final long WAIT_FOR_METRICS_TIME_MS = 1_000;
+
+ private AtomicInteger fakeServerTiming;
+
+ @Before
+ public void setUp() throws Exception {
+ RpcViews.registerBigtableClientGfeViews(localStats.getViewManager());
+
+ // Create a server that'll inject a server-timing header with a random number and a stub that
+ // connects to this server.
+ fakeServerTiming = new AtomicInteger(new Random().nextInt(1000) + 1);
+ serviceHelper =
+ new FakeServiceHelper(
+ new ServerInterceptor() {
+ @Override
+ public ServerCall.Listener interceptCall(
+ ServerCall serverCall,
+ Metadata metadata,
+ ServerCallHandler serverCallHandler) {
+ return serverCallHandler.startCall(
+ new SimpleForwardingServerCall(serverCall) {
+ @Override
+ public void sendHeaders(Metadata headers) {
+ headers.put(
+ Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER),
+ String.format("gfet4t7; dur=%d", fakeServerTiming.get()));
+ super.sendHeaders(headers);
+ }
+ },
+ metadata);
+ }
+ },
+ fakeService);
+ serviceHelper.start();
+
+ BigtableDataSettings settings =
+ BigtableDataSettings.newBuilderForEmulator(serviceHelper.getPort())
+ .setProjectId(PROJECT_ID)
+ .setInstanceId(INSTANCE_ID)
+ .setAppProfileId(APP_PROFILE_ID)
+ .build();
+ EnhancedBigtableStubSettings stubSettings =
+ EnhancedBigtableStub.finalizeSettings(
+ settings.getStubSettings(), Tags.getTagger(), localStats.getStatsRecorder());
+ attempts = stubSettings.readRowsSettings().getRetrySettings().getMaxAttempts();
+ stub = new EnhancedBigtableStub(stubSettings, ClientContext.create(stubSettings));
+
+ // Create another server without injecting the server-timing header and another stub that
+ // connects to it.
+ serviceHelperNoHeader = new FakeServiceHelper(fakeService);
+ serviceHelperNoHeader.start();
+
+ BigtableDataSettings noHeaderSettings =
+ BigtableDataSettings.newBuilderForEmulator(serviceHelperNoHeader.getPort())
+ .setProjectId(PROJECT_ID)
+ .setInstanceId(INSTANCE_ID)
+ .setAppProfileId(APP_PROFILE_ID)
+ .build();
+ EnhancedBigtableStubSettings noHeaderStubSettings =
+ EnhancedBigtableStub.finalizeSettings(
+ noHeaderSettings.getStubSettings(), Tags.getTagger(), localStats.getStatsRecorder());
+ noHeaderStub =
+ new EnhancedBigtableStub(noHeaderStubSettings, ClientContext.create(noHeaderStubSettings));
+ }
+
+ @After
+ public void tearDown() {
+ stub.close();
+ noHeaderStub.close();
+ serviceHelper.shutdown();
+ serviceHelperNoHeader.shutdown();
+ }
+
+ @Test
+ public void testGFELatencyMetricReadRows() throws InterruptedException {
+ stub.readRowsCallable().call(Query.create(TABLE_ID));
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+
+ long latency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+
+ assertThat(latency).isEqualTo(fakeServerTiming.get());
+ }
+
+ @Test
+ public void testGFELatencyMetricMutateRow() throws InterruptedException {
+ stub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "fake-key"));
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+
+ long latency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRow")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+
+ assertThat(latency).isEqualTo(fakeServerTiming.get());
+ }
+
+ @Test
+ public void testGFELatencyMetricMutateRows() throws InterruptedException {
+ BulkMutation mutations =
+ BulkMutation.create(TABLE_ID)
+ .add("key", Mutation.create().setCell("fake-family", "fake-qualifier", "fake-value"));
+ stub.bulkMutateRowsCallable().call(mutations);
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+
+ long latency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRows")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+
+ assertThat(latency).isEqualTo(fakeServerTiming.get());
+ }
+
+ @Test
+ public void testGFELatencySampleRowKeys() throws InterruptedException {
+ stub.sampleRowKeysCallable().call(TABLE_ID);
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+ long latency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.SampleRowKeys")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(latency).isEqualTo(fakeServerTiming.get());
+ }
+
+ @Test
+ public void testGFELatencyCheckAndMutateRow() throws InterruptedException {
+ ConditionalRowMutation mutation =
+ ConditionalRowMutation.create(TABLE_ID, "fake-key")
+ .then(Mutation.create().setCell("fake-family", "fake-qualifier", "fake-value"));
+ stub.checkAndMutateRowCallable().call(mutation);
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+ long latency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.CheckAndMutateRow")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(latency).isEqualTo(fakeServerTiming.get());
+ }
+
+ @Test
+ public void testGFELatencyReadModifyWriteRow() throws InterruptedException {
+ ReadModifyWriteRow request =
+ ReadModifyWriteRow.create(TABLE_ID, "fake-key")
+ .append("fake-family", "fake-qualifier", "suffix");
+ stub.readModifyWriteRowCallable().call(request);
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+ long latency =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadModifyWriteRow")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(latency).isEqualTo(fakeServerTiming.get());
+ }
+
+ @Test
+ public void testGFEMissingHeaderMetric() throws InterruptedException {
+ // Make a few calls to the server which will inject the server-timing header and the counter
+ // should be 0.
+ stub.readRowsCallable().call(Query.create(TABLE_ID));
+ stub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "key"));
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+ long mutateRowMissingCount =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
+ ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRow")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ long readRowsMissingCount =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+
+ assertThat(mutateRowMissingCount).isEqualTo(0);
+ assertThat(readRowsMissingCount).isEqualTo(0);
+
+ // Make a few more calls to the server which won't add the header and the counter should match
+ // the number of requests sent.
+ int readRowsCalls = new Random().nextInt(10) + 1;
+ int mutateRowCalls = new Random().nextInt(10) + 1;
+ for (int i = 0; i < mutateRowCalls; i++) {
+ noHeaderStub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "fake-key" + i));
+ }
+ for (int i = 0; i < readRowsCalls; i++) {
+ noHeaderStub.readRowsCallable().call(Query.create(TABLE_ID));
+ }
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+
+ mutateRowMissingCount =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
+ ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRow")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ readRowsMissingCount =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
+ ImmutableMap.of(
+ RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+
+ assertThat(mutateRowMissingCount).isEqualTo(mutateRowCalls);
+ assertThat(readRowsMissingCount).isEqualTo(readRowsCalls);
+ }
+
+ @Test
+ public void testMetricsWithErrorResponse() throws InterruptedException {
+ try {
+ stub.readRowsCallable().call(Query.create("random-table-id")).iterator().next();
+ fail("readrows should throw exception");
+ } catch (Exception e) {
+ assertThat(e).isInstanceOf(UnavailableException.class);
+ }
+
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+ long missingCount =
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
+ RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
+ ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
+ assertThat(missingCount).isEqualTo(attempts);
+ }
+
+ @Test
+ public void testCallableBypassed() throws InterruptedException {
+ RpcViews.setGfeMetricsRegistered(false);
+ stub.readRowsCallable().call(Query.create(TABLE_ID));
+ Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
+ ViewData headerMissingView =
+ localStats
+ .getViewManager()
+ .getView(RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW.getName());
+ ViewData latencyView =
+ localStats.getViewManager().getView(RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW.getName());
+ // Verify that the view is registered by it's not collecting metrics
+ assertThat(headerMissingView).isNotNull();
+ assertThat(latencyView).isNotNull();
+ assertThat(headerMissingView.getAggregationMap()).isEmpty();
+ assertThat(latencyView.getAggregationMap()).isEmpty();
+ }
+
+ private class FakeService extends BigtableImplBase {
+ private final String defaultTableName =
+ NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID);
+
+ @Override
+ public void readRows(ReadRowsRequest request, StreamObserver observer) {
+ if (!request.getTableName().equals(defaultTableName)) {
+ observer.onError(new StatusRuntimeException(Status.UNAVAILABLE));
+ return;
+ }
+ observer.onNext(ReadRowsResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void mutateRow(MutateRowRequest request, StreamObserver observer) {
+ observer.onNext(MutateRowResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void mutateRows(MutateRowsRequest request, StreamObserver observer) {
+ observer.onNext(MutateRowsResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void sampleRowKeys(
+ SampleRowKeysRequest request, StreamObserver observer) {
+ observer.onNext(SampleRowKeysResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void checkAndMutateRow(
+ CheckAndMutateRowRequest request, StreamObserver observer) {
+ observer.onNext(CheckAndMutateRowResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+
+ @Override
+ public void readModifyWriteRow(
+ ReadModifyWriteRowRequest request, StreamObserver observer) {
+ observer.onNext(ReadModifyWriteRowResponse.getDefaultInstance());
+ observer.onCompleted();
+ }
+ }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerTest.java
new file mode 100644
index 0000000000..30e24dbfe3
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.metrics;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.collect.ImmutableMap;
+import io.opencensus.impl.stats.StatsComponentImpl;
+import io.opencensus.stats.StatsComponent;
+import io.opencensus.stats.StatsRecorder;
+import io.opencensus.tags.TagKey;
+import io.opencensus.tags.TagValue;
+import io.opencensus.tags.Tagger;
+import java.util.Map;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+@RunWith(JUnit4.class)
+public class HeaderTracerTest {
+
+ private final StatsComponent localStats = new StatsComponentImpl();
+
+ @Test
+ public void testDefaultBuilder() {
+ HeaderTracer.Builder builder = HeaderTracer.newBuilder();
+ HeaderTracer tracer = builder.build();
+ assertThat(tracer.getStats()).isNotNull();
+ assertThat(tracer.getTagger()).isNotNull();
+ assertThat(tracer.getStatsAttributes()).isNotNull();
+ assertThat(tracer.getStatsAttributes()).isEmpty();
+ }
+
+ @Test
+ public void testBuilder() {
+ HeaderTracer.Builder builder = HeaderTracer.newBuilder();
+ Map attrs =
+ ImmutableMap.of(TagKey.create("fake-key"), TagValue.create("fake-value"));
+ Tagger tagger = Mockito.mock(Tagger.class);
+ StatsRecorder stats = localStats.getStatsRecorder();
+ builder.setStats(stats).setStatsAttributes(attrs).setTagger(tagger);
+ HeaderTracer headerTracer = builder.build();
+ assertThat(headerTracer.getStats()).isEqualTo(stats);
+ assertThat(headerTracer.getTagger()).isEqualTo(tagger);
+ assertThat(headerTracer.getStatsAttributes()).isEqualTo(attrs);
+ }
+
+ @Test
+ public void testToBuilder() {
+ HeaderTracer.Builder builder = HeaderTracer.newBuilder();
+ Map attrs =
+ ImmutableMap.of(TagKey.create("fake-key"), TagValue.create("fake-value"));
+ Tagger tagger = Mockito.mock(Tagger.class);
+ StatsRecorder stats = localStats.getStatsRecorder();
+ builder.setStats(stats).setStatsAttributes(attrs).setTagger(tagger);
+ HeaderTracer headerTracer = builder.build();
+
+ HeaderTracer.Builder newBuilder = headerTracer.toBuilder();
+ assertThat(newBuilder.build().getStats()).isEqualTo(stats);
+ assertThat(newBuilder.build().getTagger()).isEqualTo(tagger);
+ assertThat(newBuilder.build().getStatsAttributes()).isEqualTo(attrs);
+ }
+}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java
index 4b025303e4..56d65f8298 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java
@@ -39,24 +39,10 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
-import io.opencensus.common.Function;
import io.opencensus.impl.stats.StatsComponentImpl;
-import io.opencensus.stats.AggregationData;
-import io.opencensus.stats.AggregationData.CountData;
-import io.opencensus.stats.AggregationData.DistributionData;
-import io.opencensus.stats.AggregationData.LastValueDataDouble;
-import io.opencensus.stats.AggregationData.LastValueDataLong;
-import io.opencensus.stats.AggregationData.SumDataDouble;
-import io.opencensus.stats.AggregationData.SumDataLong;
-import io.opencensus.stats.View;
-import io.opencensus.stats.ViewData;
import io.opencensus.tags.TagKey;
import io.opencensus.tags.TagValue;
import io.opencensus.tags.Tags;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
@@ -117,7 +103,6 @@ public void setUp() throws Exception {
EnhancedBigtableStubSettings stubSettings =
EnhancedBigtableStub.finalizeSettings(
settings.getStubSettings(), Tags.getTagger(), localStats.getStatsRecorder());
-
stub = new EnhancedBigtableStub(stubSettings, ClientContext.create(stubSettings));
}
@@ -155,11 +140,15 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(100);
long opLatency =
- getAggregationValueAsLong(
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
RpcViewConstants.BIGTABLE_OP_LATENCY_VIEW,
ImmutableMap.of(
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
- RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")));
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
assertThat(opLatency).isIn(Range.closed(sleepTime, elapsed));
}
@@ -187,11 +176,15 @@ public Object answer(InvocationOnMock invocation) {
Thread.sleep(100);
long opLatency =
- getAggregationValueAsLong(
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW,
ImmutableMap.of(
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
- RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")));
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
assertThat(opLatency).isEqualTo(2);
}
@@ -225,9 +218,13 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(100);
long firstRowLatency =
- getAggregationValueAsLong(
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW,
- ImmutableMap.of());
+ ImmutableMap.of(),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
// adding buffer time to the upper range to allow for a race between the emulator and the client
// recording the duration
@@ -267,11 +264,15 @@ public Object answer(InvocationOnMock invocation) {
Thread.sleep(100);
long opLatency =
- getAggregationValueAsLong(
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW,
ImmutableMap.of(
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
- RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")));
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
assertThat(opLatency).isEqualTo(2);
}
@@ -312,11 +313,15 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(100);
long attemptLatency =
- getAggregationValueAsLong(
+ StatsTestUtils.getAggregationValueAsLong(
+ localStats,
RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW,
ImmutableMap.of(
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
- RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")));
+ RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
+ PROJECT_ID,
+ INSTANCE_ID,
+ APP_PROFILE_ID);
// Average attempt latency will be just a single wait (as opposed to op latency which will be 2x
// sleeptime)
assertThat(attemptLatency).isIn(Range.closed(sleepTime, elapsed - sleepTime));
@@ -326,70 +331,4 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
private static StreamObserver anyObserver(Class returnType) {
return (StreamObserver) any(returnType);
}
-
- private long getAggregationValueAsLong(View view, ImmutableMap tags) {
- ViewData viewData = localStats.getViewManager().getView(view.getName());
- Map, AggregationData> aggregationMap =
- Objects.requireNonNull(viewData).getAggregationMap();
-
- List tagValues = new ArrayList<>();
-
- for (TagKey column : view.getColumns()) {
- if (RpcMeasureConstants.BIGTABLE_PROJECT_ID == column) {
- tagValues.add(TagValue.create(PROJECT_ID));
- } else if (RpcMeasureConstants.BIGTABLE_INSTANCE_ID == column) {
- tagValues.add(TagValue.create(INSTANCE_ID));
- } else if (RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID == column) {
- tagValues.add(TagValue.create(APP_PROFILE_ID));
- } else {
- tagValues.add(tags.get(column));
- }
- }
-
- AggregationData aggregationData = aggregationMap.get(tagValues);
-
- return aggregationData.match(
- new Function() {
- @Override
- public Long apply(SumDataDouble arg) {
- return (long) arg.getSum();
- }
- },
- new Function() {
- @Override
- public Long apply(SumDataLong arg) {
- return arg.getSum();
- }
- },
- new Function() {
- @Override
- public Long apply(CountData arg) {
- return arg.getCount();
- }
- },
- new Function() {
- @Override
- public Long apply(DistributionData arg) {
- return (long) arg.getMean();
- }
- },
- new Function() {
- @Override
- public Long apply(LastValueDataDouble arg) {
- return (long) arg.getLastValue();
- }
- },
- new Function() {
- @Override
- public Long apply(LastValueDataLong arg) {
- return arg.getLastValue();
- }
- },
- new Function() {
- @Override
- public Long apply(AggregationData arg) {
- throw new UnsupportedOperationException();
- }
- });
- }
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java
index ff37e75a87..6aede96161 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java
@@ -23,9 +23,13 @@
import com.google.common.collect.Maps;
import io.grpc.Context;
import io.opencensus.common.Scope;
+import io.opencensus.stats.AggregationData;
import io.opencensus.stats.Measure;
import io.opencensus.stats.MeasureMap;
+import io.opencensus.stats.StatsComponent;
import io.opencensus.stats.StatsRecorder;
+import io.opencensus.stats.View;
+import io.opencensus.stats.ViewData;
import io.opencensus.tags.Tag;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.TagContextBuilder;
@@ -35,9 +39,12 @@
import io.opencensus.tags.TagValue;
import io.opencensus.tags.Tagger;
import io.opencensus.tags.unsafe.ContextUtils;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -264,4 +271,76 @@ private static ImmutableMap getTags(TagContext tags) {
? ((FakeTagContext) tags).getTags()
: ImmutableMap.of();
}
+
+ public static long getAggregationValueAsLong(
+ StatsComponent stats,
+ View view,
+ ImmutableMap tags,
+ String projectId,
+ String instanceId,
+ String appProfileId) {
+ ViewData viewData = stats.getViewManager().getView(view.getName());
+ Map, AggregationData> aggregationMap =
+ Objects.requireNonNull(viewData).getAggregationMap();
+
+ List tagValues = new ArrayList<>();
+
+ for (TagKey column : view.getColumns()) {
+ if (RpcMeasureConstants.BIGTABLE_PROJECT_ID == column) {
+ tagValues.add(TagValue.create(projectId));
+ } else if (RpcMeasureConstants.BIGTABLE_INSTANCE_ID == column) {
+ tagValues.add(TagValue.create(instanceId));
+ } else if (RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID == column) {
+ tagValues.add(TagValue.create(appProfileId));
+ } else {
+ tagValues.add(tags.get(column));
+ }
+ }
+
+ AggregationData aggregationData = aggregationMap.get(tagValues);
+
+ return aggregationData.match(
+ new io.opencensus.common.Function() {
+ @Override
+ public Long apply(AggregationData.SumDataDouble arg) {
+ return (long) arg.getSum();
+ }
+ },
+ new io.opencensus.common.Function() {
+ @Override
+ public Long apply(AggregationData.SumDataLong arg) {
+ return arg.getSum();
+ }
+ },
+ new io.opencensus.common.Function() {
+ @Override
+ public Long apply(AggregationData.CountData arg) {
+ return arg.getCount();
+ }
+ },
+ new io.opencensus.common.Function() {
+ @Override
+ public Long apply(AggregationData.DistributionData arg) {
+ return (long) arg.getMean();
+ }
+ },
+ new io.opencensus.common.Function() {
+ @Override
+ public Long apply(AggregationData.LastValueDataDouble arg) {
+ return (long) arg.getLastValue();
+ }
+ },
+ new io.opencensus.common.Function() {
+ @Override
+ public Long apply(AggregationData.LastValueDataLong arg) {
+ return arg.getLastValue();
+ }
+ },
+ new io.opencensus.common.Function() {
+ @Override
+ public Long apply(AggregationData arg) {
+ throw new UnsupportedOperationException();
+ }
+ });
+ }
}