Skip to content

Commit

Permalink
feat: add throttled time to ApiTracer
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Aug 11, 2021
1 parent 19e3b2e commit 82c0f09
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 6 deletions.
Expand Up @@ -193,6 +193,12 @@ public static void enableGfeOpenCensusStats() {
com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientGfeViews();
}

/** Enables OpenCensus metric aggregations for Batch operations. */
@BetaApi("OpenCensus stats integration is currently unstable and may change in the future")
public static void enableOpenCensusBatchStats() {
com.google.cloud.bigtable.data.v2.stub.metrics.RpcViews.registerBigtableClientBatchViews();
}

/** Returns the target project id. */
public String getProjectId() {
return stubSettings.getProjectId();
Expand Down
Expand Up @@ -75,6 +75,7 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
Expand Down Expand Up @@ -130,6 +131,7 @@ public class EnhancedBigtableStub implements AutoCloseable {

private final ServerStreamingCallable<Query, Row> readRowsCallable;
private final UnaryCallable<Query, Row> readRowCallable;
private final UnaryCallable<Query, List<Row>> bulkReadRowsCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
Expand Down Expand Up @@ -273,6 +275,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext

readRowsCallable = createReadRowsCallable(new DefaultRowAdapter());
readRowCallable = createReadRowCallable(new DefaultRowAdapter());
bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter());
sampleRowKeysCallable = createSampleRowKeysCallable();
mutateRowCallable = createMutateRowCallable();
bulkMutateRowsCallable = createBulkMutateRowsCallable();
Expand Down Expand Up @@ -434,6 +437,46 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}

/**
* Creates a callable chain to handle bulk ReadRows RPCs. This is meant to be used in ReadRows
* batcher. The chain will:
*
* <ul>
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest}.
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured in by the {@code rowAdapter} parameter.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* <li>Construct a {@link UnaryCallable} that will buffer the entire stream into memory before
* completing. If the stream is empty, then the list will be empty.
* <li>Add tracing & metrics.
* </ul>
*/
private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
RowAdapter<RowT> rowAdapter) {
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter);

ServerStreamingCallable<Query, RowT> readRowsUserCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);

SpanName span = getSpanName("ReadRows");

// TracedBatcherUnaryCallable needs to be created before TracedUnaryCallable so a tracer is
// created before TracedBatcherUnaryCallable is called.
UnaryCallable<Query, List<RowT>> tracedBatcher =
new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());

UnaryCallable<Query, List<RowT>> traced =
new TracedUnaryCallable<>(tracedBatcher, clientContext.getTracerFactory(), span);

UnaryCallable<Query, List<RowT>> withHeaderTracer =
new HeaderTracerUnaryCallable(traced, settings.getHeaderTracer(), span.toString());

return withHeaderTracer.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
* Creates a callable chain to handle SampleRowKeys RPcs. The chain will:
*
Expand Down Expand Up @@ -549,8 +592,12 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
flowControlCallable != null ? flowControlCallable : baseCallable, requestContext);

SpanName spanName = getSpanName("MutateRows");

UnaryCallable<BulkMutation, Void> tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(userFacing, clientContext.getTracerFactory(), spanName);
new TracedUnaryCallable<>(tracedBatcher, clientContext.getTracerFactory(), spanName);

UnaryCallable<BulkMutation, Void> withHeaderTracer =
new HeaderTracerUnaryCallable<>(traced, settings.getHeaderTracer(), spanName.toString());

Expand Down Expand Up @@ -588,7 +635,8 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
BulkMutation.create(tableId),
settings.bulkMutateRowsSettings().getBatchingSettings(),
clientContext.getExecutor(),
bulkMutationFlowController);
bulkMutationFlowController,
ctx == null ? clientContext.getDefaultCallContext() : ctx);
}

