Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Surface the server-timing metric #535

Merged
merged 15 commits into from Dec 17, 2020
Expand Up @@ -175,6 +175,12 @@ public static void enableOpenCensusStats() {
// io.opencensus.contrib.grpc.metrics.RpcViews.registerClientGrpcBasicViews();
}

/** Enables OpenCensus GFE metric aggregations. */
@BetaApi("OpenCensus stats integration is currently unstable and may change in the future")
public static void enableGfeOpenCensusStats() {
com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientGfeViews();
}

/** Returns the target project id. */
public String getProjectId() {
return stubSettings.getProjectId();
Expand Down
Expand Up @@ -66,6 +66,8 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
Expand Down Expand Up @@ -203,7 +205,25 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build()),
// Add user configured tracer
settings.getTracerFactory())));

builder.setHeaderTracer(
builder
.getHeaderTracer()
.toBuilder()
.setStats(stats)
.setTagger(tagger)
.setStatsAttributes(
ImmutableMap.<TagKey, TagValue>builder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make this map instance shared with the attrs above?

.put(
RpcMeasureConstants.BIGTABLE_PROJECT_ID,
TagValue.create(settings.getProjectId()))
.put(
RpcMeasureConstants.BIGTABLE_INSTANCE_ID,
TagValue.create(settings.getInstanceId()))
.put(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build())
.build());
return builder.build();
}

Expand Down Expand Up @@ -268,11 +288,10 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
ServerStreamingCallable<Query, RowT> readRowsUserCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);

SpanName span = getSpanName("ReadRows");
ServerStreamingCallable<Query, RowT> traced =
new TracedServerStreamingCallable<>(
readRowsUserCallable,
clientContext.getTracerFactory(),
SpanName.of(CLIENT_NAME, "ReadRows"));
readRowsUserCallable, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -315,6 +334,7 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured by the {@code rowAdapter} parameter.
* <li>Add header tracer for tracking GFE metrics.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
Expand Down Expand Up @@ -356,10 +376,14 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
ServerStreamingCallable<ReadRowsRequest, RowT> watched =
Callables.watched(merging, innerSettings, clientContext);

ServerStreamingCallable<ReadRowsRequest, RowT> withHeaderTracer =
new HeaderTracerStreamingCallable<>(
watched, settings.getHeaderTracer(), getSpanName("ReadRows").toString());

// Retry logic is split into 2 parts to workaround a rare edge case described in
// ReadRowsRetryCompletedCallable
ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
new ReadRowsRetryCompletedCallable<>(watched);
new ReadRowsRetryCompletedCallable<>(withHeaderTracer);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
Expand All @@ -380,6 +404,8 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
* </ul>
*/
private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
String methodName = "SampleRowKeys";

ServerStreamingCallable<SampleRowKeysRequest, SampleRowKeysResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<SampleRowKeysRequest, SampleRowKeysResponse>newBuilder()
Expand All @@ -399,11 +425,15 @@ public Map<String, String> extract(

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> spoolable = base.all();

HeaderTracerUnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
spoolable, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(spoolable, settings.sampleRowKeysSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.sampleRowKeysSettings(), clientContext);

return createUserFacingUnaryCallable(
"SampleRowKeys", new SampleRowKeysCallable(retryable, requestContext));
methodName, new SampleRowKeysCallable(retryable, requestContext));
}

/**
Expand All @@ -415,6 +445,7 @@ public Map<String, String> extract(
* </ul>
*/
private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
String methodName = "MutateRow";
UnaryCallable<MutateRowRequest, MutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<MutateRowRequest, MutateRowResponse>newBuilder()
Expand All @@ -431,11 +462,15 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
.build(),
settings.mutateRowSettings().getRetryableCodes());

HeaderTracerUnaryCallable<MutateRowRequest, MutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(base, settings.mutateRowSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.mutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"MutateRow", new MutateRowCallable(retrying, requestContext));
methodName, new MutateRowCallable(retrying, requestContext));
}

