Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: add status label to gfe metrics (#1077)
  • Loading branch information
mutianf committed Dec 1, 2021
1 parent d0709e8 commit 777549e
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 38 deletions.
Expand Up @@ -37,5 +37,5 @@ public abstract class BigtableTracer extends BaseApiTracer {
* the response from server-timing header. If server-timing header is missing, increment the
* missing header count.
*/
public abstract void recordGfeMetadata(@Nullable Long latency);
public abstract void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable);
}
Expand Up @@ -173,9 +173,9 @@ public int getAttempt() {
}

@Override
public void recordGfeMetadata(@Nullable Long latency) {
public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) {
for (BigtableTracer tracer : bigtableTracers) {
tracer.recordGfeMetadata(latency);
tracer.recordGfeMetadata(latency, throwable);
}
}
}
Expand Up @@ -96,15 +96,15 @@ public void onError(Throwable t) {
// so it's not checking trailing metadata here.
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency);
tracer.recordGfeMetadata(latency, t);
outerObserver.onError(t);
}

@Override
public void onComplete() {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency);
tracer.recordGfeMetadata(latency, null);
outerObserver.onComplete();
}
}
Expand Down
Expand Up @@ -16,6 +16,8 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
Expand Down Expand Up @@ -51,27 +53,42 @@ public HeaderTracerUnaryCallable(@Nonnull UnaryCallable<RequestT, ResponseT> inn

@Override
public ApiFuture futureCall(RequestT request, ApiCallContext context) {
if (RpcViews.isGfeMetricsRegistered()) {
// tracer should always be an instance of BigtableTracer
if (RpcViews.isGfeMetricsRegistered() && context.getTracer() instanceof BigtableTracer) {
final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
final ApiCallContext contextWithResponseMetadata = responseMetadata.addHandlers(context);
HeaderTracerUnaryCallback callback =
new HeaderTracerUnaryCallback((BigtableTracer) context.getTracer(), responseMetadata);
ApiFuture<ResponseT> future = innerCallable.futureCall(request, contextWithResponseMetadata);
future.addListener(
new Runnable() {
@Override
public void run() {
// this should always be true
if (contextWithResponseMetadata.getTracer() instanceof BigtableTracer) {
BigtableTracer tracer = (BigtableTracer) contextWithResponseMetadata.getTracer();
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency);
}
}
},
MoreExecutors.directExecutor());
ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor());
return future;
} else {
return innerCallable.futureCall(request, context);
}
}

class HeaderTracerUnaryCallback<ResponseT> implements ApiFutureCallback<ResponseT> {

private final BigtableTracer tracer;
private final GrpcResponseMetadata responseMetadata;

HeaderTracerUnaryCallback(BigtableTracer tracer, GrpcResponseMetadata responseMetadata) {
this.tracer = tracer;
this.responseMetadata = responseMetadata;
}

@Override
public void onFailure(Throwable throwable) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, throwable);
}

@Override
public void onSuccess(ResponseT response) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
}
}
}
Expand Up @@ -211,7 +211,7 @@ public int getAttempt() {
}

@Override
public void recordGfeMetadata(@Nullable Long latency) {
public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) {
MeasureMap measures = stats.newMeasureMap();
if (latency != null) {
measures
Expand All @@ -220,7 +220,10 @@ public void recordGfeMetadata(@Nullable Long latency) {
} else {
measures.put(RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT, 1L);
}
measures.record(newTagCtxBuilder().build());
measures.record(
newTagCtxBuilder()
.putLocal(RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable))
.build());
}

private TagContextBuilder newTagCtxBuilder() {
Expand Down
Expand Up @@ -136,7 +136,11 @@ class RpcViewConstants {
BIGTABLE_GFE_LATENCY,
AGGREGATION_WITH_MILLIS_HISTOGRAM,
ImmutableList.of(
BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP));
BIGTABLE_INSTANCE_ID,
BIGTABLE_PROJECT_ID,
BIGTABLE_APP_PROFILE_ID,
BIGTABLE_OP,
BIGTABLE_STATUS));

