Skip to content

Commit

Permalink
feat: add batch throttled ms metric (#888)
Browse files Browse the repository at this point in the history
* feat: add throttled time to ApiTracer

* fix abstract class

* update based on review

* fix format

* updates on comments

* make tests more readable

* update year

* fix test

* make the test more readable
  • Loading branch information
mutianf committed Dec 10, 2021
1 parent bd873bc commit 0d197a5
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 22 deletions.
Expand Up @@ -77,6 +77,7 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
Expand All @@ -88,6 +89,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -132,6 +134,7 @@ public class EnhancedBigtableStub implements AutoCloseable {

private final ServerStreamingCallable<Query, Row> readRowsCallable;
private final UnaryCallable<Query, Row> readRowCallable;
private final UnaryCallable<Query, List<Row>> bulkReadRowsCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
Expand Down Expand Up @@ -267,6 +270,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext

readRowsCallable = createReadRowsCallable(new DefaultRowAdapter());
readRowCallable = createReadRowCallable(new DefaultRowAdapter());
bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter());
sampleRowKeysCallable = createSampleRowKeysCallable();
mutateRowCallable = createMutateRowCallable();
bulkMutateRowsCallable = createBulkMutateRowsCallable();
Expand Down Expand Up @@ -430,6 +434,46 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}

/**
 * Creates a callable chain to handle bulk ReadRows RPCs. This is meant to be used in ReadRows
 * batcher. The chain will:
 *
 * <ul>
 *   <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest}.
 *   <li>Upon receiving the response stream, it will merge the {@link
 *       com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
 *       implementation can be configured by the {@code rowAdapter} parameter.
 *   <li>Retry/resume on failure.
 *   <li>Filter out marker rows.
 *   <li>Construct a {@link UnaryCallable} that will buffer the entire stream into memory before
 *       completing. If the stream is empty, then the list will be empty.
 *   <li>Add tracing and metrics.
 * </ul>
 *
 * @param rowAdapter adapter that determines the row implementation produced by the chain
 * @return the traced callable, with the client's default call context applied
 */
private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
    RowAdapter<RowT> rowAdapter) {
  ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
      createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter);

  ServerStreamingCallable<Query, RowT> readRowsUserCallable =
      new ReadRowsUserCallable<>(readRowsCallable, requestContext);

  SpanName span = getSpanName("ReadRows");

  // The TracedBatcherUnaryCallable has to be wrapped by the TracedUnaryCallable, so that
  // TracedUnaryCallable can inject a tracer for the TracedBatcherUnaryCallable to interact with
  UnaryCallable<Query, List<RowT>> tracedBatcher =
      new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());

  // Use the diamond operator rather than a raw type so the request/response types are checked,
  // matching how createBulkMutateRowsCallable builds its HeaderTracerUnaryCallable.
  UnaryCallable<Query, List<RowT>> withHeaderTracer =
      new HeaderTracerUnaryCallable<>(tracedBatcher);

  UnaryCallable<Query, List<RowT>> traced =
      new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span);

  return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
* Creates a callable chain to handle SampleRowKeys RPCs. The chain will:
*
Expand Down Expand Up @@ -549,8 +593,12 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
flowControlCallable != null ? flowControlCallable : baseCallable, requestContext);

SpanName spanName = getSpanName("MutateRows");

UnaryCallable<BulkMutation, Void> tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> withHeaderTracer =
new HeaderTracerUnaryCallable<>(userFacing);
new HeaderTracerUnaryCallable<>(tracedBatcher);

UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName);

Expand Down Expand Up @@ -578,17 +626,14 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
*/
public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
@Nonnull String tableId, @Nullable GrpcCallContext ctx) {
UnaryCallable<BulkMutation, Void> callable = this.bulkMutateRowsCallable;
if (ctx != null) {
callable = callable.withDefaultCallContext(ctx);
}
return new BatcherImpl<>(
settings.bulkMutateRowsSettings().getBatchingDescriptor(),
callable,
bulkMutateRowsCallable,
BulkMutation.create(tableId),
settings.bulkMutateRowsSettings().getBatchingSettings(),
clientContext.getExecutor(),
bulkMutationFlowController);
bulkMutationFlowController,
MoreObjects.firstNonNull(ctx, clientContext.getDefaultCallContext()));
}