/**
Expand All @@ -459,11 +494,13 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<BulkMutation, Void> userFacing =
new BulkMutateRowsUserFacingCallable(baseCallable, requestContext);

SpanName spanName = getSpanName("MutateRows");
UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(
userFacing, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, "MutateRows"));
new TracedUnaryCallable<>(userFacing, clientContext.getTracerFactory(), spanName);
HeaderTracerUnaryCallable<BulkMutation, Void> withHeaderTracer =
new HeaderTracerUnaryCallable<>(traced, settings.getHeaderTracer(), spanName.toString());

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return withHeaderTracer.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -569,6 +606,7 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
* </ul>
*/
private UnaryCallable<ConditionalRowMutation, Boolean> createCheckAndMutateRowCallable() {
String methodName = "CheckAndMutateRow";
UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<CheckAndMutateRowRequest, CheckAndMutateRowResponse>newBuilder()
Expand All @@ -586,11 +624,16 @@ public Map<String, String> extract(
.build(),
settings.checkAndMutateRowSettings().getRetryableCodes());

HeaderTracerUnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse>
withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(base, settings.checkAndMutateRowSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.checkAndMutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"CheckAndMutateRow", new CheckAndMutateRowCallable(retrying, requestContext));
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
}

/**
Expand Down Expand Up @@ -619,12 +662,16 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
})
.build(),
settings.readModifyWriteRowSettings().getRetryableCodes());

String methodName = "ReadModifyWriteRow";
HeaderTracerUnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse>
withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(base, settings.readModifyWriteRowSettings(), clientContext);
Callables.retrying(withHeaderTracer, settings.readModifyWriteRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"ReadModifyWriteRow", new ReadModifyWriteRowCallable(retrying, requestContext));
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
}

/**
Expand All @@ -635,8 +682,7 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
String methodName, UnaryCallable<RequestT, ResponseT> inner) {

UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(
inner, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, methodName));
new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
Expand Down Expand Up @@ -686,6 +732,10 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
}
// </editor-fold>

private SpanName getSpanName(String methodName) {
return SpanName.of(CLIENT_NAME, methodName);
}

@Override
public void close() {
for (BackgroundResource backgroundResource : clientContext.getBackgroundResources()) {
Expand Down
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracer;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -157,6 +158,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final String appProfileId;
private final boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private HeaderTracer headerTracer;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
Expand Down Expand Up @@ -196,6 +198,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
appProfileId = builder.appProfileId;
isRefreshingChannel = builder.isRefreshingChannel;
primedTableIds = builder.primedTableIds;
headerTracer = builder.headerTracer;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
Expand Down Expand Up @@ -240,6 +243,11 @@ public List<String> getPrimedTableIds() {
return primedTableIds;
}

/** Gets the tracer for capturing metrics in the header. */
HeaderTracer getHeaderTracer() {
return headerTracer;
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
Expand Down Expand Up @@ -501,6 +509,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private String appProfileId;
private boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private HeaderTracer headerTracer;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
Expand All @@ -524,6 +533,7 @@ private Builder() {
this.appProfileId = SERVER_DEFAULT_APP_PROFILE_ID;
this.isRefreshingChannel = false;
primedTableIds = ImmutableList.of();
headerTracer = HeaderTracer.newBuilder().build();
setCredentialsProvider(defaultCredentialsProviderBuilder().build());

// Defaults provider
Expand Down Expand Up @@ -637,6 +647,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
appProfileId = settings.appProfileId;
isRefreshingChannel = settings.isRefreshingChannel;
primedTableIds = settings.primedTableIds;
headerTracer = settings.headerTracer;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
Expand Down Expand Up @@ -759,6 +770,17 @@ public List<String> getPrimedTableIds() {
return primedTableIds;
}

/** Configure the header tracer for surfacing metrics in the header. */
mutianf marked this conversation as resolved.
Show resolved Hide resolved
Builder setHeaderTracer(HeaderTracer headerTracer) {
this.headerTracer = headerTracer;
return this;
}

/** Gets the header tracer that'll be used to surface metrics in the header. */
HeaderTracer getHeaderTracer() {
return headerTracer;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
Expand Down Expand Up @@ -838,6 +860,7 @@ public String toString() {
.add("appProfileId", appProfileId)
.add("isRefreshingChannel", isRefreshingChannel)
.add("primedTableIds", primedTableIds)
.add("headerTracer", headerTracer)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
Expand Down