/**
Expand All @@ -609,7 +657,7 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
@Nonnull Query query, @Nullable GrpcCallContext ctx) {
Preconditions.checkNotNull(query, "query cannot be null");
UnaryCallable<Query, List<Row>> callable = readRowsCallable().all();
UnaryCallable<Query, List<Row>> callable = bulkReadRowsCallable;
if (ctx != null) {
callable = callable.withDefaultCallContext(ctx);
}
Expand All @@ -618,7 +666,9 @@ public Batcher<ByteString, Row> newBulkReadRowsBatcher(
callable,
query,
settings.bulkReadRowsSettings().getBatchingSettings(),
clientContext.getExecutor());
clientContext.getExecutor(),
null,
ctx == null ? clientContext.getDefaultCallContext() : ctx);
}

/**
Expand Down
Expand Up @@ -152,4 +152,11 @@ public void batchRequestSent(long elementCount, long requestSize) {
child.batchRequestSent(elementCount, requestSize);
}
}

@Override
public void batchRequestThrottled(long throttledTimeMs) {
for (ApiTracer child : children) {
child.batchRequestThrottled(throttledTimeMs);
}
}
}
Expand Up @@ -203,6 +203,15 @@ public void batchRequestSent(long elementCount, long requestSize) {
// noop
}

@Override
public void batchRequestThrottled(long totalThrottledMs) {
MeasureMap measures =
stats
.newMeasureMap()
.put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, totalThrottledMs);
measures.record(newTagCtxBuilder().build());
}

private TagContextBuilder newTagCtxBuilder() {
TagContextBuilder tagCtx =
tagger
Expand Down
Expand Up @@ -88,4 +88,11 @@ public class RpcMeasureConstants {
"cloud.google.com/java/bigtable/gfe_header_missing_count",
"Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network",
COUNT);

/** Total throttled time of a batch in msecs. */
public static final MeasureLong BIGTABLE_BATCH_THROTTLED_TIME =
MeasureLong.create(
"cloud.google.com/java/bigtable/batch_throttled_time",
"Total throttled time of a batch in msecs",
MILLISECOND);
}
Expand Up @@ -17,6 +17,7 @@

import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_LATENCY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_INSTANCE_ID;
Expand Down Expand Up @@ -146,4 +147,13 @@ class RpcViewConstants {
SUM,
ImmutableList.of(
BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP));

static final View BIGTABLE_BATCH_THROTTLED_TIME_VIEW =
View.create(
View.Name.create("cloud.google.com/java/bigtable/batch_throttled_time"),
"Total throttled time of a batch in msecs",
BIGTABLE_BATCH_THROTTLED_TIME,
AGGREGATION_WITH_MILLIS_HISTOGRAM,
ImmutableList.of(
BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP));
}
Expand Up @@ -38,6 +38,9 @@ public class RpcViews {
RpcViewConstants.BIGTABLE_GFE_LATENCY_VIEW,
RpcViewConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT_VIEW);

private static final ImmutableSet<View> BIGTABLE_BATCH_VIEWS_SET =
ImmutableSet.of(RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW);

private static boolean gfeMetricsRegistered = false;

/** Registers all Bigtable specific views. */
Expand All @@ -55,6 +58,11 @@ public static void registerBigtableClientGfeViews() {
registerBigtableClientGfeViews(Stats.getViewManager());
}

/** Register Bigtable batch views. */
public static void registerBigtableClientBatchViews() {
registerBigtableClientBatchViews(Stats.getViewManager());
}

@VisibleForTesting
static void registerBigtableClientViews(ViewManager viewManager) {
for (View view : BIGTABLE_CLIENT_VIEWS_SET) {
Expand All @@ -70,6 +78,13 @@ static void registerBigtableClientGfeViews(ViewManager viewManager) {
gfeMetricsRegistered = true;
}

@VisibleForTesting
static void registerBigtableClientBatchViews(ViewManager viewManager) {
for (View view : BIGTABLE_BATCH_VIEWS_SET) {
viewManager.registerView(view);
}
}

static boolean isGfeMetricsRegistered() {
return gfeMetricsRegistered;
}
Expand Down
@@ -0,0 +1,47 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.gax.tracing.TracedUnaryCallable;
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.ApiTracer;

/**
* This callable will extract total throttled time from {@link ApiCallContext} and add it to {@link
* ApiTracer}.
*/
@InternalApi
public final class TracedBatcherUnaryCallable<RequestT, ResponseT>
extends UnaryCallable<RequestT, ResponseT> {
private final UnaryCallable<RequestT, ResponseT> innerCallable;

public TracedBatcherUnaryCallable(UnaryCallable innerCallable) {
this.innerCallable = innerCallable;
}

@Override
public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context) {
if (context.getOption(Batcher.THROTTLED_TIME_KEY) != null) {
ApiTracer tracer = context.getTracer();
tracer.batchRequestThrottled(context.getOption(Batcher.THROTTLED_TIME_KEY));
}
return innerCallable.futureCall(request, context);
}
}
Expand Up @@ -29,6 +29,7 @@
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
Expand Down Expand Up @@ -439,6 +440,7 @@ public void testCallContextPropagatedInMutationBatcher()
.setPrimedTableIds("table1", "table2")
.build();

ClientContext context = ClientContext.create(settings);
try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings)) {
// clear the previous contexts
contextInterceptor.contexts.clear();
Expand Down

0 comments on commit 82c0f09

Please sign in to comment.