From ca4d4a26a61446322cada72e2cf9b10e06656f8d Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Fri, 25 Jun 2021 15:17:43 +0000 Subject: [PATCH] use raw callable for bulkread rows --- .../data/v2/stub/EnhancedBigtableStub.java | 42 ++++++++++++++----- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 290f03bc27..d8b3eda0a8 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -31,7 +31,6 @@ import com.google.api.gax.retrying.RetryAlgorithm; import com.google.api.gax.retrying.RetryingExecutorWithContext; import com.google.api.gax.retrying.ScheduledRetryingExecutor; -import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.Callables; import com.google.api.gax.rpc.ClientContext; import com.google.api.gax.rpc.RequestParamsExtractor; @@ -40,7 +39,7 @@ import com.google.api.gax.rpc.UnaryCallable; import com.google.api.gax.tracing.OpencensusTracerFactory; import com.google.api.gax.tracing.SpanName; -import com.google.api.gax.tracing.TracedBatchingContextCallable; +import com.google.api.gax.tracing.TracedBatchedContextCallable; import com.google.api.gax.tracing.TracedServerStreamingCallable; import com.google.api.gax.tracing.TracedUnaryCallable; import com.google.auth.Credentials; @@ -127,6 +126,7 @@ public class EnhancedBigtableStub implements AutoCloseable { private final ServerStreamingCallable readRowsCallable; private final UnaryCallable readRowCallable; + private final UnaryCallable> bulkReadRowsCallable; private final UnaryCallable> sampleRowKeysCallable; private final UnaryCallable mutateRowCallable; private final UnaryCallable bulkMutateRowsCallable; @@ -232,6 +232,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext readRowsCallable = createReadRowsCallable(new DefaultRowAdapter()); readRowCallable = createReadRowCallable(new DefaultRowAdapter()); + bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter()); sampleRowKeysCallable = createSampleRowKeysCallable(); mutateRowCallable = createMutateRowCallable(); bulkMutateRowsCallable = createBulkMutateRowsCallable(); @@ -393,6 +394,31 @@ public Map extract(ReadRowsRequest readRowsRequest) { return new FilterMarkerRowsCallable<>(retrying2, rowAdapter); } + /** + * @param rowAdapter + * @param + * @return + */ + private UnaryCallable> createBulkReadRowsCallable( + RowAdapter rowAdapter) { + ServerStreamingCallable readRowsCallable = + createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter); + + ServerStreamingCallable readRowsUserCallable = + new ReadRowsUserCallable<>(readRowsCallable, requestContext); + + SpanName span = getSpanName("ReadRows"); + + UnaryCallable> traced = + new TracedBatchedContextCallable<>( + readRowsUserCallable.all(), + clientContext.getDefaultCallContext(), + clientContext.getTracerFactory(), + span); + + return traced; + } + /** * Creates a callable chain to handle SampleRowKeys RPcs. The chain will: * @@ -516,7 +542,7 @@ private UnaryCallable createBulkMutateRowsCallable() { // TracedBatchingContextCallable needs to be the last in the callable chain so Batcher can pass // batchingCallContext to add batching metrics to ApiTracer. UnaryCallable batchingContextCallable = - new TracedBatchingContextCallable<>( + new TracedBatchedContextCallable<>( withHeaderTracer, clientContext.getDefaultCallContext(), clientContext.getTracerFactory(), @@ -577,17 +603,13 @@ public Batcher newMutateRowsBatcher( public Batcher newBulkReadRowsBatcher( @Nonnull Query query, @Nullable GrpcCallContext ctx) { Preconditions.checkNotNull(query, "query cannot be null"); - UnaryCallable> callable = readRowsCallable().all(); - ApiCallContext callContext = clientContext.getDefaultCallContext(); + UnaryCallable> callable = bulkReadRowsCallable; if (ctx != null) { - callContext = ctx.merge(callContext); + callable = callable.withDefaultCallContext(ctx); } - UnaryCallable> batchingContextCallable = - new TracedBatchingContextCallable( - callable, callContext, clientContext.getTracerFactory(), getSpanName("ReadRows")); return new BatcherImpl<>( settings.bulkReadRowsSettings().getBatchingDescriptor(), - batchingContextCallable, + callable, query, settings.bulkReadRowsSettings().getBatchingSettings(), clientContext.getExecutor());