/**
Expand All @@ -609,16 +654,14 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
@Nonnull Query query, @Nullable GrpcCallContext ctx) {
Preconditions.checkNotNull(query, "query cannot be null");
UnaryCallable<Query, List<Row>> callable = readRowsCallable().all();
if (ctx != null) {
callable = callable.withDefaultCallContext(ctx);
}
return new BatcherImpl<>(
settings.bulkReadRowsSettings().getBatchingDescriptor(),
callable,
bulkReadRowsCallable,
query,
settings.bulkReadRowsSettings().getBatchingSettings(),
clientContext.getExecutor());
clientContext.getExecutor(),
null,
MoreObjects.firstNonNull(ctx, clientContext.getDefaultCallContext()));
}

/**
Expand Down
Expand Up @@ -21,21 +21,40 @@
import com.google.api.gax.tracing.BaseApiTracer;
import javax.annotation.Nullable;

/** A Bigtable specific {@link ApiTracer} that includes additional contexts. */
/**
* A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base
* implementation that does nothing.
*/
@BetaApi("This surface is stable yet it might be removed in the future.")
public abstract class BigtableTracer extends BaseApiTracer {
public class BigtableTracer extends BaseApiTracer {

private volatile int attempt = 0;

@Override
public void attemptStarted(int attemptNumber) {
this.attempt = attemptNumber;
}

/**
* Get the attempt number of the current call. Attempt number for the current call is passed in
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from
* {@link ApiCallContext}. Attempt number starts from 0.
*/
public abstract int getAttempt();
public int getAttempt() {
return attempt;
}

/**
* Record the latency between when Google's network receives the RPC and when it reads back the
* first byte of the response, as reported in the server-timing header. If the server-timing
* header is missing, increment the missing header count.
*/
public abstract void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable);
public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) {
// noop
}

/** Adds an annotation of the total throttled time of a batch. */
public void batchRequestThrottled(long throttledTimeMs) {
// noop
}
}
Expand Up @@ -178,4 +178,11 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
tracer.recordGfeMetadata(latency, throwable);
}
}

/** Forwards the throttled time of a batch to every tracer in {@code bigtableTracers}. */
@Override
public void batchRequestThrottled(long throttledTimeMs) {
  for (BigtableTracer child : bigtableTracers) {
    child.batchRequestThrottled(throttledTimeMs);
  }
}
}
Expand Up @@ -129,8 +129,8 @@ public void connectionSelected(String s) {
}

@Override
public void attemptStarted(int i) {
attempt = i;
public void attemptStarted(int attemptNumber) {
attempt = attemptNumber;
attemptCount++;
attemptTimer = Stopwatch.createStarted();
attemptResponseCount = 0;
Expand Down Expand Up @@ -226,6 +226,15 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
.build());
}

/**
 * Records the given throttled time under {@link
 * RpcMeasureConstants#BIGTABLE_BATCH_THROTTLED_TIME}, using the tag context produced by {@code
 * newTagCtxBuilder()}.
 */
@Override
public void batchRequestThrottled(long totalThrottledMs) {
  // Build the measure map first and the tag context second, then record in one fluent chain.
  stats
      .newMeasureMap()
      .put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, totalThrottledMs)
      .record(newTagCtxBuilder().build());
}

private TagContextBuilder newTagCtxBuilder() {
TagContextBuilder tagCtx =
tagger
Expand Down
Expand Up @@ -88,4 +88,11 @@ public class RpcMeasureConstants {
"cloud.google.com/java/bigtable/gfe_header_missing_count",
"Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network",
COUNT);

/**
 * Measure of the total throttled time of a batch, in milliseconds. Registered under the name
 * {@code cloud.google.com/java/bigtable/batch_throttled_time}.
 */
public static final MeasureLong BIGTABLE_BATCH_THROTTLED_TIME =
MeasureLong.create(
"cloud.google.com/java/bigtable/batch_throttled_time",
"Total throttled time of a batch in msecs",
MILLISECOND);
}
Expand Up @@ -17,6 +17,7 @@

import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_LATENCY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_INSTANCE_ID;
Expand Down Expand Up @@ -154,4 +155,14 @@ class RpcViewConstants {
BIGTABLE_APP_PROFILE_ID,
BIGTABLE_OP,
BIGTABLE_STATUS));

