Skip to content

Commit

Permalink
Extract server-timing trailer and create metrics for gfe latency
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Nov 12, 2020
1 parent 36e1ad2 commit 185aeff
Show file tree
Hide file tree
Showing 15 changed files with 730 additions and 114 deletions.
Expand Up @@ -23,6 +23,7 @@
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.stub.BigtableInterceptorProvider;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -128,6 +129,7 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder input) {
.setKeepAliveTimeout(
Duration.ofSeconds(10)) // wait this long before considering the connection dead
.setKeepAliveWithoutCalls(true) // sends ping without active streams
.setInterceptorProvider(BigtableInterceptorProvider.createDefault())
.build());

LOGGER.info("Connecting to the Bigtable emulator at " + hostname + ":" + port);
Expand Down
@@ -0,0 +1,44 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.common.collect.ImmutableList;
import io.grpc.ClientInterceptor;
import java.util.List;

public class BigtableInterceptorProvider implements GrpcInterceptorProvider {

private static final ClientHeaderInterceptor headerInterceptor = new ClientHeaderInterceptor();
private List<ClientInterceptor> clientInterceptors;

private BigtableInterceptorProvider(List<ClientInterceptor> interceptors) {
this.clientInterceptors = interceptors;
}

public static BigtableInterceptorProvider createDefault() {
return new BigtableInterceptorProvider(ImmutableList.<ClientInterceptor>of(headerInterceptor));
}

public static BigtableInterceptorProvider create(List<ClientInterceptor> interceptors) {
return new BigtableInterceptorProvider(interceptors);
}

@Override
public List<ClientInterceptor> getInterceptors() {
return clientInterceptors;
}
}
Expand Up @@ -42,7 +42,6 @@ class CheckAndMutateRowCallable extends UnaryCallable<ConditionalRowMutation, Bo
public ApiFuture<Boolean> futureCall(ConditionalRowMutation request, ApiCallContext context) {
ApiFuture<CheckAndMutateRowResponse> rawResponse =
inner.futureCall(request.toProto(requestContext), context);

return ApiFutures.transform(
rawResponse,
new ApiFunction<CheckAndMutateRowResponse, Boolean>() {
Expand Down
@@ -0,0 +1,96 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.gax.tracing.SpanName;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracer;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ClientHeaderInterceptor implements ClientInterceptor {
public static final Metadata.Key<String> SERVER_TIMING_HEADER_KEY =
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);

private static final Logger LOGGER = Logger.getLogger(ClientHeaderInterceptor.class.getName());
private static final String CLIENT_NAME = "Bigtable";
private static final Pattern GFE_HEADER_PATTERN = Pattern.compile(".*dur=(?<dur>\\d+)");
private static final Pattern METHOD_DESCRIPTOR_PATTERN = Pattern.compile(".*/(?<op>[a-zA-Z]+)");

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, final CallOptions callOptions, Channel channel) {
final ClientCall<ReqT, RespT> clientCall = channel.newCall(method, callOptions);
final SpanName spanName = processMethodName(method.getFullMethodName());
final HeaderTracer tracer = callOptions.getOption(HeaderTracer.HEADER_TRACER_CONTEXT_KEY);
return new SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
processHeader(headers, tracer, spanName);
super.onHeaders(headers);
}
},
headers);
}
};
}

private SpanName processMethodName(String method) {
Matcher matcher = METHOD_DESCRIPTOR_PATTERN.matcher(method);
if (matcher.find()) {
return SpanName.of(CLIENT_NAME, matcher.group("op"));
}
LOGGER.warning(
String.format("Failed to get bigtable op name. Received method descriptor: %s.", method));
return null;
}