static final View BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW =
View.create(
Expand All @@ -145,5 +149,9 @@ class RpcViewConstants {
BIGTABLE_GFE_HEADER_MISSING_COUNT,
SUM,
ImmutableList.of(
BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP));
BIGTABLE_INSTANCE_ID,
BIGTABLE_PROJECT_ID,
BIGTABLE_APP_PROFILE_ID,
BIGTABLE_OP,
BIGTABLE_STATUS));
}
Expand Up @@ -23,6 +23,8 @@
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracer.Scope;
import com.google.common.collect.ImmutableList;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -222,8 +224,9 @@ public void testGetAttempt() {

@Test
public void testRecordGfeLatency() {
compositeTracer.recordGfeMetadata(20L);
verify(child3, times(1)).recordGfeMetadata(20L);
verify(child4, times(1)).recordGfeMetadata(20L);
Throwable t = new StatusRuntimeException(Status.UNAVAILABLE);
compositeTracer.recordGfeMetadata(20L, t);
verify(child3, times(1)).recordGfeMetadata(20L, t);
verify(child4, times(1)).recordGfeMetadata(20L, t);
}
}
Expand Up @@ -168,7 +168,8 @@ public void testGFELatencyMetricReadRows() throws InterruptedException {
localStats,
RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
ImmutableMap.<TagKey, TagValue>of(
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")),
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
PROJECT_ID,
INSTANCE_ID,
APP_PROFILE_ID);
Expand All @@ -186,7 +187,9 @@ public void testGFELatencyMetricMutateRow() throws InterruptedException {
StatsTestUtils.getAggregationValueAsLong(
localStats,
RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRow")),
ImmutableMap.of(
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRow"),
RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
PROJECT_ID,
INSTANCE_ID,
APP_PROFILE_ID);
Expand All @@ -208,7 +211,8 @@ public void testGFELatencyMetricMutateRows() throws InterruptedException {
localStats,
RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
ImmutableMap.of(
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRows")),
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRows"),
RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
PROJECT_ID,
INSTANCE_ID,
APP_PROFILE_ID);
Expand All @@ -226,7 +230,8 @@ public void testGFELatencySampleRowKeys() throws InterruptedException {
localStats,
RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
ImmutableMap.of(
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.SampleRowKeys")),
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.SampleRowKeys"),
RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
PROJECT_ID,
INSTANCE_ID,
APP_PROFILE_ID);
Expand All @@ -246,7 +251,8 @@ public void testGFELatencyCheckAndMutateRow() throws InterruptedException {
localStats,
RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
ImmutableMap.of(
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.CheckAndMutateRow")),
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.CheckAndMutateRow"),
RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
PROJECT_ID,
INSTANCE_ID,
APP_PROFILE_ID);
Expand All @@ -266,7 +272,8 @@ public void testGFELatencyReadModifyWriteRow() throws InterruptedException {
localStats,
RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
ImmutableMap.of(
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadModifyWriteRow")),
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadModifyWriteRow"),
RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
PROJECT_ID,
INSTANCE_ID,
APP_PROFILE_ID);
Expand All @@ -285,7 +292,11 @@ public void testGFEMissingHeaderMetric() throws InterruptedException {
StatsTestUtils.getAggregationValueAsLong(
localStats,
RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRow")),
ImmutableMap.of(
RpcMeasureConstants.BIGTABLE_OP,
TagValue.create("Bigtable.MutateRow"),
RpcMeasureConstants.BIGTABLE_STATUS,
TagValue.create("OK")),
PROJECT_ID,
INSTANCE_ID,
APP_PROFILE_ID);
Expand All @@ -294,7 +305,8 @@ public void testGFEMissingHeaderMetric() throws InterruptedException {
localStats,
RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
ImmutableMap.<TagKey, TagValue>of(
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")),
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows"),
RpcMeasureConstants.BIGTABLE_STATUS, TagValue.create("OK")),
PROJECT_ID,
INSTANCE_ID,
APP_PROFILE_ID);
Expand All @@ -321,7 +333,11 @@ public void testGFEMissingHeaderMetric() throws InterruptedException {
StatsTestUtils.getAggregationValueAsLong(
localStats,
RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.MutateRow")),
ImmutableMap.of(
RpcMeasureConstants.BIGTABLE_OP,
TagValue.create("Bigtable.MutateRow"),
RpcMeasureConstants.BIGTABLE_STATUS,
TagValue.create("OK")),
PROJECT_ID,
INSTANCE_ID,
APP_PROFILE_ID);
Expand All @@ -330,7 +346,10 @@ public void testGFEMissingHeaderMetric() throws InterruptedException {
localStats,
RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
ImmutableMap.<TagKey, TagValue>of(
RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")),
RpcMeasureConstants.BIGTABLE_OP,
TagValue.create("Bigtable.ReadRows"),
RpcMeasureConstants.BIGTABLE_STATUS,
TagValue.create("OK")),
PROJECT_ID,
INSTANCE_ID,
APP_PROFILE_ID);
Expand All @@ -353,7 +372,11 @@ public void testMetricsWithErrorResponse() throws InterruptedException {
StatsTestUtils.getAggregationValueAsLong(
localStats,
RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW,
ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")),
ImmutableMap.of(
RpcMeasureConstants.BIGTABLE_OP,
TagValue.create("Bigtable.ReadRows"),
RpcMeasureConstants.BIGTABLE_STATUS,
TagValue.create("UNAVAILABLE")),
PROJECT_ID,
INSTANCE_ID,
APP_PROFILE_ID);
Expand Down

0 comments on commit 777549e

Please sign in to comment.