From 394efe459ebe34030bf1d09116eebb4ec4f311e9 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani <42969463+rahulKQL@users.noreply.github.com> Date: Mon, 11 May 2020 22:43:15 +0530 Subject: [PATCH] feat: expose new API with ReadRowsRequest in EnhancedBigtableStub (#276) * feat: expose StreamingCallable with ReadRowsRequest in EnhancedBigtableStub This commit would enable the user to target the table using absolute resource name on each read request. Currently we expose `ServerStreamingCallable`, which does not have an option to provide different `app-profile-id` on each request. * chore: reorganized createReadRows callable methods With this commit, the public createReadRowsCallable() would refer to single createReadRowsBaseCallable. --- .../data/v2/stub/EnhancedBigtableStub.java | 65 +++++--- .../v2/stub/EnhancedBigtableStubTest.java | 146 ++++++++++++++++++ 2 files changed, 190 insertions(+), 21 deletions(-) create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 4b5ede9da..58c0130e6 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.data.v2.stub; +import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; import com.google.api.gax.batching.Batcher; import com.google.api.gax.batching.BatcherImpl; @@ -148,6 +149,27 @@ private EnhancedBigtableStub( // + /** + * Creates a callable chain to handle ReadRows RPCs. The chain will: + * + * + * + *

NOTE: the caller is responsible for adding tracing & metrics. + */ + @BetaApi("This surface is stable yet it might be removed in the future.") + public ServerStreamingCallable createReadRowsRawCallable( + RowAdapter rowAdapter) { + return createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter) + .withDefaultCallContext(clientContext.getDefaultCallContext()); + } + /** * Creates a callable chain to handle streaming ReadRows RPCs. The chain will: * @@ -164,12 +186,15 @@ private EnhancedBigtableStub( */ public ServerStreamingCallable createReadRowsCallable( RowAdapter rowAdapter) { - ServerStreamingCallable readRowsCallable = + ServerStreamingCallable readRowsCallable = createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter); + ServerStreamingCallable readRowsUserCallable = + new ReadRowsUserCallable<>(readRowsCallable, requestContext); + ServerStreamingCallable traced = new TracedServerStreamingCallable<>( - readRowsCallable, + readRowsUserCallable, clientContext.getTracerFactory(), SpanName.of(TRACING_OUTER_CLIENT_NAME, "ReadRows")); @@ -199,15 +224,17 @@ public ServerStreamingCallable createReadRowsCallable( * */ public UnaryCallable createReadRowCallable(RowAdapter rowAdapter) { - UnaryCallable readRowCallable = + ServerStreamingCallable readRowsCallable = createReadRowsBaseCallable( - ServerStreamingCallSettings.newBuilder() - .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) - .setRetrySettings(settings.readRowSettings().getRetrySettings()) - .setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout()) - .build(), - rowAdapter) - .first(); + ServerStreamingCallSettings.newBuilder() + .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) + .setRetrySettings(settings.readRowSettings().getRetrySettings()) + .setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout()) + .build(), + rowAdapter); + + UnaryCallable readRowCallable = + new ReadRowsUserCallable<>(readRowsCallable, requestContext).first(); return createUserFacingUnaryCallable("ReadRow", readRowCallable); } @@ -216,19 +243,18 @@ public UnaryCallable createReadRowCallable(RowAdapter * Creates a callable chain to handle ReadRows RPCs. The chain will: * *

* *

NOTE: the caller is responsible for adding tracing & metrics. */ - private ServerStreamingCallable createReadRowsBaseCallable( - ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) { + private ServerStreamingCallable createReadRowsBaseCallable( + ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) { ServerStreamingCallable base = GrpcRawCallableFactory.createServerStreamingCallable( @@ -249,8 +275,8 @@ public Map extract(ReadRowsRequest readRowsRequest) { ServerStreamingCallable merging = new RowMergingCallable<>(base, rowAdapter); - // Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the outer - // Query -> RowT callable or the inner ReadRowsRequest -> ReadRowsResponse callable). + // Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the inner + // ReadRowsRequest -> ReadRowsResponse callable). ServerStreamingCallSettings innerSettings = ServerStreamingCallSettings.newBuilder() .setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter)) @@ -270,10 +296,7 @@ public Map extract(ReadRowsRequest readRowsRequest) { ServerStreamingCallable retrying2 = Callables.retrying(retrying1, innerSettings, clientContext); - FilterMarkerRowsCallable filtering = - new FilterMarkerRowsCallable<>(retrying2, rowAdapter); - - return new ReadRowsUserCallable<>(filtering, requestContext); + return new FilterMarkerRowsCallable<>(retrying2, rowAdapter); } /** diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java new file mode 100644 index 000000000..be2d9c2a0 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -0,0 +1,146 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.testing.InProcessServer; +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.RowSet; +import com.google.cloud.bigtable.admin.v2.internal.NameUtil; +import com.google.cloud.bigtable.data.v2.internal.RequestContext; +import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.common.collect.Queues; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.StringValue; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class EnhancedBigtableStubTest { + + private static final String PROJECT_ID = "fake-project"; + private static final String INSTANCE_ID = "fake-instance"; + private static final String FAKE_HOST_NAME = "fake-stub-host:123"; + private static final String TABLE_NAME = + NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, "fake-table"); + private static final String APP_PROFILE_ID = "app-profile-id"; + + private InProcessServer server; + private FakeDataService fakeDataService; + private EnhancedBigtableStub enhancedBigtableStub; + + @Before + public void setUp() throws IOException, IllegalAccessException, InstantiationException { + fakeDataService = new FakeDataService(); + server = new InProcessServer<>(fakeDataService, FAKE_HOST_NAME); + server.start(); + + EnhancedBigtableStubSettings enhancedBigtableStubSettings = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(PROJECT_ID) + .setInstanceId(INSTANCE_ID) + .setAppProfileId(APP_PROFILE_ID) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setEndpoint(FAKE_HOST_NAME) + .setTransportChannelProvider(LocalChannelProvider.create(FAKE_HOST_NAME)) + .build(); + + enhancedBigtableStub = EnhancedBigtableStub.create(enhancedBigtableStubSettings); + } + + @After + public void tearDown() { + server.stop(); + } + + @Test + public void testCreateReadRowsCallable() throws InterruptedException { + ServerStreamingCallable streamingCallable = + enhancedBigtableStub.createReadRowsCallable(new DefaultRowAdapter()); + + Query request = Query.create("table-id").rowKey("row-key"); + streamingCallable.call(request).iterator().next(); + ReadRowsRequest expected = + request.toProto(RequestContext.create(PROJECT_ID, INSTANCE_ID, APP_PROFILE_ID)); + assertThat(fakeDataService.popLastRequest()).isEqualTo(expected); + } + + @Test + public void testCreateReadRowsRawCallable() throws InterruptedException { + ServerStreamingCallable callable = + enhancedBigtableStub.createReadRowsRawCallable(new DefaultRowAdapter()); + + ReadRowsRequest expectedRequest = + ReadRowsRequest.newBuilder() + .setTableName(TABLE_NAME) + .setAppProfileId("app-profile-1") + .setRows(RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("test-row-key"))) + .build(); + callable.call(expectedRequest).iterator().next(); + assertThat(fakeDataService.popLastRequest()).isEqualTo(expectedRequest); + + ReadRowsRequest expectedRequest2 = + ReadRowsRequest.newBuilder() + .setTableName(TABLE_NAME) + .setAppProfileId("app-profile-2") + .build(); + callable.call(expectedRequest2).iterator().next(); + assertThat(fakeDataService.popLastRequest()).isEqualTo(expectedRequest2); + } + + private static class FakeDataService extends BigtableGrpc.BigtableImplBase { + final BlockingQueue requests = Queues.newLinkedBlockingDeque(); + + @SuppressWarnings("unchecked") + T popLastRequest() throws InterruptedException { + return (T) requests.poll(1, TimeUnit.SECONDS); + } + + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + requests.add(request); + // Dummy row for stream + responseObserver.onNext( + ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setCommitRow(true) + .setRowKey(ByteString.copyFromUtf8("a")) + .setFamilyName(StringValue.getDefaultInstance()) + .setQualifier(BytesValue.getDefaultInstance()) + .setValueSize(0)) + .build()); + responseObserver.onCompleted(); + } + } +}