Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: send attempt and timestamp in headers (#935)
* feat: send attempt and timestamp in headers

* refactor

* update clirr files

* refactor

* review updates

* remove "x-"

* update

* Add epoch precision to the header

* use microseconds

* update micro seconds calculation

* fix formatting

* Rename headers class

* rename local variables

* update comment
  • Loading branch information
mutianf committed Oct 29, 2021
1 parent a141aa3 commit de3b476
Show file tree
Hide file tree
Showing 10 changed files with 474 additions and 44 deletions.
5 changes: 5 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Expand Up @@ -34,4 +34,9 @@
<className>com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub</className>
<method>*</method>
</difference>
<!-- InternalApi that was renamed -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory</className>
</difference>
</differences>
Expand Up @@ -70,11 +70,13 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.RpcMeasureConstants;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersServerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.StatsHeadersUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.BulkMutateRowsUserFacingCallable;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
Expand Down Expand Up @@ -191,7 +193,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build();
// Inject Opencensus instrumentation
builder.setTracerFactory(
new CompositeTracerFactory(
new BigtableTracerFactory(
ImmutableList.of(
// Add OpenCensus Tracing
new OpencensusTracerFactory(
Expand Down Expand Up @@ -397,11 +399,14 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
.build(),
readRowsSettings.getRetryableCodes());

ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> withStatsHeaders =
new StatsHeadersServerStreamingCallable<>(base);

// Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> convertException =
new ReadRowsConvertExceptionCallable<>(base);
new ReadRowsConvertExceptionCallable<>(withStatsHeaders);

ServerStreamingCallable<ReadRowsRequest, RowT> merging =
new RowMergingCallable<>(convertException, rowAdapter);
Expand Down Expand Up @@ -468,9 +473,12 @@ public Map<String, String> extract(

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> spoolable = base.all();

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withStatsHeaders =
new StatsHeadersUnaryCallable<>(spoolable);

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
spoolable, settings.getHeaderTracer(), getSpanName(methodName).toString());
withStatsHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(withHeaderTracer, settings.sampleRowKeysSettings(), clientContext);
Expand Down Expand Up @@ -505,9 +513,12 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
.build(),
settings.mutateRowSettings().getRetryableCodes());

UnaryCallable<MutateRowRequest, MutateRowResponse> withStatsHeaders =
new StatsHeadersUnaryCallable<>(base);

UnaryCallable<MutateRowRequest, MutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());
withStatsHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.mutateRowSettings(), clientContext);
Expand Down Expand Up @@ -646,6 +657,9 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
.build(),
settings.bulkMutateRowsSettings().getRetryableCodes());

ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withStatsHeaders =
new StatsHeadersServerStreamingCallable<>(base);

RetryAlgorithm<Void> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
Expand All @@ -656,7 +670,7 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {

return new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
base,
withStatsHeaders,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes());
}
Expand Down Expand Up @@ -689,9 +703,12 @@ public Map<String, String> extract(
.build(),
settings.checkAndMutateRowSettings().getRetryableCodes());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withStatsHeaders =
new StatsHeadersUnaryCallable<>(base);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());
withStatsHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.checkAndMutateRowSettings(), clientContext);
Expand Down Expand Up @@ -726,10 +743,14 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
})
.build(),
settings.readModifyWriteRowSettings().getRetryableCodes());

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withStatsHeaders =
new StatsHeadersUnaryCallable<>(base);

String methodName = "ReadModifyWriteRow";
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> withHeaderTracer =
new HeaderTracerUnaryCallable<>(
base, settings.getHeaderTracer(), getSpanName(methodName).toString());
withStatsHeaders, settings.getHeaderTracer(), getSpanName(methodName).toString());

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(withHeaderTracer, settings.readModifyWriteRowSettings(), clientContext);
Expand Down
Expand Up @@ -15,18 +15,25 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.BaseApiTracer;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.threeten.bp.Duration;

