Skip to content

Commit

Permalink
refactor: stop using autogenerated gapic chains (#213)
Browse files Browse the repository at this point in the history
* refactor: stop using autogenerated gapic chains

Fully compose own callable chains. This will remove the need to fiddle with StubSettings
to disable default retry behavior and will also  eliminate unnecessary gapic trace spans. This also fixes a  bug where the trace annotations for ReadRows was missing the connection id. And this will pave the way for decoupling Opencensus via the ApiTracer api

* missing dep
  • Loading branch information
igorbernstein2 committed Apr 2, 2020
1 parent f910a32 commit 712d829
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 88 deletions.
9 changes: 4 additions & 5 deletions google-cloud-bigtable/pom.xml
Expand Up @@ -67,6 +67,10 @@
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-bigtable-v2</artifactId>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-bigtable-v2</artifactId>
Expand Down Expand Up @@ -189,11 +193,6 @@
<classifier>testlib</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-bigtable-v2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-bigtable-admin-v2</artifactId>
Expand Down
Expand Up @@ -18,20 +18,33 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatcherImpl;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcRawCallableFactory;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.RequestParamsExtractor;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.api.gax.tracing.TracedUnaryCallable;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.CheckAndMutateRowRequest;
import com.google.bigtable.v2.CheckAndMutateRowResponse;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.v2.SampleRowKeysRequest;
import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
Expand All @@ -58,24 +71,24 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.tracing.WrappedTracerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import io.opencensus.stats.Stats;
import io.opencensus.stats.StatsRecorder;
import io.opencensus.tags.Tagger;
import io.opencensus.tags.Tags;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import org.threeten.bp.Duration;

/**
* The core client that converts method calls to RPCs.
*
* <p>This class consists of a set of Callable chains that represent RPC methods. There is a chain
* for each RPC method. Each chain starts with a transformation that takes a protobuf wrapper and
* terminates in a Callable from {@link GrpcBigtableStub}. This class is meant to be a semantically
* terminates in a Callable that a gax gRPC callable. This class is meant to be a semantically
* complete facade for the Bigtable data API. However it is not meant to be consumed directly,
* please use {@link com.google.cloud.bigtable.data.v2.BigtableDataClient}.
*
Expand All @@ -85,10 +98,8 @@
@InternalApi
public class EnhancedBigtableStub implements AutoCloseable {
private static final String TRACING_OUTER_CLIENT_NAME = "Bigtable";
private static final String TRACING_INNER_CLIENT_NAME = "BaseBigtable";

private final EnhancedBigtableStubSettings settings;
private final GrpcBigtableStub stub;
private final ClientContext clientContext;
private final RequestContext requestContext;

Expand All @@ -106,85 +117,22 @@ public class EnhancedBigtableStub implements AutoCloseable {

public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
throws IOException {
// Configure the base settings
BigtableStubSettings.Builder baseSettingsBuilder =
BigtableStubSettings.newBuilder()
.setTransportChannelProvider(settings.getTransportChannelProvider())
.setEndpoint(settings.getEndpoint())
.setExecutorProvider(settings.getExecutorProvider())
.setCredentialsProvider(settings.getCredentialsProvider())
.setHeaderProvider(settings.getHeaderProvider())
.setStreamWatchdogProvider(settings.getStreamWatchdogProvider())
.setStreamWatchdogCheckInterval(settings.getStreamWatchdogCheckInterval())
// Force the base stub to use a different TracerFactory
.setTracerFactory(
new WrappedTracerFactory(settings.getTracerFactory(), TRACING_INNER_CLIENT_NAME));

// ReadRow retries are handled in the overlay: disable retries in the base layer (but make
// sure to preserve the exception callable settings).
baseSettingsBuilder
.readRowsSettings()
.setSimpleTimeoutNoRetries(Duration.ofHours(2))
.setRetryableCodes(settings.readRowsSettings().getRetryableCodes())
.setIdleTimeout(Duration.ZERO);

// SampleRowKeys retries are handled in the overlay: disable retries in the base layer (but make
// sure to preserve the exception callable settings.
baseSettingsBuilder
.sampleRowKeysSettings()
.setSimpleTimeoutNoRetries(
settings.sampleRowKeysSettings().getRetrySettings().getTotalTimeout())
.setRetryableCodes(settings.sampleRowKeysSettings().getRetryableCodes());

// MutateRow: copy outer settings to the underlying GAPIC client
baseSettingsBuilder
.mutateRowSettings()
.setRetryableCodes(settings.mutateRowSettings().getRetryableCodes())
.setRetrySettings(settings.mutateRowSettings().getRetrySettings());

// MutateRows(BulkMutateRows) retries are handled in the overlay: disable retries in the base
// layer
baseSettingsBuilder
.mutateRowsSettings()
.setSimpleTimeoutNoRetries(Duration.ofMinutes(10))
.setRetryableCodes(settings.bulkMutateRowsSettings().getRetryableCodes())
.setIdleTimeout(Duration.ZERO);

// CheckAndMutateRow is a simple passthrough
baseSettingsBuilder
.checkAndMutateRowSettings()
.setRetryableCodes(settings.checkAndMutateRowSettings().getRetryableCodes())
.setRetrySettings(settings.checkAndMutateRowSettings().getRetrySettings());

// ReadModifyWriteRow is a simple passthrough
baseSettingsBuilder
.readModifyWriteRowSettings()
.setRetryableCodes(settings.readModifyWriteRowSettings().getRetryableCodes())
.setRetrySettings(settings.readModifyWriteRowSettings().getRetrySettings());

BigtableStubSettings baseSettings = baseSettingsBuilder.build();
ClientContext clientContext = ClientContext.create(baseSettings);
GrpcBigtableStub stub = new GrpcBigtableStub(baseSettings, clientContext);

// Make sure to keep the original tracer factory for the outer client.
clientContext = clientContext.toBuilder().setTracerFactory(settings.getTracerFactory()).build();
ClientContext clientContext = ClientContext.create(settings);

return new EnhancedBigtableStub(
settings, clientContext, Tags.getTagger(), Stats.getStatsRecorder(), stub);
settings, clientContext, Tags.getTagger(), Stats.getStatsRecorder());
}

@InternalApi("Visible for testing")
EnhancedBigtableStub(
private EnhancedBigtableStub(
EnhancedBigtableStubSettings settings,
ClientContext clientContext,
Tagger tagger,
StatsRecorder statsRecorder,
GrpcBigtableStub stub) {
StatsRecorder statsRecorder) {
this.settings = settings;
this.clientContext = clientContext;
this.tagger = tagger;
this.statsRecorder = statsRecorder;
this.stub = stub;
this.requestContext =
RequestContext.create(
settings.getProjectId(), settings.getInstanceId(), settings.getAppProfileId());
Expand Down Expand Up @@ -282,8 +230,22 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
private <RowT> ServerStreamingCallable<Query, RowT> createReadRowsBaseCallable(
ServerStreamingCallSettings<Query, Row> readRowsSettings, RowAdapter<RowT> rowAdapter) {

ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<ReadRowsRequest, ReadRowsResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getReadRowsMethod())
.setParamsExtractor(
new RequestParamsExtractor<ReadRowsRequest>() {
@Override
public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
return ImmutableMap.of("table_name", readRowsRequest.getTableName());
}
})
.build(),
readRowsSettings.getRetryableCodes());

ServerStreamingCallable<ReadRowsRequest, RowT> merging =
new RowMergingCallable<>(stub.readRowsCallable(), rowAdapter);
new RowMergingCallable<>(base, rowAdapter);

// Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the outer
// Query -> RowT callable or the inner ReadRowsRequest -> ReadRowsResponse callable).
Expand All @@ -295,10 +257,13 @@ private <RowT> ServerStreamingCallable<Query, RowT> createReadRowsBaseCallable(
.setIdleTimeout(readRowsSettings.getIdleTimeout())
.build();

ServerStreamingCallable<ReadRowsRequest, RowT> watched =
Callables.watched(merging, innerSettings, clientContext);

// Retry logic is split into 2 parts to workaround a rare edge case described in
// ReadRowsRetryCompletedCallable
ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
new ReadRowsRetryCompletedCallable<>(merging);
new ReadRowsRetryCompletedCallable<>(watched);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
Expand All @@ -322,8 +287,22 @@ private <RowT> ServerStreamingCallable<Query, RowT> createReadRowsBaseCallable(
* </ul>
*/
private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> spoolable =
stub.sampleRowKeysCallable().all();
ServerStreamingCallable<SampleRowKeysRequest, SampleRowKeysResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<SampleRowKeysRequest, SampleRowKeysResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getSampleRowKeysMethod())
.setParamsExtractor(
new RequestParamsExtractor<SampleRowKeysRequest>() {
@Override
public Map<String, String> extract(
SampleRowKeysRequest sampleRowKeysRequest) {
return ImmutableMap.of("table_name", sampleRowKeysRequest.getTableName());
}
})
.build(),
settings.sampleRowKeysSettings().getRetryableCodes());

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> spoolable = base.all();

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(spoolable, settings.sampleRowKeysSettings(), clientContext);
Expand All @@ -341,8 +320,25 @@ private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
* </ul>
*/
private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
UnaryCallable<MutateRowRequest, MutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<MutateRowRequest, MutateRowResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getMutateRowMethod())
.setParamsExtractor(
new RequestParamsExtractor<MutateRowRequest>() {
@Override
public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
return ImmutableMap.of("table_name", mutateRowRequest.getTableName());
}
})
.build(),
settings.mutateRowSettings().getRetryableCodes());

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(base, settings.mutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"MutateRow", new MutateRowCallable(stub.mutateRowCallable(), requestContext));
"MutateRow", new MutateRowCallable(retrying, requestContext));
}

