Skip to content

Commit

Permalink
use raw callable for bulkread rows
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Jun 25, 2021
1 parent c307ef8 commit ca4d4a2
Showing 1 changed file with 32 additions and 10 deletions.
Expand Up @@ -31,7 +31,6 @@
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.RequestParamsExtractor;
Expand All @@ -40,7 +39,7 @@
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.OpencensusTracerFactory;
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedBatchingContextCallable;
import com.google.api.gax.tracing.TracedBatchedContextCallable;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.api.gax.tracing.TracedUnaryCallable;
import com.google.auth.Credentials;
Expand Down Expand Up @@ -127,6 +126,7 @@ public class EnhancedBigtableStub implements AutoCloseable {

private final ServerStreamingCallable<Query, Row> readRowsCallable;
private final UnaryCallable<Query, Row> readRowCallable;
private final UnaryCallable<Query, List<Row>> bulkReadRowsCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
Expand Down Expand Up @@ -232,6 +232,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext

readRowsCallable = createReadRowsCallable(new DefaultRowAdapter());
readRowCallable = createReadRowCallable(new DefaultRowAdapter());
bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter());
sampleRowKeysCallable = createSampleRowKeysCallable();
mutateRowCallable = createMutateRowCallable();
bulkMutateRowsCallable = createBulkMutateRowsCallable();
Expand Down Expand Up @@ -393,6 +394,31 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}

/**
* @param rowAdapter
* @param <RowT>
* @return
*/
private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
RowAdapter<RowT> rowAdapter) {
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter);

ServerStreamingCallable<Query, RowT> readRowsUserCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);

SpanName span = getSpanName("ReadRows");

UnaryCallable<Query, List<RowT>> traced =
new TracedBatchedContextCallable<>(
readRowsUserCallable.all(),
clientContext.getDefaultCallContext(),
clientContext.getTracerFactory(),
span);

return traced;
}

/**
* Creates a callable chain to handle SampleRowKeys RPcs. The chain will:
*
Expand Down Expand Up @@ -516,7 +542,7 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
// TracedBatchingContextCallable needs to be the last in the callable chain so Batcher can pass
// batchingCallContext to add batching metrics to ApiTracer.
UnaryCallable<BulkMutation, Void> batchingContextCallable =
new TracedBatchingContextCallable<>(
new TracedBatchedContextCallable<>(
withHeaderTracer,
clientContext.getDefaultCallContext(),
clientContext.getTracerFactory(),
Expand Down Expand Up @@ -577,17 +603,13 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
@Nonnull Query query, @Nullable GrpcCallContext ctx) {
Preconditions.checkNotNull(query, "query cannot be null");
UnaryCallable<Query, List<Row>> callable = readRowsCallable().all();
ApiCallContext callContext = clientContext.getDefaultCallContext();
UnaryCallable<Query, List<Row>> callable = bulkReadRowsCallable;
if (ctx != null) {
callContext = ctx.merge(callContext);
callable = callable.withDefaultCallContext(ctx);
}
UnaryCallable<Query, List<Row>> batchingContextCallable =
new TracedBatchingContextCallable(
callable, callContext, clientContext.getTracerFactory(), getSpanName("ReadRows"));
return new BatcherImpl<>(
settings.bulkReadRowsSettings().getBatchingDescriptor(),
batchingContextCallable,
callable,
query,
settings.bulkReadRowsSettings().getBatchingSettings(),
clientContext.getExecutor());
Expand Down

0 comments on commit ca4d4a2

Please sign in to comment.