/** Combines multiple {@link ApiTracer}s into a single {@link ApiTracer}. */
class CompositeTracer extends BaseApiTracer {
/**
* A Bigtable specific {@link ApiTracer} that will be used to plumb additional context through the
* call chains as well as combines multiple user defined {@link ApiTracer}s into a single one. This
* will ensure that operation lifecycle events are plumbed through while maintaining user configured
* functionalities.
*/
class BigtableTracer extends BaseApiTracer {
private final List<ApiTracer> children;
private volatile int attempt = 0;

CompositeTracer(List<ApiTracer> children) {
BigtableTracer(List<ApiTracer> children) {
this.children = ImmutableList.copyOf(children);
}

Expand Down Expand Up @@ -78,6 +85,7 @@ public void connectionSelected(String id) {

@Override
public void attemptStarted(int attemptNumber) {
this.attempt = attemptNumber;
for (ApiTracer child : children) {
child.attemptStarted(attemptNumber);
}
Expand Down Expand Up @@ -152,4 +160,13 @@ public void batchRequestSent(long elementCount, long requestSize) {
child.batchRequestSent(elementCount, requestSize);
}
}

/**
* Get the attempt number of the current call. Attempt number for the current call is passed in
* and recorded in {@link #attemptStarted(int)}. With the getter we can access it from {@link
* ApiCallContext}. Attempt number starts from 0.
*/
public int getAttempt() {
return attempt;
}
}
Expand Up @@ -24,12 +24,15 @@
import java.util.ArrayList;
import java.util.List;

/** Combines multiple {@link ApiTracerFactory} into a single {@link ApiTracerFactory}. */
/**
* A Bigtable specific {@link ApiTracerFactory} that combines multiple {@link ApiTracerFactory} into
* a single one.
*/
@InternalApi("For internal use only")
public class CompositeTracerFactory extends BaseApiTracerFactory {
public class BigtableTracerFactory extends BaseApiTracerFactory {
private final List<ApiTracerFactory> apiTracerFactories;

public CompositeTracerFactory(List<ApiTracerFactory> apiTracerFactories) {
public BigtableTracerFactory(List<ApiTracerFactory> apiTracerFactories) {
this.apiTracerFactories = ImmutableList.copyOf(apiTracerFactories);
}

Expand All @@ -40,6 +43,6 @@ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType op
for (ApiTracerFactory factory : apiTracerFactories) {
children.add(factory.newTracer(parent, spanName, operationType));
}
return new CompositeTracer(children);
return new BigtableTracer(children);
}
}
@@ -0,0 +1,46 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;

/**
* A callable that injects client timestamp and current attempt number to request headers. Attempt
* number starts from 0.
*/
@InternalApi("For internal use only")
public final class StatsHeadersServerStreamingCallable<RequestT, ResponseT>
extends ServerStreamingCallable<RequestT, ResponseT> {
private final ServerStreamingCallable innerCallable;

public StatsHeadersServerStreamingCallable(ServerStreamingCallable innerCallable) {
this.innerCallable = innerCallable;
}

@Override
public void call(
RequestT request,
ResponseObserver<ResponseT> responseObserver,
ApiCallContext apiCallContext) {
ApiCallContext newCallContext =
apiCallContext.withExtraHeaders(Util.createStatsHeaders(apiCallContext));
innerCallable.call(request, responseObserver, newCallContext);
}
}
@@ -0,0 +1,43 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;

/**
* A callable that injects client timestamp and current attempt number to request headers. Attempt
* number starts from 0.
*/
@InternalApi("For internal use only")
public final class StatsHeadersUnaryCallable<RequestT, ResponseT>
extends UnaryCallable<RequestT, ResponseT> {
private final UnaryCallable innerCallable;

public StatsHeadersUnaryCallable(UnaryCallable innerCallable) {
this.innerCallable = innerCallable;
}

@Override
public ApiFuture futureCall(RequestT request, ApiCallContext apiCallContext) {
ApiCallContext newCallContext =
apiCallContext.withExtraHeaders(Util.createStatsHeaders(apiCallContext));
return innerCallable.futureCall(request, newCallContext);
}
}
Expand Up @@ -15,20 +15,33 @@
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.common.collect.ImmutableMap;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.opencensus.tags.TagValue;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.annotation.Nullable;

/** Utilities to help integrating with OpenCensus. */
class Util {
static final Metadata.Key<String> ATTEMPT_HEADER_KEY =
Metadata.Key.of("bigtable-attempt", Metadata.ASCII_STRING_MARSHALLER);
static final Metadata.Key<String> ATTEMPT_EPOCH_KEY =
Metadata.Key.of("bigtable-client-attempt-epoch-usec", Metadata.ASCII_STRING_MARSHALLER);

private static final TagValue OK_STATUS = TagValue.create(StatusCode.Code.OK.toString());

/** Convert an exception into a value that can be used as an OpenCensus tag value. */
Expand Down Expand Up @@ -71,4 +84,21 @@ static TagValue extractStatus(Future<?> future) {
}
return extractStatus(error);
}

/**
* Add attempt number and client timestamp from api call context to request headers. Attempt
* number starts from 0.
*/
static Map<String, List<String>> createStatsHeaders(ApiCallContext apiCallContext) {
ImmutableMap.Builder<String, List<String>> headers = ImmutableMap.builder();
headers.put(
ATTEMPT_EPOCH_KEY.name(),
Arrays.asList(String.valueOf(Instant.EPOCH.until(Instant.now(), ChronoUnit.MICROS))));
// This should always be true
if (apiCallContext.getTracer() instanceof BigtableTracer) {
int attemptCount = ((BigtableTracer) apiCallContext.getTracer()).getAttempt();
headers.put(ATTEMPT_HEADER_KEY.name(), Arrays.asList(String.valueOf(attemptCount)));
}
return headers.build();
}
}

0 comments on commit de3b476

Please sign in to comment.