Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add batch throttled ms metric #888

Merged
merged 9 commits into from Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -77,6 +77,7 @@
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.TracedBatcherUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
Expand All @@ -88,6 +89,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -132,6 +134,7 @@ public class EnhancedBigtableStub implements AutoCloseable {

private final ServerStreamingCallable<Query, Row> readRowsCallable;
private final UnaryCallable<Query, Row> readRowCallable;
private final UnaryCallable<Query, List<Row>> bulkReadRowsCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
Expand Down Expand Up @@ -267,6 +270,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext

readRowsCallable = createReadRowsCallable(new DefaultRowAdapter());
readRowCallable = createReadRowCallable(new DefaultRowAdapter());
bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter());
sampleRowKeysCallable = createSampleRowKeysCallable();
mutateRowCallable = createMutateRowCallable();
bulkMutateRowsCallable = createBulkMutateRowsCallable();
Expand Down Expand Up @@ -430,6 +434,46 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}

/**
 * Creates a callable chain to handle bulk ReadRows RPCs. This is meant to be used in ReadRows
 * batcher. The chain will:
 *
 * <ul>
 *   <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest}.
 *   <li>Upon receiving the response stream, it will merge the {@link
 *       com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
 *       implementation can be configured by the {@code rowAdapter} parameter.
 *   <li>Retry/resume on failure.
 *   <li>Filter out marker rows.
 *   <li>Construct a {@link UnaryCallable} that will buffer the entire stream into memory before
 *       completing. If the stream is empty, then the list will be empty.
 *   <li>Add tracing &amp; metrics.
 * </ul>
 *
 * @param rowAdapter adapter that determines the logical row type produced by the chain
 * @return a unary callable that resolves to the full list of rows matching the {@link Query}
 */
private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
    RowAdapter<RowT> rowAdapter) {
  ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
      createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter);

  ServerStreamingCallable<Query, RowT> readRowsUserCallable =
      new ReadRowsUserCallable<>(readRowsCallable, requestContext);

  SpanName span = getSpanName("ReadRows");

  // The TracedBatcherUnaryCallable has to be wrapped by the TracedUnaryCallable, so that
  // TracedUnaryCallable can inject a tracer for the TracedBatcherUnaryCallable to interact with
  UnaryCallable<Query, List<RowT>> tracedBatcher =
      new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());

  // Use the diamond operator rather than a raw type so the request/response types are checked
  // by the compiler, matching the equivalent wiring in the bulk MutateRows chain.
  UnaryCallable<Query, List<RowT>> withHeaderTracer =
      new HeaderTracerUnaryCallable<>(tracedBatcher);

  UnaryCallable<Query, List<RowT>> traced =
      new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span);

  return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
* Creates a callable chain to handle SampleRowKeys RPCs. The chain will:
*
Expand Down Expand Up @@ -549,8 +593,12 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
flowControlCallable != null ? flowControlCallable : baseCallable, requestContext);

SpanName spanName = getSpanName("MutateRows");

UnaryCallable<BulkMutation, Void> tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> withHeaderTracer =
new HeaderTracerUnaryCallable<>(userFacing);
new HeaderTracerUnaryCallable<>(tracedBatcher);

UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName);

Expand Down Expand Up @@ -578,17 +626,14 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
*/
public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
@Nonnull String tableId, @Nullable GrpcCallContext ctx) {
UnaryCallable<BulkMutation, Void> callable = this.bulkMutateRowsCallable;
if (ctx != null) {
callable = callable.withDefaultCallContext(ctx);
}
return new BatcherImpl<>(
settings.bulkMutateRowsSettings().getBatchingDescriptor(),
callable,
bulkMutateRowsCallable,
BulkMutation.create(tableId),
settings.bulkMutateRowsSettings().getBatchingSettings(),
clientContext.getExecutor(),
bulkMutationFlowController);
bulkMutationFlowController,
MoreObjects.firstNonNull(ctx, clientContext.getDefaultCallContext()));
}

