Skip to content

Commit

Permalink
refactor: reimplement metrics using the ApiTracer api
Browse files Browse the repository at this point in the history
This will allow to us to keep metrics for RPCs tagged with bigtable tags like table id. This will also be the mechanism that bigtable-hbase will use to plugin its dropwizard metrics implementation
  • Loading branch information
igorbernstein2 committed Mar 5, 2020
1 parent 0afccbb commit 1e414f4
Show file tree
Hide file tree
Showing 12 changed files with 346 additions and 902 deletions.
Expand Up @@ -58,9 +58,6 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.MeasuredMutateRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MeasuredReadRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MeasuredUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
Expand All @@ -74,10 +71,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import io.opencensus.stats.Stats;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.Tagger;
import io.opencensus.tags.Tags;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand All @@ -103,10 +96,6 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final ClientContext clientContext;
private final RequestContext requestContext;

// TODO: This should probably move to ClientContext
private final Tagger tagger;
private final StatsRecorder statsRecorder;

private final ServerStreamingCallable<Query, Row> readRowsCallable;
private final UnaryCallable<Query, Row> readRowCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
Expand All @@ -119,20 +108,15 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
throws IOException {
ClientContext clientContext = ClientContext.create(settings);

return new EnhancedBigtableStub(
settings, clientContext, Tags.getTagger(), Stats.getStatsRecorder());
return new EnhancedBigtableStub(settings, clientContext);
}

@InternalApi("Visible for testing")
private EnhancedBigtableStub(
EnhancedBigtableStubSettings settings,
ClientContext clientContext,
Tagger tagger,
StatsRecorder statsRecorder) {
ClientContext clientContext) {
this.settings = settings;
this.clientContext = clientContext;
this.tagger = tagger;
this.statsRecorder = statsRecorder;
this.requestContext =
RequestContext.create(
settings.getProjectId(), settings.getInstanceId(), settings.getAppProfileId());
Expand Down Expand Up @@ -173,15 +157,7 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, "ReadRows"));

ServerStreamingCallable<Query, RowT> measured =
new MeasuredReadRowsCallable<>(
traced,
TRACING_OUTER_CLIENT_NAME + ".ReadRows",
tagger,
statsRecorder,
clientContext.getClock());

return measured.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -368,15 +344,7 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, "MutateRows"));

UnaryCallable<BulkMutation, Void> measured =
new MeasuredMutateRowsCallable(
traced,
TRACING_OUTER_CLIENT_NAME + ".MutateRows",
tagger,
statsRecorder,
clientContext.getClock());

return measured.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -548,15 +516,7 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, methodName));

UnaryCallable<RequestT, ResponseT> measured =
new MeasuredUnaryCallable<>(
traced,
TRACING_OUTER_CLIENT_NAME + "." + methodName,
tagger,
statsRecorder,
clientContext.getClock());

return measured.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
// </editor-fold>

Expand Down
Expand Up @@ -38,12 +38,16 @@
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.opencensus.stats.Stats;
import io.opencensus.tags.Tags;
import java.util.List;
import java.util.Set;
import java.util.logging.Logger;
Expand Down Expand Up @@ -519,11 +523,19 @@ private Builder() {
setStreamWatchdogProvider(baseDefaults.getStreamWatchdogProvider());

setTracerFactory(
new OpencensusTracerFactory(
ImmutableMap.of(
"gax", GaxGrpcProperties.getGaxGrpcVersion(),
"grpc", GaxGrpcProperties.getGrpcVersion(),
"gapic", GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class))));
new CompositeTracerFactory(
ImmutableList.of(
new MetricsTracerFactory(
Tags.getTagger(),
Stats.getStatsRecorder()
),
new OpencensusTracerFactory(
ImmutableMap.of(
"gax", GaxGrpcProperties.getGaxGrpcVersion(),
"grpc", GaxGrpcProperties.getGrpcVersion(),
"gapic", GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class)))
)
));

