Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Surface the server-timing metric #535

Merged
merged 15 commits into from Dec 17, 2020
14 changes: 13 additions & 1 deletion README.md
Expand Up @@ -262,12 +262,22 @@ metrics will be tagged with:
each client RPC, tagged by operation name and the attempt status. Under normal
circumstances, this will be identical to op_latency. However, when the client
receives transient errors, op_latency will be the sum of all attempt_latencies
and the exponential delays
and the exponential delays.

* `cloud.google.com/java/bigtable/attempts_per_op`: A distribution of attempts that
each operation required, tagged by operation name and final operation status.
Under normal circumstances, this will be 1.

### GFE metric views:

* `cloud.google.com/java/bigtable/gfe_latency`: A distribution of the latency
between Google's network receives an RPC and reads back the first byte of
the response.

* `cloud.google.com/java/bigtable/gfe_header_missing_count`: A counter of the
number of RPC responses received without the server-timing header, which
indicates that the request probably never reached Google's network.


By default, the functionality is disabled. For example to enable metrics using
[Google Stackdriver](https://cloud.google.com/monitoring/docs/):
Expand Down Expand Up @@ -323,6 +333,8 @@ StackdriverStatsExporter.createAndRegister(
);

BigtableDataSettings.enableOpenCensusStats();
// Enable GFE metric views
BigtableDataSettings.enableGfeOpenCensusStats();
```

## Version Conflicts
Expand Down
Expand Up @@ -175,6 +175,20 @@ public static void enableOpenCensusStats() {
// io.opencensus.contrib.grpc.metrics.RpcViews.registerClientGrpcBasicViews();
}

/**
* Enables OpenCensus GFE metric aggregations.
*
* <p>This will register views for gfe_latency and gfe_header_missing_count metrics.
*
* <p>gfe_latency measures the latency between Google's network receives an RPC and reads back the
* first byte of the response. gfe_header_missing_count is a counter of the number of RPC
* responses received without the server-timing header.
*/
@BetaApi("OpenCensus stats integration is currently unstable and may change in the future")
public static void enableGfeOpenCensusStats() {
com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientGfeViews();
}

/** Returns the target project id. */
public String getProjectId() {
return stubSettings.getProjectId();
Expand Down
Expand Up @@ -66,6 +66,8 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
Expand Down Expand Up @@ -162,6 +164,15 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build());
}

ImmutableMap<TagKey, TagValue> attributes =
ImmutableMap.<TagKey, TagValue>builder()
.put(RpcMeasureConstants.BIGTABLE_PROJECT_ID, TagValue.create(settings.getProjectId()))
.put(
RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(settings.getInstanceId()))
.put(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build();
// Inject Opencensus instrumentation
builder.setTracerFactory(
new CompositeTracerFactory(
Expand All @@ -187,23 +198,17 @@ public static EnhancedBigtableStubSettings finalizeSettings(
GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class))
.build()),
// Add OpenCensus Metrics
MetricsTracerFactory.create(
tagger,
stats,
ImmutableMap.<TagKey, TagValue>builder()
.put(
RpcMeasureConstants.BIGTABLE_PROJECT_ID,
TagValue.create(settings.getProjectId()))
.put(
RpcMeasureConstants.BIGTABLE_INSTANCE_ID,
TagValue.create(settings.getInstanceId()))
.put(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build()),
MetricsTracerFactory.create(tagger, stats, attributes),
// Add user configured tracer
settings.getTracerFactory())));

builder.setHeaderTracer(
builder
.getHeaderTracer()
.toBuilder()
.setStats(stats)
.setTagger(tagger)
.setStatsAttributes(attributes)
.build());
return builder.build();
}

Expand Down Expand Up @@ -268,11 +273,10 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
ServerStreamingCallable<Query, RowT> readRowsUserCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);

SpanName span = getSpanName("ReadRows");
ServerStreamingCallable<Query, RowT> traced =
new TracedServerStreamingCallable<>(
readRowsUserCallable,
clientContext.getTracerFactory(),
SpanName.of(CLIENT_NAME, "ReadRows"));
readRowsUserCallable, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -315,6 +319,7 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured by the {@code rowAdapter} parameter.
* <li>Add header tracer for tracking GFE metrics.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
Expand Down Expand Up @@ -356,10 +361,14 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
ServerStreamingCallable<ReadRowsRequest, RowT> watched =
Callables.watched(merging, innerSettings, clientContext);

ServerStreamingCallable<ReadRowsRequest, RowT> withHeaderTracer =
new HeaderTracerStreamingCallable<>(
watched, settings.getHeaderTracer(), getSpanName("ReadRows").toString());

// Retry logic is split into 2 parts to workaround a rare edge case described in
// ReadRowsRetryCompletedCallable
ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
new ReadRowsRetryCompletedCallable<>(watched);
new ReadRowsRetryCompletedCallable<>(withHeaderTracer);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
Expand All @@ -380,6 +389,8 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
* </ul>
*/
private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
String methodName = "SampleRowKeys";

ServerStreamingCallable<SampleRowKeysRequest, SampleRowKeysResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<SampleRowKeysRequest, SampleRowKeysResponse>newBuilder()
Expand All @@ -399,11 +410,15 @@ public Map<String, String> extract(

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> spoolable = base.all();

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
spoolable, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(spoolable, settings.sampleRowKeysSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.sampleRowKeysSettings(), clientContext);

return createUserFacingUnaryCallable(
"SampleRowKeys", new SampleRowKeysCallable(retryable, requestContext));
methodName, new SampleRowKeysCallable(retryable, requestContext));
}

/**
Expand All @@ -415,6 +430,7 @@ public Map<String, String> extract(
* </ul>
*/
private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
String methodName = "MutateRow";
UnaryCallable<MutateRowRequest, MutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<MutateRowRequest, MutateRowResponse>newBuilder()
Expand All @@ -431,11 +447,15 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
.build(),
settings.mutateRowSettings().getRetryableCodes());

UnaryCallable<MutateRowRequest, MutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(base, settings.mutateRowSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.mutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"MutateRow", new MutateRowCallable(retrying, requestContext));
methodName, new MutateRowCallable(retrying, requestContext));
}

/**
Expand All @@ -459,11 +479,13 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<BulkMutation, Void> userFacing =
new BulkMutateRowsUserFacingCallable(baseCallable, requestContext);

SpanName spanName = getSpanName("MutateRows");
UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(
userFacing, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, "MutateRows"));
new TracedUnaryCallable<>(userFacing, clientContext.getTracerFactory(), spanName);
UnaryCallable<BulkMutation, Void> withHeaderTracer =
new HeaderTracerUnaryCallable<>(traced, settings.getHeaderTracer(), spanName.toString());

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return withHeaderTracer.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -569,6 +591,7 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
* </ul>
*/
private UnaryCallable<ConditionalRowMutation, Boolean> createCheckAndMutateRowCallable() {
String methodName = "CheckAndMutateRow";
UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<CheckAndMutateRowRequest, CheckAndMutateRowResponse>newBuilder()
Expand All @@ -586,11 +609,15 @@ public Map<String, String> extract(
.build(),
settings.checkAndMutateRowSettings().getRetryableCodes());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(base, settings.checkAndMutateRowSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.checkAndMutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"CheckAndMutateRow", new CheckAndMutateRowCallable(retrying, requestContext));
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
}

/**
Expand Down Expand Up @@ -619,12 +646,16 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
})
.build(),
settings.readModifyWriteRowSettings().getRetryableCodes());
String methodName = "ReadModifyWriteRow";
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(base, settings.readModifyWriteRowSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.readModifyWriteRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"ReadModifyWriteRow", new ReadModifyWriteRowCallable(retrying, requestContext));
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
}

/**
Expand All @@ -635,8 +666,7 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
String methodName, UnaryCallable<RequestT, ResponseT> inner) {

UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(
inner, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, methodName));
new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -686,6 +716,10 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
}
// </editor-fold>

private SpanName getSpanName(String methodName) {
return SpanName.of(CLIENT_NAME, methodName);
}

@Override
public void close() {
for (BackgroundResource backgroundResource : clientContext.getBackgroundResources()) {
Expand Down
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracer;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -154,6 +155,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final String appProfileId;
private final boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private HeaderTracer headerTracer;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
Expand Down Expand Up @@ -187,6 +189,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
appProfileId = builder.appProfileId;
isRefreshingChannel = builder.isRefreshingChannel;
primedTableIds = builder.primedTableIds;
headerTracer = builder.headerTracer;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
Expand Down Expand Up @@ -231,6 +234,11 @@ public List<String> getPrimedTableIds() {
return primedTableIds;
}

/** Gets the tracer for capturing metrics in the header. */
HeaderTracer getHeaderTracer() {
return headerTracer;
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
Expand Down Expand Up @@ -488,6 +496,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private String appProfileId;
private boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private HeaderTracer headerTracer;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
Expand All @@ -511,6 +520,7 @@ private Builder() {
this.appProfileId = SERVER_DEFAULT_APP_PROFILE_ID;
this.isRefreshingChannel = false;
primedTableIds = ImmutableList.of();
headerTracer = HeaderTracer.newBuilder().build();
setCredentialsProvider(defaultCredentialsProviderBuilder().build());

// Defaults provider
Expand Down Expand Up @@ -617,6 +627,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
appProfileId = settings.appProfileId;
isRefreshingChannel = settings.isRefreshingChannel;
primedTableIds = settings.primedTableIds;
headerTracer = settings.headerTracer;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
Expand Down Expand Up @@ -739,6 +750,17 @@ public List<String> getPrimedTableIds() {
return primedTableIds;
}

/** Configure the header tracer for surfacing metrics in the header. */
mutianf marked this conversation as resolved.
Show resolved Hide resolved
Builder setHeaderTracer(HeaderTracer headerTracer) {
this.headerTracer = headerTracer;
return this;
}

/** Gets the header tracer that'll be used to surface metrics in the header. */
HeaderTracer getHeaderTracer() {
return headerTracer;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
Expand Down Expand Up @@ -818,6 +840,7 @@ public String toString() {
.add("appProfileId", appProfileId)
.add("isRefreshingChannel", isRefreshingChannel)
.add("primedTableIds", primedTableIds)
.add("headerTracer", headerTracer)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
Expand Down