From fb025ae15fdc87c17c4d358a34dc4abcf4e82c63 Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Thu, 30 Apr 2020 19:31:28 +0530 Subject: [PATCH 1/2] feat: expose StreamingCallable with ReadRowsRequest in EnhancedBigtableStub This commit would enable the user to target the table using absolute resource name on each read request. Currently we expose `ServerStreamingCallable`, which does not have an option to provide different `app-profile-id` on each request. --- .../data/v2/stub/EnhancedBigtableStub.java | 67 +++++++- .../v2/stub/EnhancedBigtableStubTest.java | 159 ++++++++++++++++++ 2 files changed, 221 insertions(+), 5 deletions(-) create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 5f621654a..28addc89d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -220,7 +220,7 @@ public UnaryCallable createReadRowCallable(RowAdapter * dispatch the RPC. *
  • Upon receiving the response stream, it will merge the {@link * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row - * implementation can be configured in by the {@code rowAdapter} parameter. + * implementation can be configured by the {@code rowAdapter} parameter. *
  • Retry/resume on failure. *
  • Filter out marker rows. * @@ -230,6 +230,66 @@ public UnaryCallable createReadRowCallable(RowAdapter private ServerStreamingCallable createReadRowsBaseCallable( ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) { + return new ReadRowsUserCallable<>( + createReadRowsRawCallable(readRowsSettings, rowAdapter), requestContext); + } + + /** + * Creates a callable chain to handle ReadRows RPCs. The chain will: + * + *
      + *
    • Dispatch the RPC with {@link ReadRowsRequest}. + *
    • Upon receiving the response stream, it will merge the {@link + * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row + * implementation can be configured by the {@code rowAdapter} parameter. + *
    • Retry/resume on failure. + *
    • Filter out marker rows. + *
    + * + *

    NOTE: the caller is responsible for adding tracing & metrics. + */ + public ServerStreamingCallable createReadRowsRawCallable( + RowAdapter adapter) { + return createReadRowsBaseCallable(adapter) + .withDefaultCallContext(clientContext.getDefaultCallContext()); + } + + /** + * Creates a callable chain to handle ReadRows RPCs. The chain will: + * + *

      + *
    • Dispatch the RPC with {@link ReadRowsRequest}. + *
    • Upon receiving the response stream, it will merge the {@link + * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row + * implementation can be configured by the {@code rowAdapter} parameter. + *
    • Retry/resume on failure. + *
    • Filter out marker rows. + *
    + * + *

    NOTE: the caller is responsible for adding tracing & metrics. + */ + public ServerStreamingCallable createReadRowsBaseCallable( + RowAdapter adapter) { + return createReadRowsRawCallable(settings.readRowsSettings(), adapter); + } + + /** + * Creates a callable chain to handle ReadRows RPCs. The chain will: + * + *

      + *
    • Dispatch the RPC with {@link ReadRowsRequest}. + *
    • Upon receiving the response stream, it will merge the {@link + * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row + * implementation can be configured by the {@code rowAdapter} parameter. + *
    • Retry/resume on failure. + *
    • Filter out marker rows. + *
    + * + *

    NOTE: the caller is responsible for adding tracing & metrics. + */ + private ServerStreamingCallable createReadRowsRawCallable( + ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) { + ServerStreamingCallable base = GrpcRawCallableFactory.createServerStreamingCallable( GrpcCallSettings.newBuilder() @@ -268,10 +328,7 @@ public Map extract(ReadRowsRequest readRowsRequest) { ServerStreamingCallable retrying2 = Callables.retrying(retrying1, innerSettings, clientContext); - FilterMarkerRowsCallable filtering = - new FilterMarkerRowsCallable<>(retrying2, rowAdapter); - - return new ReadRowsUserCallable<>(filtering, requestContext); + return new FilterMarkerRowsCallable<>(retrying2, rowAdapter); } /** diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java new file mode 100644 index 000000000..0c60fc67b --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -0,0 +1,159 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.testing.InProcessServer; +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.RowSet; +import com.google.cloud.bigtable.admin.v2.internal.NameUtil; +import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter; +import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.common.collect.Queues; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.StringValue; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class EnhancedBigtableStubTest { + + private static final String PROJECT_ID = "fake-project"; + private static final String INSTANCE_ID = "fake-instance"; + private static final String TABLE_NAME = + NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, "fake-table"); + private static final String FAKE_HOST_NAME = "fake-stub-host:123"; + + private InProcessServer server; + private FakeDataService fakeDataService; + private EnhancedBigtableStub enhancedBigtableStub; + private EnhancedBigtableStubSettings enhancedBigtableStubSettings; + + @Before + public void setUp() throws IOException, IllegalAccessException, InstantiationException { + fakeDataService = new FakeDataService(); + server = new InProcessServer<>(fakeDataService, FAKE_HOST_NAME); + server.start(); + + enhancedBigtableStubSettings = + EnhancedBigtableStubSettings.newBuilder() + .setProjectId(PROJECT_ID) + .setInstanceId(INSTANCE_ID) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setEndpoint(FAKE_HOST_NAME) + .setTransportChannelProvider(LocalChannelProvider.create(FAKE_HOST_NAME)) + .build(); + + enhancedBigtableStub = EnhancedBigtableStub.create(enhancedBigtableStubSettings); + } + + @After + public void tearDown() { + server.stop(); + } + + @Test + public void testCreateReadRowsBaseCallable() throws InterruptedException, IOException { + ServerStreamingCallable callable = + enhancedBigtableStub.createReadRowsBaseCallable(new DefaultRowAdapter()); + + ApiCallContext context = + ClientContext.create(enhancedBigtableStubSettings).getDefaultCallContext(); + + ReadRowsRequest expectedRequest = + ReadRowsRequest.newBuilder() + .setTableName(TABLE_NAME) + .setAppProfileId("app-profile-1") + .setRows(RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("test-row-key"))) + .build(); + callable.call(expectedRequest, context).iterator().next(); + assertThat(fakeDataService.popLastRequest()).isEqualTo(expectedRequest); + + ReadRowsRequest expectedRequest2 = + ReadRowsRequest.newBuilder() + .setTableName(TABLE_NAME) + .setAppProfileId("app-profile-2") + .build(); + callable.call(expectedRequest2, context).iterator().next(); + assertThat(fakeDataService.popLastRequest()).isEqualTo(expectedRequest2); + } + + @Test + public void testCreateReadRowsRawCallable() throws InterruptedException { + ServerStreamingCallable callable = + enhancedBigtableStub.createReadRowsRawCallable(new DefaultRowAdapter()); + + ReadRowsRequest expectedRequest = + ReadRowsRequest.newBuilder() + .setTableName(TABLE_NAME) + .setAppProfileId("app-profile-1") + .setRows(RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("test-row-key"))) + .build(); + callable.call(expectedRequest).iterator().next(); + assertThat(fakeDataService.popLastRequest()).isEqualTo(expectedRequest); + + ReadRowsRequest expectedRequest2 = + ReadRowsRequest.newBuilder() + .setTableName(TABLE_NAME) + .setAppProfileId("app-profile-2") + .build(); + callable.call(expectedRequest2).iterator().next(); + assertThat(fakeDataService.popLastRequest()).isEqualTo(expectedRequest2); + } + + private static class FakeDataService extends BigtableGrpc.BigtableImplBase { + final BlockingQueue requests = Queues.newLinkedBlockingDeque(); + + @SuppressWarnings("unchecked") + T popLastRequest() throws InterruptedException { + return (T) requests.poll(1, TimeUnit.SECONDS); + } + + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + requests.add(request); + // Dummy row for stream + responseObserver.onNext( + ReadRowsResponse.newBuilder() + .addChunks( + ReadRowsResponse.CellChunk.newBuilder() + .setCommitRow(true) + .setRowKey(ByteString.copyFromUtf8("a")) + .setFamilyName(StringValue.getDefaultInstance()) + .setQualifier(BytesValue.getDefaultInstance()) + .setValueSize(0)) + .build()); + responseObserver.onCompleted(); + } + } +} From 5d32c60f5a6320b7ab696f232965341c1d6b6cbb Mon Sep 17 00:00:00 2001 From: Rahul Kesharwani Date: Mon, 11 May 2020 20:16:40 +0530 Subject: [PATCH 2/2] chore: reorganized createReadRows callable methods With this commit, the public createReadRowsCallable() would refer to single createReadRowsBaseCallable. --- .../data/v2/stub/EnhancedBigtableStub.java | 114 ++++++------------ .../v2/stub/EnhancedBigtableStubTest.java | 43 +++---- 2 files changed, 55 insertions(+), 102 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 28addc89d..033a9fb07 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.data.v2.stub; +import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; import com.google.api.gax.batching.Batcher; import com.google.api.gax.batching.BatcherImpl; @@ -148,6 +149,27 @@ private EnhancedBigtableStub( // + /** + * Creates a callable chain to handle ReadRows RPCs. The chain will: + * + *
      + *
    • Dispatch the RPC with {@link ReadRowsRequest}. + *
    • Upon receiving the response stream, it will merge the {@link + * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row + * implementation can be configured by the {@code rowAdapter} parameter. + *
    • Retry/resume on failure. + *
    • Filter out marker rows. + *
    + * + *

    NOTE: the caller is responsible for adding tracing & metrics. + */ + @BetaApi("This surface is stable yet it might be removed in the future.") + public ServerStreamingCallable createReadRowsRawCallable( + RowAdapter rowAdapter) { + return createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter) + .withDefaultCallContext(clientContext.getDefaultCallContext()); + } + /** * Creates a callable chain to handle streaming ReadRows RPCs. The chain will: * @@ -164,12 +186,15 @@ private EnhancedBigtableStub( */ public ServerStreamingCallable createReadRowsCallable( RowAdapter rowAdapter) { - ServerStreamingCallable readRowsCallable = + ServerStreamingCallable readRowsCallable = createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter); + ServerStreamingCallable readRowsUserCallable = + new ReadRowsUserCallable<>(readRowsCallable, requestContext); + ServerStreamingCallable traced = new TracedServerStreamingCallable<>( - readRowsCallable, + readRowsUserCallable, clientContext.getTracerFactory(), SpanName.of(TRACING_OUTER_CLIENT_NAME, "ReadRows")); @@ -199,78 +224,19 @@ public ServerStreamingCallable createReadRowsCallable( * */ public UnaryCallable createReadRowCallable(RowAdapter rowAdapter) { - UnaryCallable readRowCallable = + ServerStreamingCallable readRowsCallable = createReadRowsBaseCallable( - ServerStreamingCallSettings.newBuilder() - .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) - .setRetrySettings(settings.readRowSettings().getRetrySettings()) - .setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout()) - .build(), - rowAdapter) - .first(); - - return createUserFacingUnaryCallable("ReadRow", readRowCallable); - } - - /** - * Creates a callable chain to handle ReadRows RPCs. The chain will: - * - *

      - *
    • Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and - * dispatch the RPC. - *
    • Upon receiving the response stream, it will merge the {@link - * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row - * implementation can be configured by the {@code rowAdapter} parameter. - *
    • Retry/resume on failure. - *
    • Filter out marker rows. - *
    - * - *

    NOTE: the caller is responsible for adding tracing & metrics. - */ - private ServerStreamingCallable createReadRowsBaseCallable( - ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) { - - return new ReadRowsUserCallable<>( - createReadRowsRawCallable(readRowsSettings, rowAdapter), requestContext); - } + ServerStreamingCallSettings.newBuilder() + .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) + .setRetrySettings(settings.readRowSettings().getRetrySettings()) + .setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout()) + .build(), + rowAdapter); - /** - * Creates a callable chain to handle ReadRows RPCs. The chain will: - * - *

      - *
    • Dispatch the RPC with {@link ReadRowsRequest}. - *
    • Upon receiving the response stream, it will merge the {@link - * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row - * implementation can be configured by the {@code rowAdapter} parameter. - *
    • Retry/resume on failure. - *
    • Filter out marker rows. - *
    - * - *

    NOTE: the caller is responsible for adding tracing & metrics. - */ - public ServerStreamingCallable createReadRowsRawCallable( - RowAdapter adapter) { - return createReadRowsBaseCallable(adapter) - .withDefaultCallContext(clientContext.getDefaultCallContext()); - } + UnaryCallable readRowCallable = + new ReadRowsUserCallable<>(readRowsCallable, requestContext).first(); - /** - * Creates a callable chain to handle ReadRows RPCs. The chain will: - * - *

      - *
    • Dispatch the RPC with {@link ReadRowsRequest}. - *
    • Upon receiving the response stream, it will merge the {@link - * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row - * implementation can be configured by the {@code rowAdapter} parameter. - *
    • Retry/resume on failure. - *
    • Filter out marker rows. - *
    - * - *

    NOTE: the caller is responsible for adding tracing & metrics. - */ - public ServerStreamingCallable createReadRowsBaseCallable( - RowAdapter adapter) { - return createReadRowsRawCallable(settings.readRowsSettings(), adapter); + return createUserFacingUnaryCallable("ReadRow", readRowCallable); } /** @@ -287,7 +253,7 @@ public ServerStreamingCallable createReadRowsBaseC * *

    NOTE: the caller is responsible for adding tracing & metrics. */ - private ServerStreamingCallable createReadRowsRawCallable( + private ServerStreamingCallable createReadRowsBaseCallable( ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) { ServerStreamingCallable base = @@ -307,8 +273,8 @@ public Map extract(ReadRowsRequest readRowsRequest) { ServerStreamingCallable merging = new RowMergingCallable<>(base, rowAdapter); - // Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the outer - // Query -> RowT callable or the inner ReadRowsRequest -> ReadRowsResponse callable). + // Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the inner + // ReadRowsRequest -> ReadRowsResponse callable). ServerStreamingCallSettings innerSettings = ServerStreamingCallSettings.newBuilder() .setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter)) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index 0c60fc67b..be2d9c2a0 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -20,15 +20,15 @@ import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.testing.InProcessServer; import com.google.api.gax.grpc.testing.LocalChannelProvider; -import com.google.api.gax.rpc.ApiCallContext; -import com.google.api.gax.rpc.ClientContext; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.RowSet; import com.google.cloud.bigtable.admin.v2.internal.NameUtil; +import com.google.cloud.bigtable.data.v2.internal.RequestContext; import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter; +import com.google.cloud.bigtable.data.v2.models.Query; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.common.collect.Queues; import com.google.protobuf.ByteString; @@ -49,14 +49,14 @@ public class EnhancedBigtableStubTest { private static final String PROJECT_ID = "fake-project"; private static final String INSTANCE_ID = "fake-instance"; + private static final String FAKE_HOST_NAME = "fake-stub-host:123"; private static final String TABLE_NAME = NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, "fake-table"); - private static final String FAKE_HOST_NAME = "fake-stub-host:123"; + private static final String APP_PROFILE_ID = "app-profile-id"; private InProcessServer server; private FakeDataService fakeDataService; private EnhancedBigtableStub enhancedBigtableStub; - private EnhancedBigtableStubSettings enhancedBigtableStubSettings; @Before public void setUp() throws IOException, IllegalAccessException, InstantiationException { @@ -64,10 +64,11 @@ public void setUp() throws IOException, IllegalAccessException, InstantiationExc server = new InProcessServer<>(fakeDataService, FAKE_HOST_NAME); server.start(); - enhancedBigtableStubSettings = + EnhancedBigtableStubSettings enhancedBigtableStubSettings = EnhancedBigtableStubSettings.newBuilder() .setProjectId(PROJECT_ID) .setInstanceId(INSTANCE_ID) + .setAppProfileId(APP_PROFILE_ID) .setCredentialsProvider(NoCredentialsProvider.create()) .setEndpoint(FAKE_HOST_NAME) .setTransportChannelProvider(LocalChannelProvider.create(FAKE_HOST_NAME)) @@ -82,29 +83,15 @@ public void tearDown() { } @Test - public void testCreateReadRowsBaseCallable() throws InterruptedException, IOException { - ServerStreamingCallable callable = - enhancedBigtableStub.createReadRowsBaseCallable(new DefaultRowAdapter()); - - ApiCallContext context = - ClientContext.create(enhancedBigtableStubSettings).getDefaultCallContext(); - - ReadRowsRequest expectedRequest = - ReadRowsRequest.newBuilder() - .setTableName(TABLE_NAME) - .setAppProfileId("app-profile-1") - .setRows(RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("test-row-key"))) - .build(); - callable.call(expectedRequest, context).iterator().next(); - assertThat(fakeDataService.popLastRequest()).isEqualTo(expectedRequest); - - ReadRowsRequest expectedRequest2 = - ReadRowsRequest.newBuilder() - .setTableName(TABLE_NAME) - .setAppProfileId("app-profile-2") - .build(); - callable.call(expectedRequest2, context).iterator().next(); - assertThat(fakeDataService.popLastRequest()).isEqualTo(expectedRequest2); + public void testCreateReadRowsCallable() throws InterruptedException { + ServerStreamingCallable streamingCallable = + enhancedBigtableStub.createReadRowsCallable(new DefaultRowAdapter()); + + Query request = Query.create("table-id").rowKey("row-key"); + streamingCallable.call(request).iterator().next(); + ReadRowsRequest expected = + request.toProto(RequestContext.create(PROJECT_ID, INSTANCE_ID, APP_PROFILE_ID)); + assertThat(fakeDataService.popLastRequest()).isEqualTo(expected); } @Test