Skip to content

Commit

Permalink
feat: Surface the server-timing metric (googleapis#535)
Browse files Browse the repository at this point in the history
* Extract server-timing trailer and create metrics for gfe latency

* Add more tests and refactor

* Refactor comments and imports

* reformatting

* Clean up comments

* Refactor, use GrpcMetadataResponse to get the trailer

* Fix based on the code review

* clean up HeaderTracerResponseObserver

* Add more tests for all the ops

* Improve documents, changes for directPath and more tests

* Small fixes in the doc

* small clean up
  • Loading branch information
mutianf authored and ad548 committed Mar 13, 2021
1 parent 2c9f025 commit 7686d9a
Show file tree
Hide file tree
Showing 15 changed files with 1,161 additions and 130 deletions.
14 changes: 13 additions & 1 deletion README.md
Expand Up @@ -296,12 +296,22 @@ metrics will be tagged with:
each client RPC, tagged by operation name and the attempt status. Under normal
circumstances, this will be identical to op_latency. However, when the client
receives transient errors, op_latency will be the sum of all attempt_latencies
and the exponential delays
and the exponential delays.

* `cloud.google.com/java/bigtable/attempts_per_op`: A distribution of attempts that
each operation required, tagged by operation name and final operation status.
Under normal circumstances, this will be 1.

### GFE metric views:

* `cloud.google.com/java/bigtable/gfe_latency`: A distribution of the latency
between Google's network receives an RPC and reads back the first byte of
the response.

* `cloud.google.com/java/bigtable/gfe_header_missing_count`: A counter of the
number of RPC responses received without the server-timing header, which
indicates that the request probably never reached Google's network.


By default, the functionality is disabled. For example to enable metrics using
[Google Stackdriver](https://cloud.google.com/monitoring/docs/):
Expand Down Expand Up @@ -357,6 +367,8 @@ StackdriverStatsExporter.createAndRegister(
);

BigtableDataSettings.enableOpenCensusStats();
// Enable GFE metric views
BigtableDataSettings.enableGfeOpenCensusStats();
```

## Version Conflicts
Expand Down
Expand Up @@ -175,6 +175,20 @@ public static void enableOpenCensusStats() {
// io.opencensus.contrib.grpc.metrics.RpcViews.registerClientGrpcBasicViews();
}

/**
* Enables OpenCensus GFE metric aggregations.
*
* <p>This will register views for gfe_latency and gfe_header_missing_count metrics.
*
* <p>gfe_latency measures the latency between Google's network receives an RPC and reads back the
* first byte of the response. gfe_header_missing_count is a counter of the number of RPC
* responses received without the server-timing header.
*/
@BetaApi("OpenCensus stats integration is currently unstable and may change in the future")
public static void enableGfeOpenCensusStats() {
com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientGfeViews();
}

/** Returns the target project id. */
public String getProjectId() {
return stubSettings.getProjectId();
Expand Down
Expand Up @@ -66,6 +66,8 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
Expand Down Expand Up @@ -162,6 +164,15 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build());
}

ImmutableMap<TagKey, TagValue> attributes =
ImmutableMap.<TagKey, TagValue>builder()
.put(RpcMeasureConstants.BIGTABLE_PROJECT_ID, TagValue.create(settings.getProjectId()))
.put(
RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(settings.getInstanceId()))
.put(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build();
// Inject Opencensus instrumentation
builder.setTracerFactory(
new CompositeTracerFactory(
Expand All @@ -187,23 +198,17 @@ public static EnhancedBigtableStubSettings finalizeSettings(
GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class))
.build()),
// Add OpenCensus Metrics
MetricsTracerFactory.create(
tagger,
stats,
ImmutableMap.<TagKey, TagValue>builder()
.put(
RpcMeasureConstants.BIGTABLE_PROJECT_ID,
TagValue.create(settings.getProjectId()))
.put(
RpcMeasureConstants.BIGTABLE_INSTANCE_ID,
TagValue.create(settings.getInstanceId()))
.put(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build()),
MetricsTracerFactory.create(tagger, stats, attributes),
// Add user configured tracer
settings.getTracerFactory())));

builder.setHeaderTracer(
builder
.getHeaderTracer()
.toBuilder()
.setStats(stats)
.setTagger(tagger)
.setStatsAttributes(attributes)
.build());
return builder.build();
}

Expand Down Expand Up @@ -268,11 +273,10 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
ServerStreamingCallable<Query, RowT> readRowsUserCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);

SpanName span = getSpanName("ReadRows");
ServerStreamingCallable<Query, RowT> traced =
new TracedServerStreamingCallable<>(
readRowsUserCallable,
clientContext.getTracerFactory(),
SpanName.of(CLIENT_NAME, "ReadRows"));
readRowsUserCallable, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -315,6 +319,7 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured by the {@code rowAdapter} parameter.
* <li>Add header tracer for tracking GFE metrics.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
Expand Down Expand Up @@ -356,10 +361,14 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
ServerStreamingCallable<ReadRowsRequest, RowT> watched =
Callables.watched(merging, innerSettings, clientContext);

ServerStreamingCallable<ReadRowsRequest, RowT> withHeaderTracer =
new HeaderTracerStreamingCallable<>(
watched, settings.getHeaderTracer(), getSpanName("ReadRows").toString());

// Retry logic is split into 2 parts to workaround a rare edge case described in
// ReadRowsRetryCompletedCallable
ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
new ReadRowsRetryCompletedCallable<>(watched);
new ReadRowsRetryCompletedCallable<>(withHeaderTracer);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
Expand All @@ -380,6 +389,8 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
* </ul>
*/
private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
String methodName = "SampleRowKeys";

ServerStreamingCallable<SampleRowKeysRequest, SampleRowKeysResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<SampleRowKeysRequest, SampleRowKeysResponse>newBuilder()
Expand All @@ -399,11 +410,15 @@ public Map<String, String> extract(

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> spoolable = base.all();

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
spoolable, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(spoolable, settings.sampleRowKeysSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.sampleRowKeysSettings(), clientContext);

return createUserFacingUnaryCallable(
"SampleRowKeys", new SampleRowKeysCallable(retryable, requestContext));
methodName, new SampleRowKeysCallable(retryable, requestContext));
}

/**
Expand All @@ -415,6 +430,7 @@ public Map<String, String> extract(
* </ul>
*/
private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
String methodName = "MutateRow";
UnaryCallable<MutateRowRequest, MutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<MutateRowRequest, MutateRowResponse>newBuilder()
Expand All @@ -431,11 +447,15 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
.build(),
settings.mutateRowSettings().getRetryableCodes());

UnaryCallable<MutateRowRequest, MutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(base, settings.mutateRowSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.mutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"MutateRow", new MutateRowCallable(retrying, requestContext));
methodName, new MutateRowCallable(retrying, requestContext));
}

/**
Expand All @@ -459,11 +479,13 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<BulkMutation, Void> userFacing =
new BulkMutateRowsUserFacingCallable(baseCallable, requestContext);

SpanName spanName = getSpanName("MutateRows");
UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(
userFacing, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, "MutateRows"));
new TracedUnaryCallable<>(userFacing, clientContext.getTracerFactory(), spanName);
UnaryCallable<BulkMutation, Void> withHeaderTracer =
new HeaderTracerUnaryCallable<>(traced, settings.getHeaderTracer(), spanName.toString());

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return withHeaderTracer.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -569,6 +591,7 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
* </ul>
*/
private UnaryCallable<ConditionalRowMutation, Boolean> createCheckAndMutateRowCallable() {
String methodName = "CheckAndMutateRow";
UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<CheckAndMutateRowRequest, CheckAndMutateRowResponse>newBuilder()
Expand All @@ -586,11 +609,15 @@ public Map<String, String> extract(
.build(),
settings.checkAndMutateRowSettings().getRetryableCodes());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(base, settings.checkAndMutateRowSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.checkAndMutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"CheckAndMutateRow", new CheckAndMutateRowCallable(retrying, requestContext));
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
}

/**
Expand Down Expand Up @@ -619,12 +646,16 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
})
.build(),
settings.readModifyWriteRowSettings().getRetryableCodes());
String methodName = "ReadModifyWriteRow";
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(base, settings.readModifyWriteRowSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.readModifyWriteRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"ReadModifyWriteRow", new ReadModifyWriteRowCallable(retrying, requestContext));
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
}

/**
Expand All @@ -635,8 +666,7 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
String methodName, UnaryCallable<RequestT, ResponseT> inner) {

UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(
inner, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, methodName));
new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -686,6 +716,10 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
}
// </editor-fold>

private SpanName getSpanName(String methodName) {
return SpanName.of(CLIENT_NAME, methodName);
}

@Override
public void close() {
for (BackgroundResource backgroundResource : clientContext.getBackgroundResources()) {
Expand Down
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracer;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -154,6 +155,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final String appProfileId;
private final boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private HeaderTracer headerTracer;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
Expand Down Expand Up @@ -187,6 +189,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
appProfileId = builder.appProfileId;
isRefreshingChannel = builder.isRefreshingChannel;
primedTableIds = builder.primedTableIds;
headerTracer = builder.headerTracer;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
Expand Down Expand Up @@ -231,6 +234,11 @@ public List<String> getPrimedTableIds() {
return primedTableIds;
}

/** Gets the tracer for capturing metrics in the header. */
HeaderTracer getHeaderTracer() {
return headerTracer;
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
Expand Down Expand Up @@ -488,6 +496,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private String appProfileId;
private boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private HeaderTracer headerTracer;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
Expand All @@ -511,6 +520,7 @@ private Builder() {
this.appProfileId = SERVER_DEFAULT_APP_PROFILE_ID;
this.isRefreshingChannel = false;
primedTableIds = ImmutableList.of();
headerTracer = HeaderTracer.newBuilder().build();
setCredentialsProvider(defaultCredentialsProviderBuilder().build());

// Defaults provider
Expand Down Expand Up @@ -617,6 +627,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
appProfileId = settings.appProfileId;
isRefreshingChannel = settings.isRefreshingChannel;
primedTableIds = settings.primedTableIds;
headerTracer = settings.headerTracer;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
Expand Down Expand Up @@ -739,6 +750,17 @@ public List<String> getPrimedTableIds() {
return primedTableIds;
}

/** Configure the header tracer for surfacing metrics in the header. */
Builder setHeaderTracer(HeaderTracer headerTracer) {
this.headerTracer = headerTracer;
return this;
}

/** Gets the header tracer that'll be used to surface metrics in the header. */
HeaderTracer getHeaderTracer() {
return headerTracer;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
Expand Down Expand Up @@ -818,6 +840,7 @@ public String toString() {
.add("appProfileId", appProfileId)
.add("isRefreshingChannel", isRefreshingChannel)
.add("primedTableIds", primedTableIds)
.add("headerTracer", headerTracer)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
Expand Down

0 comments on commit 7686d9a

Please sign in to comment.