// View over the BIGTABLE_BATCH_THROTTLED_TIME measure, tagged by instance id, project id,
// app profile id and op.
// Use distribution so we can correlate batch throttled time with op_latency.
static final View BIGTABLE_BATCH_THROTTLED_TIME_VIEW =
View.create(
View.Name.create("cloud.google.com/java/bigtable/batch_throttled_time"),
"Total throttled time of a batch in msecs",
BIGTABLE_BATCH_THROTTLED_TIME,
AGGREGATION_WITH_MILLIS_HISTOGRAM,
ImmutableList.of(
BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP));
}
Expand Up @@ -31,7 +31,8 @@ public class RpcViews {
RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW,
RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW,
RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW,
RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW);
RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW,
RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW);

private static final ImmutableSet<View> GFE_VIEW_SET =
ImmutableSet.of(
Expand Down
@@ -0,0 +1,50 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.ApiTracer;

/**
 * This callable will extract total throttled time from {@link ApiCallContext} and add it to {@link
 * ApiTracer}. This class needs to be wrapped by a callable that injects the {@link ApiTracer}.
 *
 * @param <RequestT> request type accepted by the wrapped callable
 * @param <ResponseT> response type produced by the wrapped callable
 */
@InternalApi
public final class TracedBatcherUnaryCallable<RequestT, ResponseT>
    extends UnaryCallable<RequestT, ResponseT> {
  private final UnaryCallable<RequestT, ResponseT> innerCallable;

  /**
   * Wraps {@code innerCallable}. The parameter is fully parameterized (not a raw type) so that the
   * compiler checks that the wrapped callable's request/response types match this callable's.
   *
   * @param innerCallable the callable every call is delegated to
   */
  public TracedBatcherUnaryCallable(UnaryCallable<RequestT, ResponseT> innerCallable) {
    this.innerCallable = innerCallable;
  }

  /**
   * Reports the throttled time stored under {@link Batcher#THROTTLED_TIME_KEY}, if present, to the
   * context's tracer when that tracer is a {@link BigtableTracer}, then delegates to the wrapped
   * callable with the same request and context.
   */
  @Override
  public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context) {
    // Look the option up once and reuse it for both the null check and the report.
    Long throttledTimeMs = context.getOption(Batcher.THROTTLED_TIME_KEY);
    if (throttledTimeMs != null) {
      ApiTracer tracer = context.getTracer();
      // this should always be true
      if (tracer instanceof BigtableTracer) {
        ((BigtableTracer) tracer).batchRequestThrottled(throttledTimeMs);
      }
    }
    return innerCallable.futureCall(request, context);
  }
}
Expand Up @@ -15,16 +15,20 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracer.Scope;
import com.google.cloud.bigtable.misc_utilities.MethodComparator;
import com.google.common.collect.ImmutableList;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.lang.reflect.Method;
import java.util.Arrays;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -229,4 +233,20 @@ public void testRecordGfeLatency() {
verify(child3, times(1)).recordGfeMetadata(20L, t);
verify(child4, times(1)).recordGfeMetadata(20L, t);
}

/** Verifies that the composite tracer passes the throttled time on to child3 and child4 once. */
@Test
public void testBatchRequestThrottled() {
  long throttledTimeMs = 5L;

  compositeTracer.batchRequestThrottled(throttledTimeMs);

  verify(child3, times(1)).batchRequestThrottled(throttledTimeMs);
  verify(child4, times(1)).batchRequestThrottled(throttledTimeMs);
}

/**
 * Checks via reflection that the methods declared by {@link CompositeTracer} contain a match, per
 * {@link MethodComparator#METHOD_CORRESPONDENCE}, for every method declared by {@link
 * BigtableTracer}.
 */
@Test
public void testMethodsOverride() {
  Method[] declaredByBase = BigtableTracer.class.getDeclaredMethods();
  assertThat(Arrays.asList(CompositeTracer.class.getDeclaredMethods()))
      .comparingElementsUsing(MethodComparator.METHOD_CORRESPONDENCE)
      .containsAtLeastElementsIn(declaredByBase);
}
}

0 comments on commit 0d197a5

Please sign in to comment.