private void processHeader(Metadata headers, HeaderTracer tracer, SpanName span) {
if (tracer == null) {
LOGGER.warning("Couldn't find HeaderTracer in call options. Skip extracting gfe metrics");
return;
}
if (headers.get(SERVER_TIMING_HEADER_KEY) != null) {
String serverTiming = headers.get(SERVER_TIMING_HEADER_KEY);
Matcher matcher = GFE_HEADER_PATTERN.matcher(serverTiming);
tracer.recordHeader(RpcMeasureConstants.BIGTABLE_GFE_MISSING_COUNT, 0, span);
if (matcher.find()) {
long latency = Long.valueOf(matcher.group("dur"));
tracer.recordHeader(RpcMeasureConstants.BIGTABLE_GFE_LATENCY, latency, span);
} else {
LOGGER.warning(
String.format(
"Received invalid %s header: %s, failed to add to metrics.",
SERVER_TIMING_HEADER_KEY.name(), serverTiming));
}
} else {
tracer.recordHeader(RpcMeasureConstants.BIGTABLE_GFE_MISSING_COUNT, 1l, span);
}
}
}
Expand Up @@ -23,13 +23,15 @@
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcRawCallableFactory;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.RequestParamsExtractor;
Expand Down Expand Up @@ -65,9 +67,7 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.metrics.*;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
Expand All @@ -82,6 +82,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.opencensus.stats.Stats;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.TagKey;
Expand All @@ -91,6 +92,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import javax.annotation.Nonnull;

/**
Expand All @@ -109,6 +111,8 @@
public class EnhancedBigtableStub implements AutoCloseable {
private static final String CLIENT_NAME = "Bigtable";

private static final Logger LOGGER = Logger.getLogger(EnhancedBigtableStub.class.getName());

private final EnhancedBigtableStubSettings settings;
private final ClientContext clientContext;
private final RequestContext requestContext;
Expand Down Expand Up @@ -203,13 +207,26 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build()),
// Add user configured tracer
settings.getTracerFactory())));

HeaderTracer headerTracer = builder.getHeaderTracer();
headerTracer.setStats(stats);
headerTracer.setTagger(tagger);
headerTracer.setStatsAttributes(
ImmutableMap.<TagKey, TagValue>builder()
.put(RpcMeasureConstants.BIGTABLE_PROJECT_ID, TagValue.create(settings.getProjectId()))
.put(
RpcMeasureConstants.BIGTABLE_INSTANCE_ID, TagValue.create(settings.getInstanceId()))
.put(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build());
builder.setHeaderTracer(headerTracer);
return builder.build();
}

public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext clientContext) {
this.settings = settings;
this.clientContext = clientContext;

this.requestContext =
RequestContext.create(
settings.getProjectId(), settings.getInstanceId(), settings.getAppProfileId());
Expand Down Expand Up @@ -274,7 +291,7 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
clientContext.getTracerFactory(),
SpanName.of(CLIENT_NAME, "ReadRows"));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(getContextWithTracer());
}

/**
Expand Down Expand Up @@ -463,7 +480,7 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
new TracedUnaryCallable<>(
userFacing, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, "MutateRows"));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(getContextWithTracer());
}

/**
Expand Down Expand Up @@ -638,7 +655,7 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
new TracedUnaryCallable<>(
inner, clientContext.getTracerFactory(), SpanName.of(CLIENT_NAME, methodName));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(getContextWithTracer());
}
// </editor-fold>

Expand Down Expand Up @@ -686,6 +703,21 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
}
// </editor-fold>

private GrpcCallContext getContextWithTracer() {
ApiCallContext apiCallContext = clientContext.getDefaultCallContext();
if (!(apiCallContext instanceof GrpcCallContext)) {
LOGGER.warning(
"Failed to inject tracer in call context. Expected GrpcCallContext but had "
+ apiCallContext.getClass());
}
GrpcCallContext grpcCallContext = (GrpcCallContext) apiCallContext;
CallOptions options = grpcCallContext.getCallOptions();
options =
options.withOption(HeaderTracer.HEADER_TRACER_CONTEXT_KEY, settings.getHeaderTracer());
grpcCallContext = grpcCallContext.withCallOptions(options);
return grpcCallContext;
}

@Override
public void close() {
for (BackgroundResource backgroundResource : clientContext.getBackgroundResources()) {
Expand Down

0 comments on commit 185aeff

Please sign in to comment.