diff --git a/README.md b/README.md index 14db08e21..8f58eac54 100644 --- a/README.md +++ b/README.md @@ -262,12 +262,22 @@ metrics will be tagged with: each client RPC, tagged by operation name and the attempt status. Under normal circumstances, this will be identical to op_latency. However, when the client receives transient errors, op_latency will be the sum of all attempt_latencies - and the exponential delays + and the exponential delays. * `cloud.google.com/java/bigtable/attempts_per_op`: A distribution of attempts that each operation required, tagged by operation name and final operation status. Under normal circumstances, this will be 1. +### GFE metric views: + +* `cloud.google.com/java/bigtable/gfe_latency`: A distribution of the latency + between Google's network receives an RPC and reads back the first byte of + the response. + +* `cloud.google.com/java/bigtable/gfe_header_missing_count`: A counter of the + number of RPC responses received without the server-timing header, which + indicates that the request probably never reached Google's network. + By default, the functionality is disabled. For example to enable metrics using [Google Stackdriver](https://cloud.google.com/monitoring/docs/): @@ -323,6 +333,8 @@ StackdriverStatsExporter.createAndRegister( ); BigtableDataSettings.enableOpenCensusStats(); +// Enable GFE metric views +BigtableDataSettings.enableGfeOpenCensusStats(); ``` ## Version Conflicts diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java index f4d63b5ec..3002aa611 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java @@ -175,6 +175,20 @@ public static void enableOpenCensusStats() { // io.opencensus.contrib.grpc.metrics.RpcViews.registerClientGrpcBasicViews(); } + /** + * Enables OpenCensus GFE metric aggregations. + * + *

This will register views for gfe_latency and gfe_header_missing_count metrics. + * + *