/**
Expand Down Expand Up @@ -445,6 +441,20 @@ public Batcher<ByteString, Row> newBulkReadRowsBatcher(@Nonnull Query query) {
* @see MutateRowsRetryingCallable for more details
*/
private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<MutateRowsRequest, MutateRowsResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getMutateRowsMethod())
.setParamsExtractor(
new RequestParamsExtractor<MutateRowsRequest>() {
@Override
public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
return ImmutableMap.of("table_name", mutateRowsRequest.getTableName());
}
})
.build(),
settings.bulkMutateRowsSettings().getRetryableCodes());

RetryAlgorithm<Void> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
Expand All @@ -455,7 +465,7 @@ private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {

return new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
stub.mutateRowsCallable(),
base,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes());
}
Expand All @@ -470,9 +480,27 @@ private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
* </ul>
*/
private UnaryCallable<ConditionalRowMutation, Boolean> createCheckAndMutateRowCallable() {
UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<CheckAndMutateRowRequest, CheckAndMutateRowResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getCheckAndMutateRowMethod())
.setParamsExtractor(
new RequestParamsExtractor<CheckAndMutateRowRequest>() {
@Override
public Map<String, String> extract(
CheckAndMutateRowRequest checkAndMutateRowRequest) {
return ImmutableMap.of(
"table_name", checkAndMutateRowRequest.getTableName());
}
})
.build(),
settings.checkAndMutateRowSettings().getRetryableCodes());

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(base, settings.checkAndMutateRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"CheckAndMutateRow",
new CheckAndMutateRowCallable(stub.checkAndMutateRowCallable(), requestContext));
"CheckAndMutateRow", new CheckAndMutateRowCallable(retrying, requestContext));
}

