Skip to content

Commit

Permalink
feat: add throttled time to ApiTracer
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Dec 3, 2021
1 parent 4ec6efc commit 81370d0
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 7 deletions.
Expand Up @@ -77,6 +77,7 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
Expand Down Expand Up @@ -132,6 +133,7 @@ public class EnhancedBigtableStub implements AutoCloseable {

private final ServerStreamingCallable<Query, Row> readRowsCallable;
private final UnaryCallable<Query, Row> readRowCallable;
private final UnaryCallable<Query, List<Row>> bulkReadRowsCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
Expand Down Expand Up @@ -267,6 +269,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext

readRowsCallable = createReadRowsCallable(new DefaultRowAdapter());
readRowCallable = createReadRowCallable(new DefaultRowAdapter());
bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter());
sampleRowKeysCallable = createSampleRowKeysCallable();
mutateRowCallable = createMutateRowCallable();
bulkMutateRowsCallable = createBulkMutateRowsCallable();
Expand Down Expand Up @@ -430,6 +433,46 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}

/**
* Creates a callable chain to handle bulk ReadRows RPCs. This is meant to be used in ReadRows
* batcher. The chain will:
*
* <ul>
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest}.
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured in by the {@code rowAdapter} parameter.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* <li>Construct a {@link UnaryCallable} that will buffer the entire stream into memory before
* completing. If the stream is empty, then the list will be empty.
* <li>Add tracing & metrics.
* </ul>
*/
private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
RowAdapter<RowT> rowAdapter) {
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter);

ServerStreamingCallable<Query, RowT> readRowsUserCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);

SpanName span = getSpanName("ReadRows");

// TracedBatcherUnaryCallable needs to be created before TracedUnaryCallable so a tracer is
// created before TracedBatcherUnaryCallable is called.
UnaryCallable<Query, List<RowT>> tracedBatcher =
new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());

UnaryCallable<Query, List<RowT>> withHeaderTracer =
new HeaderTracerUnaryCallable(tracedBatcher);

UnaryCallable<Query, List<RowT>> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
* Creates a callable chain to handle SampleRowKeys RPcs. The chain will:
*
Expand Down Expand Up @@ -549,8 +592,12 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
flowControlCallable != null ? flowControlCallable : baseCallable, requestContext);

SpanName spanName = getSpanName("MutateRows");

UnaryCallable<BulkMutation, Void> tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> withHeaderTracer =
new HeaderTracerUnaryCallable<>(userFacing);
new HeaderTracerUnaryCallable<>(tracedBatcher);

UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName);

Expand Down Expand Up @@ -588,7 +635,8 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
BulkMutation.create(tableId),
settings.bulkMutateRowsSettings().getBatchingSettings(),
clientContext.getExecutor(),
bulkMutationFlowController);
bulkMutationFlowController,
ctx == null ? clientContext.getDefaultCallContext() : ctx);
}

/**
Expand All @@ -609,7 +657,7 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
@Nonnull Query query, @Nullable GrpcCallContext ctx) {
Preconditions.checkNotNull(query, "query cannot be null");
UnaryCallable<Query, List<Row>> callable = readRowsCallable().all();
UnaryCallable<Query, List<Row>> callable = bulkReadRowsCallable;
if (ctx != null) {
callable = callable.withDefaultCallContext(ctx);
}
Expand All @@ -618,7 +666,9 @@ public Batcher<ByteString, Row> newBulkReadRowsBatcher(
callable,
query,
settings.bulkReadRowsSettings().getBatchingSettings(),
clientContext.getExecutor());
clientContext.getExecutor(),
null,
ctx == null ? clientContext.getDefaultCallContext() : ctx);
}

/**
Expand Down
Expand Up @@ -38,4 +38,7 @@ public abstract class BigtableTracer extends BaseApiTracer {
* missing header count.
*/
public abstract void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable);

/** Adds an annotation of the total throttled time of a batch. */
public abstract void batchRequestThrottled(long throttledTimeMs);
}
Expand Up @@ -178,4 +178,11 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
tracer.recordGfeMetadata(latency, throwable);
}
}

@Override
public void batchRequestThrottled(long throttledTimeMs) {
for (BigtableTracer tracer : bigtableTracers) {
tracer.batchRequestThrottled(throttledTimeMs);
}
}
}
Expand Up @@ -226,6 +226,15 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
.build());
}

@Override
public void batchRequestThrottled(long totalThrottledMs) {
MeasureMap measures =
stats
.newMeasureMap()
.put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, totalThrottledMs);
measures.record(newTagCtxBuilder().build());
}

private TagContextBuilder newTagCtxBuilder() {
TagContextBuilder tagCtx =
tagger
Expand Down
Expand Up @@ -88,4 +88,11 @@ public class RpcMeasureConstants {
"cloud.google.com/java/bigtable/gfe_header_missing_count",
"Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network",
COUNT);

/** Total throttled time of a batch in msecs. */
public static final MeasureLong BIGTABLE_BATCH_THROTTLED_TIME =
MeasureLong.create(
"cloud.google.com/java/bigtable/batch_throttled_time",
"Total throttled time of a batch in msecs",
MILLISECOND);
}
Expand Up @@ -17,6 +17,7 @@

import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_LATENCY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_INSTANCE_ID;
Expand Down Expand Up @@ -154,4 +155,13 @@ class RpcViewConstants {
BIGTABLE_APP_PROFILE_ID,
BIGTABLE_OP,
BIGTABLE_STATUS));

static final View BIGTABLE_BATCH_THROTTLED_TIME_VIEW =
View.create(
View.Name.create("cloud.google.com/java/bigtable/batch_throttled_time"),
"Total throttled time of a batch in msecs",
BIGTABLE_BATCH_THROTTLED_TIME,
AGGREGATION_WITH_MILLIS_HISTOGRAM,
ImmutableList.of(
BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP));
}
Expand Up @@ -31,7 +31,8 @@ public class RpcViews {
RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW,
RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW,
RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW,
RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW);
RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW,
RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW);

private static final ImmutableSet<View> GFE_VIEW_SET =
ImmutableSet.of(
Expand Down
@@ -0,0 +1,50 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.ApiTracer;

/**
* This callable will extract total throttled time from {@link ApiCallContext} and add it to {@link
* ApiTracer}.
*/
@InternalApi
public final class TracedBatcherUnaryCallable<RequestT, ResponseT>
extends UnaryCallable<RequestT, ResponseT> {
private final UnaryCallable<RequestT, ResponseT> innerCallable;

public TracedBatcherUnaryCallable(UnaryCallable innerCallable) {
this.innerCallable = innerCallable;
}

@Override
public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context) {
if (context.getOption(Batcher.THROTTLED_TIME_KEY) != null) {
ApiTracer tracer = context.getTracer();
// this should always be true
if (tracer instanceof BigtableTracer) {
((BigtableTracer) tracer)
.batchRequestThrottled(context.getOption(Batcher.THROTTLED_TIME_KEY));
}
}
return innerCallable.futureCall(request, context);
}
}

0 comments on commit 81370d0

Please sign in to comment.