gfe_latency measures the latency between Google's network receives an RPC and reads back the + * first byte of the response. gfe_header_missing_count is a counter of the number of RPC + * responses received without the server-timing header. + */ + @BetaApi("OpenCensus stats integration is currently unstable and may change in the future") + public static void enableGfeOpenCensusStats() { + com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientGfeViews(); + } + /** Returns the target project id. */ public String getProjectId() { return stubSettings.getProjectId(); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index d729d6244..448390396 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -66,6 +66,8 @@ import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory; +import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable; +import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory; import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants; import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable; @@ -162,6 +164,15 @@ public static EnhancedBigtableStubSettings finalizeSettings( .build()); } + ImmutableMap attributes = + ImmutableMap.builder() + .put(RpcMeasureConstants.BIGTABLE_PROJECT_ID, TagValue.create(settings.getProjectId())) + .put( + RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(settings.getInstanceId())) + .put( + RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, + TagValue.create(settings.getAppProfileId())) + .build(); // Inject Opencensus instrumentation builder.setTracerFactory( new CompositeTracerFactory( @@ -187,23 +198,17 @@ public static EnhancedBigtableStubSettings finalizeSettings( GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class)) .build()), // Add OpenCensus Metrics - MetricsTracerFactory.create( - tagger, - stats, - ImmutableMap.builder() - .put( - RpcMeasureConstants.BIGTABLE_PROJECT_ID, - TagValue.create(settings.getProjectId())) - .put( - RpcMeasureConstants.BIGTABLE_INSTANCE_ID, - TagValue.create(settings.getInstanceId())) - .put( - RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID, - TagValue.create(settings.getAppProfileId())) - .build()), + MetricsTracerFactory.create(tagger, stats, attributes), // Add user configured tracer settings.getTracerFactory()))); - + builder.setHeaderTracer( + builder + .getHeaderTracer() + .toBuilder() + .setStats(stats) + .setTagger(tagger) + .setStatsAttributes(attributes) + .build()); return builder.build(); } @@ -268,11 +273,10 @@ public ServerStreamingCallable createReadRowsCallable( ServerStreamingCallable readRowsUserCallable = new ReadRowsUserCallable<>(readRowsCallable, requestContext); + SpanName span = getSpanName("ReadRows"); ServerStreamingCallable traced = new TracedServerStreamingCallable<>( - readRowsUserCallable, - clientContext.getTracerFactory(), - SpanName.of(CLIENT_NAME, "ReadRows")); + readRowsUserCallable, clientContext.getTracerFactory(), span); return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } @@ -315,6 +319,7 @@ public UnaryCallable createReadRowCallable(RowAdapter *

  • Upon receiving the response stream, it will merge the {@link * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row * implementation can be configured by the {@code rowAdapter} parameter. + *
  • Add header tracer for tracking GFE metrics. *
  • Retry/resume on failure. *
  • Filter out marker rows. * @@ -356,10 +361,14 @@ public Map extract(ReadRowsRequest readRowsRequest) { ServerStreamingCallable watched = Callables.watched(merging, innerSettings, clientContext); + ServerStreamingCallable withHeaderTracer = + new HeaderTracerStreamingCallable<>( + watched, settings.getHeaderTracer(), getSpanName("ReadRows").toString()); + // Retry logic is split into 2 parts to workaround a rare edge case described in // ReadRowsRetryCompletedCallable ServerStreamingCallable retrying1 = - new ReadRowsRetryCompletedCallable<>(watched); + new ReadRowsRetryCompletedCallable<>(withHeaderTracer); ServerStreamingCallable retrying2 = Callables.retrying(retrying1, innerSettings, clientContext); @@ -380,6 +389,8 @@ public Map extract(ReadRowsRequest readRowsRequest) { * */ private UnaryCallable> createSampleRowKeysCallable() { + String methodName = "SampleRowKeys"; + ServerStreamingCallable base = GrpcRawCallableFactory.createServerStreamingCallable( GrpcCallSettings.newBuilder() @@ -399,11 +410,15 @@ public Map extract( UnaryCallable> spoolable = base.all(); + UnaryCallable> withHeaderTracer = + new HeaderTracerUnaryCallable<>( + spoolable, settings.getHeaderTracer(), getSpanName(methodName).toString()); + UnaryCallable> retryable = - Callables.retrying(spoolable, settings.sampleRowKeysSettings(), clientContext); + Callables.retrying(withHeaderTracer, settings.sampleRowKeysSettings(), clientContext); return createUserFacingUnaryCallable( - "SampleRowKeys", new SampleRowKeysCallable(retryable, requestContext)); + methodName, new SampleRowKeysCallable(retryable, requestContext)); } /** @@ -415,6 +430,7 @@ public Map extract( * */ private UnaryCallable createMutateRowCallable() { + String methodName = "MutateRow"; UnaryCallable base = GrpcRawCallableFactory.createUnaryCallable( GrpcCallSettings.newBuilder() @@ -431,11 +447,15 @@ public Map extract(MutateRowRequest mutateRowRequest) { .build(), settings.mutateRowSettings().getRetryableCodes()); + UnaryCallable withHeaderTracer = + new HeaderTracerUnaryCallable<>( + base, settings.getHeaderTracer(), getSpanName(methodName).toString()); + UnaryCallable retrying = - Callables.retrying(base, settings.mutateRowSettings(), clientContext); + Callables.retrying(withHeaderTracer, settings.mutateRowSettings(), clientContext); return createUserFacingUnaryCallable( - "MutateRow", new MutateRowCallable(retrying, requestContext)); + methodName, new MutateRowCallable(retrying, requestContext)); } /** @@ -459,11 +479,13 @@ private UnaryCallable createBulkMutateRowsCallable() { UnaryCallable userFacing = new BulkMutateRowsUserFacingCallable(baseCallable, requestContext); + SpanName spanName = getSpanName("MutateRows"); UnaryCallable traced = - new TracedUnaryCallable<>( - userFacing, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, "MutateRows")); + new TracedUnaryCallable<>(userFacing, clientContext.getTracerFactory(), spanName); + UnaryCallable withHeaderTracer = + new HeaderTracerUnaryCallable<>(traced, settings.getHeaderTracer(), spanName.toString()); - return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + return withHeaderTracer.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -569,6 +591,7 @@ public Map extract(MutateRowsRequest mutateRowsRequest) { * */ private UnaryCallable createCheckAndMutateRowCallable() { + String methodName = "CheckAndMutateRow"; UnaryCallable base = GrpcRawCallableFactory.createUnaryCallable( GrpcCallSettings.newBuilder() @@ -586,11 +609,15 @@ public Map extract( .build(), settings.checkAndMutateRowSettings().getRetryableCodes()); + UnaryCallable withHeaderTracer = + new HeaderTracerUnaryCallable<>( + base, settings.getHeaderTracer(), getSpanName(methodName).toString()); + UnaryCallable retrying = - Callables.retrying(base, settings.checkAndMutateRowSettings(), clientContext); + Callables.retrying(withHeaderTracer, settings.checkAndMutateRowSettings(), clientContext); return createUserFacingUnaryCallable( - "CheckAndMutateRow", new CheckAndMutateRowCallable(retrying, requestContext)); + methodName, new CheckAndMutateRowCallable(retrying, requestContext)); } /** @@ -619,12 +646,16 @@ public Map extract(ReadModifyWriteRowRequest request) { }) .build(), settings.readModifyWriteRowSettings().getRetryableCodes()); + String methodName = "ReadModifyWriteRow"; + UnaryCallable withHeaderTracer = + new HeaderTracerUnaryCallable<>( + base, settings.getHeaderTracer(), getSpanName(methodName).toString()); UnaryCallable retrying = - Callables.retrying(base, settings.readModifyWriteRowSettings(), clientContext); + Callables.retrying(withHeaderTracer, settings.readModifyWriteRowSettings(), clientContext); return createUserFacingUnaryCallable( - "ReadModifyWriteRow", new ReadModifyWriteRowCallable(retrying, requestContext)); + methodName, new ReadModifyWriteRowCallable(retrying, requestContext)); } /** @@ -635,8 +666,7 @@ private UnaryCallable createUserFacin String methodName, UnaryCallable inner) { UnaryCallable traced = - new TracedUnaryCallable<>( - inner, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, methodName)); + new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName)); return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } @@ -686,6 +716,10 @@ public UnaryCallable readModifyWriteRowCallable() { } // + private SpanName getSpanName(String methodName) { + return SpanName.of(CLIENT_NAME, methodName); + } + @Override public void close() { for (BackgroundResource backgroundResource : clientContext.getBackgroundResources()) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 72b22c35e..eaea47f4e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -38,6 +38,7 @@ import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracer; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor; import com.google.common.base.MoreObjects; @@ -154,6 +155,7 @@ public class EnhancedBigtableStubSettings extends StubSettings primedTableIds; + private HeaderTracer headerTracer; private final ServerStreamingCallSettings readRowsSettings; private final UnaryCallSettings readRowSettings; @@ -187,6 +189,7 @@ private EnhancedBigtableStubSettings(Builder builder) { appProfileId = builder.appProfileId; isRefreshingChannel = builder.isRefreshingChannel; primedTableIds = builder.primedTableIds; + headerTracer = builder.headerTracer; // Per method settings. readRowsSettings = builder.readRowsSettings.build(); @@ -231,6 +234,11 @@ public List getPrimedTableIds() { return primedTableIds; } + /** Gets the tracer for capturing metrics in the header. */ + HeaderTracer getHeaderTracer() { + return headerTracer; + } + /** Returns a builder for the default ChannelProvider for this service. */ public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() { return BigtableStubSettings.defaultGrpcTransportProviderBuilder() @@ -488,6 +496,7 @@ public static class Builder extends StubSettings.Builder primedTableIds; + private HeaderTracer headerTracer; private final ServerStreamingCallSettings.Builder readRowsSettings; private final UnaryCallSettings.Builder readRowSettings; @@ -511,6 +520,7 @@ private Builder() { this.appProfileId = SERVER_DEFAULT_APP_PROFILE_ID; this.isRefreshingChannel = false; primedTableIds = ImmutableList.of(); + headerTracer = HeaderTracer.newBuilder().build(); setCredentialsProvider(defaultCredentialsProviderBuilder().build()); // Defaults provider @@ -617,6 +627,7 @@ private Builder(EnhancedBigtableStubSettings settings) { appProfileId = settings.appProfileId; isRefreshingChannel = settings.isRefreshingChannel; primedTableIds = settings.primedTableIds; + headerTracer = settings.headerTracer; // Per method settings. readRowsSettings = settings.readRowsSettings.toBuilder(); @@ -739,6 +750,17 @@ public List getPrimedTableIds() { return primedTableIds; } + /** Configure the header tracer for surfacing metrics in the header. */ + Builder setHeaderTracer(HeaderTracer headerTracer) { + this.headerTracer = headerTracer; + return this; + } + + /** Gets the header tracer that'll be used to surface metrics in the header. */ + HeaderTracer getHeaderTracer() { + return headerTracer; + } + /** Returns the builder for the settings used for calls to readRows. */ public ServerStreamingCallSettings.Builder readRowsSettings() { return readRowsSettings; @@ -818,6 +840,7 @@ public String toString() { .add("appProfileId", appProfileId) .add("isRefreshingChannel", isRefreshingChannel) .add("primedTableIds", primedTableIds) + .add("headerTracer", headerTracer) .add("readRowsSettings", readRowsSettings) .add("readRowSettings", readRowSettings) .add("sampleRowKeysSettings", sampleRowKeysSettings) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracer.java new file mode 100644 index 000000000..f3eb0ef1e --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracer.java @@ -0,0 +1,123 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.core.InternalApi; +import com.google.auto.value.AutoValue; +import com.google.common.base.MoreObjects; +import io.grpc.Metadata; +import io.opencensus.stats.MeasureMap; +import io.opencensus.stats.Stats; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.Tags; +import java.util.Collections; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nonnull; + +@InternalApi +@AutoValue +public abstract class HeaderTracer { + + private static final Metadata.Key SERVER_TIMING_HEADER_KEY = + Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER); + private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?\\d+)"); + + @AutoValue.Builder + public abstract static class Builder { + // + public abstract Builder setTagger(@Nonnull Tagger tagger); + + public abstract Builder setStats(@Nonnull StatsRecorder stats); + + public abstract Builder setStatsAttributes(@Nonnull Map statsAttributes); + + abstract HeaderTracer autoBuild(); + + public HeaderTracer build() { + HeaderTracer headerTracer = autoBuild(); + return headerTracer; + } + // + } + + public abstract Tagger getTagger(); + + public abstract StatsRecorder getStats(); + + public abstract Map getStatsAttributes(); + + /** + * If the header has a server-timing field, extract the metric and publish it to OpenCensus. + * Otherwise increment the gfe header missing counter by 1. + */ + public void recordGfeMetadata(@Nonnull Metadata metadata, @Nonnull String spanName) { + MeasureMap measures = getStats().newMeasureMap(); + if (metadata.get(SERVER_TIMING_HEADER_KEY) != null) { + String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY); + Matcher matcher = SERVER_TIMING_HEADER_PATTERN.matcher(serverTiming); + measures.put(RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT, 0L); + if (matcher.find()) { + long latency = Long.valueOf(matcher.group("dur")); + measures.put(RpcMeasureConstants.BIGTABLE_GFE_LATENCY, latency); + } + } else { + measures.put(RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT, 1L); + } + measures.record(newTagCtxBuilder(spanName).build()); + } + + public void recordGfeMissingHeader(@Nonnull String spanName) { + MeasureMap measures = + getStats().newMeasureMap().put(RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT, 1L); + measures.record(newTagCtxBuilder(spanName).build()); + } + + private TagContextBuilder newTagCtxBuilder(String span) { + TagContextBuilder tagContextBuilder = getTagger().currentBuilder(); + if (span != null) { + tagContextBuilder.putLocal(RpcMeasureConstants.BIGTABLE_OP, TagValue.create(span)); + } + // Copy client level tags in + for (Map.Entry entry : getStatsAttributes().entrySet()) { + tagContextBuilder.putLocal(entry.getKey(), entry.getValue()); + } + return tagContextBuilder; + } + + public static Builder newBuilder() { + return new AutoValue_HeaderTracer.Builder() + .setTagger(Tags.getTagger()) + .setStats(Stats.getStatsRecorder()) + .setStatsAttributes(Collections.emptyMap()); + } + + public abstract Builder toBuilder(); + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("stats", getStats()) + .add("tagger", getTagger()) + .add("statsAttributes", getStatsAttributes()) + .toString(); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java new file mode 100644 index 000000000..fdca9297a --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerStreamingCallable.java @@ -0,0 +1,125 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.core.InternalApi; +import com.google.api.gax.grpc.GrpcResponseMetadata; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; +import com.google.common.base.Preconditions; +import io.grpc.Metadata; +import javax.annotation.Nonnull; + +/** + * This callable will inject a {@link GrpcResponseMetadata} to access the headers and trailers + * returned by gRPC methods upon completion. The {@link HeaderTracer} will process metrics that were + * injected in the header/trailer and publish them to OpenCensus. If {@link + * GrpcResponseMetadata#getMetadata()} returned null, it probably means that the request has never + * reached GFE, and it'll increment the gfe_header_missing_counter in this case. + * + *

    If GFE metrics are not registered in {@link RpcViews}, skip injecting GrpcResponseMetadata. + * This is for the case where direct path is enabled, all the requests won't go through GFE and + * therefore won't have the server-timing header. + * + *

    This class is considered an internal implementation detail and not meant to be used by + * applications. + */ +@InternalApi +public class HeaderTracerStreamingCallable + extends ServerStreamingCallable { + + private final ServerStreamingCallable innerCallable; + private final HeaderTracer headerTracer; + private final String spanName; + + public HeaderTracerStreamingCallable( + @Nonnull ServerStreamingCallable callable, + @Nonnull HeaderTracer headerTracer, + @Nonnull String spanName) { + this.innerCallable = Preconditions.checkNotNull(callable, "Inner callable must be set"); + this.headerTracer = Preconditions.checkNotNull(headerTracer, "HeaderTracer must be set"); + this.spanName = Preconditions.checkNotNull(spanName, "Span name must be set"); + } + + @Override + public void call( + RequestT request, ResponseObserver responseObserver, ApiCallContext context) { + final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata(); + if (RpcViews.isGfeMetricsRegistered()) { + HeaderTracerResponseObserver innerObserver = + new HeaderTracerResponseObserver<>( + responseObserver, headerTracer, responseMetadata, spanName); + innerCallable.call(request, innerObserver, responseMetadata.addHandlers(context)); + } else { + innerCallable.call(request, responseObserver, context); + } + } + + private class HeaderTracerResponseObserver implements ResponseObserver { + + private ResponseObserver outerObserver; + private HeaderTracer headerTracer; + private GrpcResponseMetadata responseMetadata; + private String spanName; + + HeaderTracerResponseObserver( + ResponseObserver observer, + HeaderTracer headerTracer, + GrpcResponseMetadata metadata, + String spanName) { + this.outerObserver = observer; + this.headerTracer = headerTracer; + this.responseMetadata = metadata; + this.spanName = spanName; + } + + @Override + public void onStart(final StreamController controller) { + outerObserver.onStart(controller); + } + + @Override + public void onResponse(ResponseT response) { + outerObserver.onResponse(response); + } + + @Override + public void onError(Throwable t) { + // server-timing metric will be added through GrpcResponseMetadata#onHeaders(Metadata), + // so it's not checking trailing metadata here. + Metadata metadata = responseMetadata.getMetadata(); + if (metadata != null) { + headerTracer.recordGfeMetadata(metadata, spanName); + } else { + headerTracer.recordGfeMissingHeader(spanName); + } + outerObserver.onError(t); + } + + @Override + public void onComplete() { + Metadata metadata = responseMetadata.getMetadata(); + if (metadata != null) { + headerTracer.recordGfeMetadata(metadata, spanName); + } else { + headerTracer.recordGfeMissingHeader(spanName); + } + outerObserver.onComplete(); + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable.java new file mode 100644 index 000000000..17d84b2a2 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable.java @@ -0,0 +1,83 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import com.google.api.core.ApiFuture; +import com.google.api.core.InternalApi; +import com.google.api.gax.grpc.GrpcResponseMetadata; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Metadata; +import javax.annotation.Nonnull; + +/** + * This callable will inject a {@link GrpcResponseMetadata} to access the headers and trailers + * returned by gRPC methods upon completion. The {@link HeaderTracer} will process metrics that were + * injected in the header/trailer and publish them to OpenCensus. If {@link + * GrpcResponseMetadata#getMetadata()} returned null, it probably means that the request has never + * reached GFE, and it'll increment the gfe_header_missing_counter in this case. + * + *

    If GFE metrics are not registered in {@link RpcViews}, skip injecting GrpcResponseMetadata. + * This is for the case where direct path is enabled, all the requests won't go through GFE and + * therefore won't have the server-timing header. + * + *

    This class is considered an internal implementation detail and not meant to be used by + * applications. + */ +@InternalApi +public class HeaderTracerUnaryCallable + extends UnaryCallable { + + private final UnaryCallable innerCallable; + private final HeaderTracer headerTracer; + private final String spanName; + + public HeaderTracerUnaryCallable( + @Nonnull UnaryCallable innerCallable, + @Nonnull HeaderTracer headerTracer, + @Nonnull String spanName) { + this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set"); + this.headerTracer = Preconditions.checkNotNull(headerTracer, "HeaderTracer must be set"); + this.spanName = Preconditions.checkNotNull(spanName, "Span name must be set"); + } + + @Override + public ApiFuture futureCall(RequestT request, ApiCallContext context) { + if (RpcViews.isGfeMetricsRegistered()) { + final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata(); + ApiFuture future = + innerCallable.futureCall(request, responseMetadata.addHandlers(context)); + future.addListener( + new Runnable() { + @Override + public void run() { + Metadata metadata = responseMetadata.getMetadata(); + if (metadata != null) { + headerTracer.recordGfeMetadata(metadata, spanName); + } else { + headerTracer.recordGfeMissingHeader(spanName); + } + } + }, + MoreExecutors.directExecutor()); + return future; + } else { + return innerCallable.futureCall(request, context); + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java index 8c6e347a0..e6e5c70db 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcMeasureConstants.java @@ -74,4 +74,18 @@ public class RpcMeasureConstants { "cloud.google.com/java/bigtable/read_rows_first_row_latency", "Time between request being sent to the first row received", MILLISECOND); + + /** GFE t4t7 latency extracted from server-timing header. */ + public static final MeasureLong BIGTABLE_GFE_LATENCY = + MeasureLong.create( + "cloud.google.com/java/bigtable/gfe_latency", + "Latency between Google's network receives an RPC and reads back the first byte of the response", + MILLISECOND); + + /** Number of responses without the server-timing header. */ + public static final MeasureLong BIGTABLE_GFE_HEADER_MISSING_COUNT = + MeasureLong.create( + "cloud.google.com/java/bigtable/gfe_header_missing_count", + "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network", + COUNT); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java index d21060c4a..8a14c01b1 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViewConstants.java @@ -17,6 +17,8 @@ import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT; +import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_LATENCY; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_INSTANCE_ID; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP; import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_OP_ATTEMPT_COUNT; @@ -29,6 +31,7 @@ import io.opencensus.stats.Aggregation; import io.opencensus.stats.Aggregation.Count; import io.opencensus.stats.Aggregation.Distribution; +import io.opencensus.stats.Aggregation.Sum; import io.opencensus.stats.BucketBoundaries; import io.opencensus.stats.View; import java.util.Arrays; @@ -36,6 +39,7 @@ class RpcViewConstants { // Aggregations private static final Aggregation COUNT = Count.create(); + private static final Aggregation SUM = Sum.create(); private static final Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM = Distribution.create( @@ -124,4 +128,22 @@ class RpcViewConstants { BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP, BIGTABLE_STATUS)); + + static final View BIGTABLE_GFE_LATENCY_VIEW = + View.create( + View.Name.create("cloud.google.com/java/bigtable/gfe_latency"), + "Latency between Google's network receives an RPC and reads back the first byte of the response", + BIGTABLE_GFE_LATENCY, + AGGREGATION_WITH_MILLIS_HISTOGRAM, + ImmutableList.of( + BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP)); + + static final View BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW = + View.create( + View.Name.create("cloud.google.com/java/bigtable/gfe_header_missing_count"), + "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network", + BIGTABLE_GFE_HEADER_MISSING_COUNT, + SUM, + ImmutableList.of( + BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP)); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java index cc3153949..9e8f6084a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/RpcViews.java @@ -33,15 +33,49 @@ public class RpcViews { RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW, RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW); + private static final ImmutableSet GFE_VIEW_SET = + ImmutableSet.of( + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW); + + private static boolean gfeMetricsRegistered = false; + /** Registers all Bigtable specific views. */ public static void registerBigtableClientViews() { registerBigtableClientViews(Stats.getViewManager()); } + /** + * Register views for GFE metrics, including gfe_latency and gfe_header_missing_count. gfe_latency + * measures the latency between Google's network receives an RPC and reads back the first byte of + * the response. gfe_header_missing_count is a counter of the number of RPC responses without a + * server-timing header. + */ + public static void registerBigtableClientGfeViews() { + registerBigtableClientGfeViews(Stats.getViewManager()); + } + @VisibleForTesting static void registerBigtableClientViews(ViewManager viewManager) { for (View view : BIGTABLE_CLIENT_VIEWS_SET) { viewManager.registerView(view); } } + + @VisibleForTesting + static void registerBigtableClientGfeViews(ViewManager viewManager) { + for (View view : GFE_VIEW_SET) { + viewManager.registerView(view); + } + gfeMetricsRegistered = true; + } + + static boolean isGfeMetricsRegistered() { + return gfeMetricsRegistered; + } + + @VisibleForTesting + static void setGfeMetricsRegistered(boolean gfeMetricsRegistered) { + RpcViews.gfeMetricsRegistered = gfeMetricsRegistered; + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java index 2cd55a311..10ba67582 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettingsTest.java @@ -32,6 +32,7 @@ import com.google.cloud.bigtable.data.v2.models.Query; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracer; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Range; @@ -75,6 +76,7 @@ public void settingsAreNotLostTest() { CredentialsProvider credentialsProvider = Mockito.mock(CredentialsProvider.class); WatchdogProvider watchdogProvider = Mockito.mock(WatchdogProvider.class); Duration watchdogInterval = Duration.ofSeconds(12); + HeaderTracer headerTracer = Mockito.mock(HeaderTracer.class); EnhancedBigtableStubSettings.Builder builder = EnhancedBigtableStubSettings.newBuilder() @@ -85,7 +87,8 @@ public void settingsAreNotLostTest() { .setEndpoint(endpoint) .setCredentialsProvider(credentialsProvider) .setStreamWatchdogProvider(watchdogProvider) - .setStreamWatchdogCheckInterval(watchdogInterval); + .setStreamWatchdogCheckInterval(watchdogInterval) + .setHeaderTracer(headerTracer); verifyBuilder( builder, @@ -96,7 +99,8 @@ public void settingsAreNotLostTest() { endpoint, credentialsProvider, watchdogProvider, - watchdogInterval); + watchdogInterval, + headerTracer); verifySettings( builder.build(), projectId, @@ -106,7 +110,8 @@ public void settingsAreNotLostTest() { endpoint, credentialsProvider, watchdogProvider, - watchdogInterval); + watchdogInterval, + headerTracer); verifyBuilder( builder.build().toBuilder(), projectId, @@ -116,7 +121,8 @@ public void settingsAreNotLostTest() { endpoint, credentialsProvider, watchdogProvider, - watchdogInterval); + watchdogInterval, + headerTracer); } private void verifyBuilder( @@ -128,7 +134,8 @@ private void verifyBuilder( String endpoint, CredentialsProvider credentialsProvider, WatchdogProvider watchdogProvider, - Duration watchdogInterval) { + Duration watchdogInterval, + HeaderTracer headerTracer) { assertThat(builder.getProjectId()).isEqualTo(projectId); assertThat(builder.getInstanceId()).isEqualTo(instanceId); assertThat(builder.getAppProfileId()).isEqualTo(appProfileId); @@ -137,6 +144,7 @@ private void verifyBuilder( assertThat(builder.getCredentialsProvider()).isEqualTo(credentialsProvider); assertThat(builder.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider); assertThat(builder.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval); + assertThat(builder.getHeaderTracer()).isEqualTo(headerTracer); } private void verifySettings( @@ -148,7 +156,8 @@ private void verifySettings( String endpoint, CredentialsProvider credentialsProvider, WatchdogProvider watchdogProvider, - Duration watchdogInterval) { + Duration watchdogInterval, + HeaderTracer headerTracer) { assertThat(settings.getProjectId()).isEqualTo(projectId); assertThat(settings.getInstanceId()).isEqualTo(instanceId); assertThat(settings.getAppProfileId()).isEqualTo(appProfileId); @@ -157,6 +166,7 @@ private void verifySettings( assertThat(settings.getCredentialsProvider()).isEqualTo(credentialsProvider); assertThat(settings.getStreamWatchdogProvider()).isSameInstanceAs(watchdogProvider); assertThat(settings.getStreamWatchdogCheckInterval()).isEqualTo(watchdogInterval); + assertThat(settings.getHeaderTracer()).isEqualTo(headerTracer); } @Test @@ -622,12 +632,26 @@ public void isRefreshingChannelFalseValueTest() { assertThat(builder.build().toBuilder().isRefreshingChannel()).isFalse(); } + @Test + public void verifyDefaultHeaderTracerNotNullTest() { + String dummyProjectId = "my-project"; + String dummyInstanceId = "my-instance"; + EnhancedBigtableStubSettings.Builder builder = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(dummyProjectId) + .setInstanceId(dummyInstanceId); + assertThat(builder.getHeaderTracer()).isNotNull(); + assertThat(builder.build().getHeaderTracer()).isNotNull(); + assertThat(builder.build().toBuilder().getHeaderTracer()).isNotNull(); + } + static final String[] SETTINGS_LIST = { "projectId", "instanceId", "appProfileId", "isRefreshingChannel", "primedTableIds", + "headerTracer", "readRowsSettings", "readRowSettings", "sampleRowKeysSettings", diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerCallableTest.java new file mode 100644 index 000000000..9538b6a13 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerCallableTest.java @@ -0,0 +1,428 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; + +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.UnavailableException; +import com.google.bigtable.v2.BigtableGrpc.BigtableImplBase; +import com.google.bigtable.v2.CheckAndMutateRowRequest; +import com.google.bigtable.v2.CheckAndMutateRowResponse; +import com.google.bigtable.v2.MutateRowRequest; +import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.ReadModifyWriteRowRequest; +import com.google.bigtable.v2.ReadModifyWriteRowResponse; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.SampleRowKeysRequest; +import com.google.bigtable.v2.SampleRowKeysResponse; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.FakeServiceHelper; +import com.google.cloud.bigtable.data.v2.internal.NameUtil; +import com.google.cloud.bigtable.data.v2.models.BulkMutation; +import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; +import com.google.cloud.bigtable.data.v2.models.Mutation; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; +import com.google.common.collect.ImmutableMap; +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import io.opencensus.impl.stats.StatsComponentImpl; +import io.opencensus.stats.StatsComponent; +import io.opencensus.stats.ViewData; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tags; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class HeaderTracerCallableTest { + private FakeServiceHelper serviceHelper; + private FakeServiceHelper serviceHelperNoHeader; + + private FakeService fakeService = new FakeService(); + + private final StatsComponent localStats = new StatsComponentImpl(); + private EnhancedBigtableStub stub; + private EnhancedBigtableStub noHeaderStub; + private int attempts; + + private static final String PROJECT_ID = "fake-project"; + private static final String INSTANCE_ID = "fake-instance"; + private static final String APP_PROFILE_ID = "default"; + private static final String TABLE_ID = "fake-table"; + + private static final long WAIT_FOR_METRICS_TIME_MS = 1_000; + + private AtomicInteger fakeServerTiming; + + @Before + public void setUp() throws Exception { + RpcViews.registerBigtableClientGfeViews(localStats.getViewManager()); + + // Create a server that'll inject a server-timing header with a random number and a stub that + // connects to this server. + fakeServerTiming = new AtomicInteger(new Random().nextInt(1000) + 1); + serviceHelper = + new FakeServiceHelper( + new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall serverCall, + Metadata metadata, + ServerCallHandler serverCallHandler) { + return serverCallHandler.startCall( + new SimpleForwardingServerCall(serverCall) { + @Override + public void sendHeaders(Metadata headers) { + headers.put( + Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER), + String.format("gfet4t7; dur=%d", fakeServerTiming.get())); + super.sendHeaders(headers); + } + }, + metadata); + } + }, + fakeService); + serviceHelper.start(); + + BigtableDataSettings settings = + BigtableDataSettings.newBuilderForEmulator(serviceHelper.getPort()) + .setProjectId(PROJECT_ID) + .setInstanceId(INSTANCE_ID) + .setAppProfileId(APP_PROFILE_ID) + .build(); + EnhancedBigtableStubSettings stubSettings = + EnhancedBigtableStub.finalizeSettings( + settings.getStubSettings(), Tags.getTagger(), localStats.getStatsRecorder()); + attempts = stubSettings.readRowsSettings().getRetrySettings().getMaxAttempts(); + stub = new EnhancedBigtableStub(stubSettings, ClientContext.create(stubSettings)); + + // Create another server without injecting the server-timing header and another stub that + // connects to it. + serviceHelperNoHeader = new FakeServiceHelper(fakeService); + serviceHelperNoHeader.start(); + + BigtableDataSettings noHeaderSettings = + BigtableDataSettings.newBuilderForEmulator(serviceHelperNoHeader.getPort()) + .setProjectId(PROJECT_ID) + .setInstanceId(INSTANCE_ID) + .setAppProfileId(APP_PROFILE_ID) + .build(); + EnhancedBigtableStubSettings noHeaderStubSettings = + EnhancedBigtableStub.finalizeSettings( + noHeaderSettings.getStubSettings(), Tags.getTagger(), localStats.getStatsRecorder()); + noHeaderStub = + new EnhancedBigtableStub(noHeaderStubSettings, ClientContext.create(noHeaderStubSettings)); + } + + @After + public void tearDown() { + stub.close(); + noHeaderStub.close(); + serviceHelper.shutdown(); + serviceHelperNoHeader.shutdown(); + } + + @Test + public void testGFELatencyMetricReadRows() throws InterruptedException { + stub.readRowsCallable().call(Query.create(TABLE_ID)); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + + long latency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + + assertThat(latency).isEqualTo(fakeServerTiming.get()); + } + + @Test + public void testGFELatencyMetricMutateRow() throws InterruptedException { + stub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "fake-key")); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + + long latency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRow")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + + assertThat(latency).isEqualTo(fakeServerTiming.get()); + } + + @Test + public void testGFELatencyMetricMutateRows() throws InterruptedException { + BulkMutation mutations = + BulkMutation.create(TABLE_ID) + .add("key", Mutation.create().setCell("fake-family", "fake-qualifier", "fake-value")); + stub.bulkMutateRowsCallable().call(mutations); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + + long latency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRows")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + + assertThat(latency).isEqualTo(fakeServerTiming.get()); + } + + @Test + public void testGFELatencySampleRowKeys() throws InterruptedException { + stub.sampleRowKeysCallable().call(TABLE_ID); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + long latency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.SampleRowKeys")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(latency).isEqualTo(fakeServerTiming.get()); + } + + @Test + public void testGFELatencyCheckAndMutateRow() throws InterruptedException { + ConditionalRowMutation mutation = + ConditionalRowMutation.create(TABLE_ID, "fake-key") + .then(Mutation.create().setCell("fake-family", "fake-qualifier", "fake-value")); + stub.checkAndMutateRowCallable().call(mutation); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + long latency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.CheckAndMutateRow")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(latency).isEqualTo(fakeServerTiming.get()); + } + + @Test + public void testGFELatencyReadModifyWriteRow() throws InterruptedException { + ReadModifyWriteRow request = + ReadModifyWriteRow.create(TABLE_ID, "fake-key") + .append("fake-family", "fake-qualifier", "suffix"); + stub.readModifyWriteRowCallable().call(request); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + long latency = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadModifyWriteRow")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(latency).isEqualTo(fakeServerTiming.get()); + } + + @Test + public void testGFEMissingHeaderMetric() throws InterruptedException { + // Make a few calls to the server which will inject the server-timing header and the counter + // should be 0. + stub.readRowsCallable().call(Query.create(TABLE_ID)); + stub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "key")); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + long mutateRowMissingCount = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW, + ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRow")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + long readRowsMissingCount = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + + assertThat(mutateRowMissingCount).isEqualTo(0); + assertThat(readRowsMissingCount).isEqualTo(0); + + // Make a few more calls to the server which won't add the header and the counter should match + // the number of requests sent. + int readRowsCalls = new Random().nextInt(10) + 1; + int mutateRowCalls = new Random().nextInt(10) + 1; + for (int i = 0; i < mutateRowCalls; i++) { + noHeaderStub.mutateRowCallable().call(RowMutation.create(TABLE_ID, "fake-key" + i)); + } + for (int i = 0; i < readRowsCalls; i++) { + noHeaderStub.readRowsCallable().call(Query.create(TABLE_ID)); + } + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + + mutateRowMissingCount = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW, + ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRow")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + readRowsMissingCount = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW, + ImmutableMap.of( + RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + + assertThat(mutateRowMissingCount).isEqualTo(mutateRowCalls); + assertThat(readRowsMissingCount).isEqualTo(readRowsCalls); + } + + @Test + public void testMetricsWithErrorResponse() throws InterruptedException { + try { + stub.readRowsCallable().call(Query.create("random-table-id")).iterator().next(); + fail("readrows should throw exception"); + } catch (Exception e) { + assertThat(e).isInstanceOf(UnavailableException.class); + } + + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + long missingCount = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW, + ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(missingCount).isEqualTo(attempts); + } + + @Test + public void testCallableBypassed() throws InterruptedException { + RpcViews.setGfeMetricsRegistered(false); + stub.readRowsCallable().call(Query.create(TABLE_ID)); + Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + ViewData headerMissingView = + localStats + .getViewManager() + .getView(RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW.getName()); + ViewData latencyView = + localStats.getViewManager().getView(RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW.getName()); + // Verify that the view is registered by it's not collecting metrics + assertThat(headerMissingView).isNotNull(); + assertThat(latencyView).isNotNull(); + assertThat(headerMissingView.getAggregationMap()).isEmpty(); + assertThat(latencyView.getAggregationMap()).isEmpty(); + } + + private class FakeService extends BigtableImplBase { + private final String defaultTableName = + NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID); + + @Override + public void readRows(ReadRowsRequest request, StreamObserver observer) { + if (!request.getTableName().equals(defaultTableName)) { + observer.onError(new StatusRuntimeException(Status.UNAVAILABLE)); + return; + } + observer.onNext(ReadRowsResponse.getDefaultInstance()); + observer.onCompleted(); + } + + @Override + public void mutateRow(MutateRowRequest request, StreamObserver observer) { + observer.onNext(MutateRowResponse.getDefaultInstance()); + observer.onCompleted(); + } + + @Override + public void mutateRows(MutateRowsRequest request, StreamObserver observer) { + observer.onNext(MutateRowsResponse.getDefaultInstance()); + observer.onCompleted(); + } + + @Override + public void sampleRowKeys( + SampleRowKeysRequest request, StreamObserver observer) { + observer.onNext(SampleRowKeysResponse.getDefaultInstance()); + observer.onCompleted(); + } + + @Override + public void checkAndMutateRow( + CheckAndMutateRowRequest request, StreamObserver observer) { + observer.onNext(CheckAndMutateRowResponse.getDefaultInstance()); + observer.onCompleted(); + } + + @Override + public void readModifyWriteRow( + ReadModifyWriteRowRequest request, StreamObserver observer) { + observer.onNext(ReadModifyWriteRowResponse.getDefaultInstance()); + observer.onCompleted(); + } + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerTest.java new file mode 100644 index 000000000..30e24dbfe --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.metrics; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.common.collect.ImmutableMap; +import io.opencensus.impl.stats.StatsComponentImpl; +import io.opencensus.stats.StatsComponent; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import java.util.Map; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +@RunWith(JUnit4.class) +public class HeaderTracerTest { + + private final StatsComponent localStats = new StatsComponentImpl(); + + @Test + public void testDefaultBuilder() { + HeaderTracer.Builder builder = HeaderTracer.newBuilder(); + HeaderTracer tracer = builder.build(); + assertThat(tracer.getStats()).isNotNull(); + assertThat(tracer.getTagger()).isNotNull(); + assertThat(tracer.getStatsAttributes()).isNotNull(); + assertThat(tracer.getStatsAttributes()).isEmpty(); + } + + @Test + public void testBuilder() { + HeaderTracer.Builder builder = HeaderTracer.newBuilder(); + Map attrs = + ImmutableMap.of(TagKey.create("fake-key"), TagValue.create("fake-value")); + Tagger tagger = Mockito.mock(Tagger.class); + StatsRecorder stats = localStats.getStatsRecorder(); + builder.setStats(stats).setStatsAttributes(attrs).setTagger(tagger); + HeaderTracer headerTracer = builder.build(); + assertThat(headerTracer.getStats()).isEqualTo(stats); + assertThat(headerTracer.getTagger()).isEqualTo(tagger); + assertThat(headerTracer.getStatsAttributes()).isEqualTo(attrs); + } + + @Test + public void testToBuilder() { + HeaderTracer.Builder builder = HeaderTracer.newBuilder(); + Map attrs = + ImmutableMap.of(TagKey.create("fake-key"), TagValue.create("fake-value")); + Tagger tagger = Mockito.mock(Tagger.class); + StatsRecorder stats = localStats.getStatsRecorder(); + builder.setStats(stats).setStatsAttributes(attrs).setTagger(tagger); + HeaderTracer headerTracer = builder.build(); + + HeaderTracer.Builder newBuilder = headerTracer.toBuilder(); + assertThat(newBuilder.build().getStats()).isEqualTo(stats); + assertThat(newBuilder.build().getTagger()).isEqualTo(tagger); + assertThat(newBuilder.build().getStatsAttributes()).isEqualTo(attrs); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java index 4b025303e..56d65f829 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java @@ -39,24 +39,10 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; -import io.opencensus.common.Function; import io.opencensus.impl.stats.StatsComponentImpl; -import io.opencensus.stats.AggregationData; -import io.opencensus.stats.AggregationData.CountData; -import io.opencensus.stats.AggregationData.DistributionData; -import io.opencensus.stats.AggregationData.LastValueDataDouble; -import io.opencensus.stats.AggregationData.LastValueDataLong; -import io.opencensus.stats.AggregationData.SumDataDouble; -import io.opencensus.stats.AggregationData.SumDataLong; -import io.opencensus.stats.View; -import io.opencensus.stats.ViewData; import io.opencensus.tags.TagKey; import io.opencensus.tags.TagValue; import io.opencensus.tags.Tags; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; @@ -117,7 +103,6 @@ public void setUp() throws Exception { EnhancedBigtableStubSettings stubSettings = EnhancedBigtableStub.finalizeSettings( settings.getStubSettings(), Tags.getTagger(), localStats.getStatsRecorder()); - stub = new EnhancedBigtableStub(stubSettings, ClientContext.create(stubSettings)); } @@ -155,11 +140,15 @@ public Object answer(InvocationOnMock invocation) throws Throwable { Thread.sleep(100); long opLatency = - getAggregationValueAsLong( + StatsTestUtils.getAggregationValueAsLong( + localStats, RpcViewConstants.BIGTABLE_OP_LATENCY_VIEW, ImmutableMap.of( RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"), - RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK"))); + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); assertThat(opLatency).isIn(Range.closed(sleepTime, elapsed)); } @@ -187,11 +176,15 @@ public Object answer(InvocationOnMock invocation) { Thread.sleep(100); long opLatency = - getAggregationValueAsLong( + StatsTestUtils.getAggregationValueAsLong( + localStats, RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW, ImmutableMap.of( RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"), - RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK"))); + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); assertThat(opLatency).isEqualTo(2); } @@ -225,9 +218,13 @@ public Object answer(InvocationOnMock invocation) throws Throwable { Thread.sleep(100); long firstRowLatency = - getAggregationValueAsLong( + StatsTestUtils.getAggregationValueAsLong( + localStats, RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW, - ImmutableMap.of()); + ImmutableMap.of(), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); // adding buffer time to the upper range to allow for a race between the emulator and the client // recording the duration @@ -267,11 +264,15 @@ public Object answer(InvocationOnMock invocation) { Thread.sleep(100); long opLatency = - getAggregationValueAsLong( + StatsTestUtils.getAggregationValueAsLong( + localStats, RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW, ImmutableMap.of( RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"), - RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK"))); + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); assertThat(opLatency).isEqualTo(2); } @@ -312,11 +313,15 @@ public Object answer(InvocationOnMock invocation) throws Throwable { Thread.sleep(100); long attemptLatency = - getAggregationValueAsLong( + StatsTestUtils.getAggregationValueAsLong( + localStats, RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW, ImmutableMap.of( RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"), - RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK"))); + RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); // Average attempt latency will be just a single wait (as opposed to op latency which will be 2x // sleeptime) assertThat(attemptLatency).isIn(Range.closed(sleepTime, elapsed - sleepTime)); @@ -326,70 +331,4 @@ public Object answer(InvocationOnMock invocation) throws Throwable { private static StreamObserver anyObserver(Class returnType) { return (StreamObserver) any(returnType); } - - private long getAggregationValueAsLong(View view, ImmutableMap tags) { - ViewData viewData = localStats.getViewManager().getView(view.getName()); - Map, AggregationData> aggregationMap = - Objects.requireNonNull(viewData).getAggregationMap(); - - List tagValues = new ArrayList<>(); - - for (TagKey column : view.getColumns()) { - if (RpcMeasureConstants.BIGTABLE_PROJECT_ID == column) { - tagValues.add(TagValue.create(PROJECT_ID)); - } else if (RpcMeasureConstants.BIGTABLE_INSTANCE_ID == column) { - tagValues.add(TagValue.create(INSTANCE_ID)); - } else if (RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID == column) { - tagValues.add(TagValue.create(APP_PROFILE_ID)); - } else { - tagValues.add(tags.get(column)); - } - } - - AggregationData aggregationData = aggregationMap.get(tagValues); - - return aggregationData.match( - new Function() { - @Override - public Long apply(SumDataDouble arg) { - return (long) arg.getSum(); - } - }, - new Function() { - @Override - public Long apply(SumDataLong arg) { - return arg.getSum(); - } - }, - new Function() { - @Override - public Long apply(CountData arg) { - return arg.getCount(); - } - }, - new Function() { - @Override - public Long apply(DistributionData arg) { - return (long) arg.getMean(); - } - }, - new Function() { - @Override - public Long apply(LastValueDataDouble arg) { - return (long) arg.getLastValue(); - } - }, - new Function() { - @Override - public Long apply(LastValueDataLong arg) { - return arg.getLastValue(); - } - }, - new Function() { - @Override - public Long apply(AggregationData arg) { - throw new UnsupportedOperationException(); - } - }); - } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java index ff37e75a8..6aede9616 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java @@ -23,9 +23,13 @@ import com.google.common.collect.Maps; import io.grpc.Context; import io.opencensus.common.Scope; +import io.opencensus.stats.AggregationData; import io.opencensus.stats.Measure; import io.opencensus.stats.MeasureMap; +import io.opencensus.stats.StatsComponent; import io.opencensus.stats.StatsRecorder; +import io.opencensus.stats.View; +import io.opencensus.stats.ViewData; import io.opencensus.tags.Tag; import io.opencensus.tags.TagContext; import io.opencensus.tags.TagContextBuilder; @@ -35,9 +39,12 @@ import io.opencensus.tags.TagValue; import io.opencensus.tags.Tagger; import io.opencensus.tags.unsafe.ContextUtils; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -264,4 +271,76 @@ private static ImmutableMap getTags(TagContext tags) { ? ((FakeTagContext) tags).getTags() : ImmutableMap.of(); } + + public static long getAggregationValueAsLong( + StatsComponent stats, + View view, + ImmutableMap tags, + String projectId, + String instanceId, + String appProfileId) { + ViewData viewData = stats.getViewManager().getView(view.getName()); + Map, AggregationData> aggregationMap = + Objects.requireNonNull(viewData).getAggregationMap(); + + List tagValues = new ArrayList<>(); + + for (TagKey column : view.getColumns()) { + if (RpcMeasureConstants.BIGTABLE_PROJECT_ID == column) { + tagValues.add(TagValue.create(projectId)); + } else if (RpcMeasureConstants.BIGTABLE_INSTANCE_ID == column) { + tagValues.add(TagValue.create(instanceId)); + } else if (RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID == column) { + tagValues.add(TagValue.create(appProfileId)); + } else { + tagValues.add(tags.get(column)); + } + } + + AggregationData aggregationData = aggregationMap.get(tagValues); + + return aggregationData.match( + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.SumDataDouble arg) { + return (long) arg.getSum(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.SumDataLong arg) { + return arg.getSum(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.CountData arg) { + return arg.getCount(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.DistributionData arg) { + return (long) arg.getMean(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.LastValueDataDouble arg) { + return (long) arg.getLastValue(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.LastValueDataLong arg) { + return arg.getLastValue(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData arg) { + throw new UnsupportedOperationException(); + } + }); + } }