/**
Expand All @@ -609,16 +654,14 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
@Nonnull Query query, @Nullable GrpcCallContext ctx) {
Preconditions.checkNotNull(query, "query cannot be null");
UnaryCallable<Query, List<Row>> callable = readRowsCallable().all();
if (ctx != null) {
callable = callable.withDefaultCallContext(ctx);
}
return new BatcherImpl<>(
settings.bulkReadRowsSettings().getBatchingDescriptor(),
callable,
bulkReadRowsCallable,
query,
settings.bulkReadRowsSettings().getBatchingSettings(),
clientContext.getExecutor());
clientContext.getExecutor(),
null,
MoreObjects.firstNonNull(ctx, clientContext.getDefaultCallContext()));
}

/**
Expand Down
Expand Up @@ -21,21 +21,40 @@
import com.google.api.gax.tracing.BaseApiTracer;
import javax.annotation.Nullable;

/** A Bigtable specific {@link ApiTracer} that includes additional contexts. */
/**
* A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base
* implementation that does nothing.
*/
@BetaApi("This surface is stable yet it might be removed in the future.")
public abstract class BigtableTracer extends BaseApiTracer {
public class BigtableTracer extends BaseApiTracer {

private volatile int attempt = 0;

/** Stores the attempt number of the current call so that {@link #getAttempt()} can return it. */
@Override
public void attemptStarted(int attemptNumber) {
this.attempt = attemptNumber;
}

/**
* Get the attempt number of the current call. Attempt number for the current call is passed in
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from
* {@link ApiCallContext}. Attempt number starts from 0.
*/
public abstract int getAttempt();
public int getAttempt() {
return attempt;
}

/**
* Record the latency between when Google's network receives the RPC and when it reads back the
* first byte of the response, as reported by the server-timing header. If the server-timing
* header is missing, increment the missing header count.
*/
public abstract void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable);
public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) {
// noop
}

/**
 * Adds an annotation of the total throttled time of a batch. This base implementation does
 * nothing.
 *
 * @param throttledTimeMs total throttled time of the batch, in milliseconds
 */
public void batchRequestThrottled(long throttledTimeMs) {
// noop
}
}
Expand Up @@ -178,4 +178,11 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
tracer.recordGfeMetadata(latency, throwable);
}
}

/** Forwards the throttled time of a batch to every tracer in {@code bigtableTracers}. */
@Override
public void batchRequestThrottled(long throttledTimeMs) {
  for (BigtableTracer delegate : bigtableTracers) {
    delegate.batchRequestThrottled(throttledTimeMs);
  }
}
mutianf marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Up @@ -129,8 +129,8 @@ public void connectionSelected(String s) {
}

@Override
public void attemptStarted(int i) {
attempt = i;
public void attemptStarted(int attemptNumber) {
attempt = attemptNumber;
attemptCount++;
attemptTimer = Stopwatch.createStarted();
attemptResponseCount = 0;
Expand Down Expand Up @@ -226,6 +226,15 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
.build());
}

/**
 * Records the total throttled time of a batch against the {@link
 * RpcMeasureConstants#BIGTABLE_BATCH_THROTTLED_TIME} measure, tagged with the context produced
 * by {@code newTagCtxBuilder()}.
 */
@Override
public void batchRequestThrottled(long totalThrottledMs) {
  stats
      .newMeasureMap()
      .put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, totalThrottledMs)
      .record(newTagCtxBuilder().build());
}

mutianf marked this conversation as resolved.
Show resolved Hide resolved
private TagContextBuilder newTagCtxBuilder() {
TagContextBuilder tagCtx =
tagger
Expand Down
Expand Up @@ -88,4 +88,11 @@ public class RpcMeasureConstants {
"cloud.google.com/java/bigtable/gfe_header_missing_count",
"Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network",
COUNT);

/** Measure for the total time, in milliseconds, that a batch was throttled. */
public static final MeasureLong BIGTABLE_BATCH_THROTTLED_TIME =
MeasureLong.create(
"cloud.google.com/java/bigtable/batch_throttled_time",
"Total throttled time of a batch in msecs",
MILLISECOND);
}
Expand Up @@ -17,6 +17,7 @@

import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_HEADER_MISSING_COUNT;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_GFE_LATENCY;
import static com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants.BIGTABLE_INSTANCE_ID;
Expand Down Expand Up @@ -154,4 +155,14 @@ class RpcViewConstants {
BIGTABLE_APP_PROFILE_ID,
BIGTABLE_OP,
BIGTABLE_STATUS));