/**
Expand All @@ -486,9 +514,25 @@ private UnaryCallable<ConditionalRowMutation, Boolean> createCheckAndMutateRowCa
* </ul>
*/
private UnaryCallable<ReadModifyWriteRow, Row> createReadModifyWriteRowCallable() {
UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
GrpcCallSettings.<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getReadModifyWriteRowMethod())
.setParamsExtractor(
new RequestParamsExtractor<ReadModifyWriteRowRequest>() {
@Override
public Map<String, String> extract(ReadModifyWriteRowRequest request) {
return ImmutableMap.of("table_name", request.getTableName());
}
})
.build(),
settings.readModifyWriteRowSettings().getRetryableCodes());

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(base, settings.readModifyWriteRowSettings(), clientContext);

return createUserFacingUnaryCallable(
"ReadModifyWriteRow",
new ReadModifyWriteRowCallable(stub.readModifyWriteRowCallable(), requestContext));
"ReadModifyWriteRow", new ReadModifyWriteRowCallable(retrying, requestContext));
}

/**
Expand Down Expand Up @@ -562,6 +606,8 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {

@Override
public void close() {
stub.close();
for (BackgroundResource backgroundResource : clientContext.getBackgroundResources()) {
backgroundResource.shutdown();
}
}
}

0 comments on commit 712d829

Please sign in to comment.