// Per-method settings using baseSettings for defaults.
readRowsSettings = ServerStreamingCallSettings.newBuilder();
Expand Down
@@ -0,0 +1,161 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracerFactory.OperationType;
import com.google.common.base.Stopwatch;
import io.opencensus.stats.MeasureMap;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.TagContextBuilder;
import io.opencensus.tags.TagValue;
import io.opencensus.tags.Tagger;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

class CensusTracer implements ApiTracer {
private final OperationType operationType;

private final TagValue methodName;
private final TagContext parentCtx;

private final Tagger tagger;
private final StatsRecorder stats;

private AtomicBoolean opFinished = new AtomicBoolean();
private Stopwatch operationTimer = Stopwatch.createStarted();
private Stopwatch firstResponseTimer = Stopwatch.createStarted();
private long operationResponseCount = 0;

private Stopwatch attemptTimer;
private int attemptCount = 0;
private long attemptResponseCount = 0;
private String attemptConnectionId = "";

CensusTracer(OperationType operationType, TagValue methodName,
TagContext parentCtx, Tagger tagger, StatsRecorder stats) {
this.operationType = operationType;
this.methodName = methodName;
this.parentCtx = parentCtx;
this.tagger = tagger;
this.stats = stats;
}

@Override
public Scope inScope() {
return new Scope() {
@Override
public void close() { }
};
}

@Override
public void operationSucceeded() {
recordOperationCompletion(null);
}

@Override
public void operationCancelled() {
recordOperationCompletion(new CancellationException());
}

@Override
public void operationFailed(Throwable throwable) {
recordOperationCompletion(throwable);
}

private void recordOperationCompletion(@Nullable Throwable throwable) {
if (!opFinished.compareAndSet(false, true)) {
return;
}

TagContextBuilder tagBuilder = tagger.toBuilder(parentCtx)
//TODO: projectId, instanceId, tableId, appProfileId
.putLocal(RpcMeasureConstants.BIGTABLE_OP, methodName)
.putLocal(
RpcMeasureConstants.BIGTABLE_STATUS, Util.extractStatus(throwable));

MeasureMap measures = stats.newMeasureMap()
.put(RpcMeasureConstants.BIGTABLE_OP_LATENCY,
operationTimer.elapsed(TimeUnit.MILLISECONDS));

if (operationType == OperationType.ServerStreaming && methodName.asString().endsWith("ReadRows")) {
measures.put(RpcMeasureConstants.BIGTABLE_ROWS_READ_PER_OP, operationResponseCount);
measures.put(RpcMeasureConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY, firstResponseTimer.elapsed(TimeUnit.MILLISECONDS));
}

measures.record(tagBuilder.build());
}

@Override
public void connectionSelected(String s) {
this.attemptConnectionId = s;
}

@Override
public void attemptStarted(int i) {
attemptTimer = Stopwatch.createStarted();
attemptCount++;
attemptResponseCount = 0;
}

@Override
public void attemptSucceeded() {
recordAttemptCompletion(null, null);
}

@Override
public void attemptCancelled() {
recordAttemptCompletion(new CancellationException(), null);
}

@Override
public void attemptFailed(Throwable throwable, Duration duration) {
recordAttemptCompletion(throwable, duration);
}

@Override
public void attemptFailedRetriesExhausted(Throwable throwable) {
recordAttemptCompletion(throwable, null);
}

@Override
public void attemptPermanentFailure(Throwable throwable) {
recordAttemptCompletion(throwable, null);
}

private void recordAttemptCompletion(@Nullable Throwable throwable, @Nullable Duration nextAttemptDelay) {
// TODO: record attempts
}

@Override
public void lroStartFailed(Throwable throwable) {
// noop
}

@Override
public void lroStartSucceeded() {
// noop
}

@Override
public void responseReceived() {
if(firstResponseTimer.isRunning()) {
firstResponseTimer.stop();
}
attemptResponseCount++;
operationResponseCount++;
}

@Override
public void requestSent() {
// noop: no operations are client streaming
}

@Override
public void batchRequestSent(long elementCount, long requestSize) {
// noop: there is a server side metric that can be used instead
}
}

0 comments on commit 1e414f4

Please sign in to comment.