// View over BIGTABLE_BATCH_THROTTLED_TIME, tagged by instance, project, app profile and op.
// Use a distribution so that batch throttled time can be correlated with op_latency.
static final View BIGTABLE_BATCH_THROTTLED_TIME_VIEW =
View.create(
View.Name.create("cloud.google.com/java/bigtable/batch_throttled_time"),
"Total throttled time of a batch in msecs",
BIGTABLE_BATCH_THROTTLED_TIME,
AGGREGATION_WITH_MILLIS_HISTOGRAM,
ImmutableList.of(
BIGTABLE_INSTANCE_ID, BIGTABLE_PROJECT_ID, BIGTABLE_APP_PROFILE_ID, BIGTABLE_OP));
}
Expand Up @@ -31,7 +31,8 @@ public class RpcViews {
RpcViewConstants.BIGTABLE_COMPLETED_OP_VIEW,
RpcViewConstants.BIGTABLE_READ_ROWS_FIRST_ROW_LATENCY_VIEW,
RpcViewConstants.BIGTABLE_ATTEMPT_LATENCY_VIEW,
RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW);
RpcViewConstants.BIGTABLE_ATTEMPTS_PER_OP_VIEW,
RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW);

private static final ImmutableSet<View> GFE_VIEW_SET =
ImmutableSet.of(
Expand Down
@@ -0,0 +1,50 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.ApiTracer;

/**
 * This callable will extract total throttled time from {@link ApiCallContext} and add it to {@link
 * ApiTracer}. This class needs to be wrapped by a callable that injects the {@link ApiTracer}.
 *
 * @param <RequestT> request type accepted by the wrapped callable
 * @param <ResponseT> response type produced by the wrapped callable
 */
@InternalApi
public final class TracedBatcherUnaryCallable<RequestT, ResponseT>
    extends UnaryCallable<RequestT, ResponseT> {
  private final UnaryCallable<RequestT, ResponseT> innerCallable;

  /**
   * Wraps {@code innerCallable}. The parameter is fully parameterized (not a raw type) so that a
   * callable with mismatched request/response types is rejected at compile time.
   *
   * @param innerCallable the callable that every call is delegated to
   */
  public TracedBatcherUnaryCallable(UnaryCallable<RequestT, ResponseT> innerCallable) {
    this.innerCallable = innerCallable;
  }

  /**
   * Reports the throttled time stored under {@link Batcher#THROTTLED_TIME_KEY} (if any) to the
   * context's {@link BigtableTracer}, then delegates the call to the wrapped callable.
   *
   * @param request the request to send
   * @param context the call context carrying the tracer and the optional throttled time
   * @return the future returned by the wrapped callable
   */
  @Override
  public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context) {
    // Look the option up once; it is absent when the call did not come through a Batcher.
    Long throttledTimeMs = context.getOption(Batcher.THROTTLED_TIME_KEY);
    if (throttledTimeMs != null) {
      ApiTracer tracer = context.getTracer();
      // this should always be true
      if (tracer instanceof BigtableTracer) {
        ((BigtableTracer) tracer).batchRequestThrottled(throttledTimeMs);
      }
    }
    return innerCallable.futureCall(request, context);
  }
}
Expand Up @@ -15,16 +15,20 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracer.Scope;
import com.google.cloud.bigtable.misc_utilities.MethodComparator;
import com.google.common.collect.ImmutableList;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.lang.reflect.Method;
import java.util.Arrays;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -229,4 +233,20 @@ public void testRecordGfeLatency() {
verify(child3, times(1)).recordGfeMetadata(20L, t);
verify(child4, times(1)).recordGfeMetadata(20L, t);
}

@Test
public void testBatchRequestThrottled() {
  long throttledMs = 5L;

  compositeTracer.batchRequestThrottled(throttledMs);

  // Each child tracer must receive the same value exactly once.
  verify(child3, times(1)).batchRequestThrottled(throttledMs);
  verify(child4, times(1)).batchRequestThrottled(throttledMs);
}

@Test
public void testMethodsOverride() {
  // Every method declared on BigtableTracer must have a counterpart declared on CompositeTracer,
  // as judged by MethodComparator.METHOD_CORRESPONDENCE.
  assertThat(Arrays.asList(CompositeTracer.class.getDeclaredMethods()))
      .comparingElementsUsing(MethodComparator.METHOD_CORRESPONDENCE)
      .containsAtLeastElementsIn(BigtableTracer.class.getDeclaredMethods());
}
}