Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: expose new API with ReadRowsRequest in EnhancedBigtableStub (#276)
* feat: expose StreamingCallable with ReadRowsRequest in EnhancedBigtableStub

This commit would enable the user to target the table using absolute resource name on each read request. Currently we expose `ServerStreamingCallable<Query, RowT>`, which does not have an option to provide different `app-profile-id` on each request.

* chore: reorganized createReadRows callable methods

With this commit, the public createReadRowsCallable() would refer to single createReadRowsBaseCallable.
  • Loading branch information
rahulKQL committed May 11, 2020
1 parent 4374492 commit 394efe4
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 21 deletions.
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatcherImpl;
Expand Down Expand Up @@ -148,6 +149,27 @@ private EnhancedBigtableStub(

// <editor-fold desc="Callable creators">

/**
* Creates a callable chain to handle ReadRows RPCs. The chain will:
*
* <ul>
* <li>Dispatch the RPC with {@link ReadRowsRequest}.
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured by the {@code rowAdapter} parameter.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
*
* <p>NOTE: the caller is responsible for adding tracing & metrics.
*/
@BetaApi("This surface is stable yet it might be removed in the future.")
public <RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRowsRawCallable(
RowAdapter<RowT> rowAdapter) {
return createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter)
.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
* Creates a callable chain to handle streaming ReadRows RPCs. The chain will:
*
Expand All @@ -164,12 +186,15 @@ private EnhancedBigtableStub(
*/
public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
RowAdapter<RowT> rowAdapter) {
ServerStreamingCallable<Query, RowT> readRowsCallable =
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(settings.readRowsSettings(), rowAdapter);

ServerStreamingCallable<Query, RowT> readRowsUserCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext);

ServerStreamingCallable<Query, RowT> traced =
new TracedServerStreamingCallable<>(
readRowsCallable,
readRowsUserCallable,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, "ReadRows"));

Expand Down Expand Up @@ -199,15 +224,17 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
* </ul>
*/
public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT> rowAdapter) {
UnaryCallable<Query, RowT> readRowCallable =
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(
ServerStreamingCallSettings.<Query, Row>newBuilder()
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
.setRetrySettings(settings.readRowSettings().getRetrySettings())
.setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout())
.build(),
rowAdapter)
.first();
ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
.setRetrySettings(settings.readRowSettings().getRetrySettings())
.setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout())
.build(),
rowAdapter);

UnaryCallable<Query, RowT> readRowCallable =
new ReadRowsUserCallable<>(readRowsCallable, requestContext).first();

return createUserFacingUnaryCallable("ReadRow", readRowCallable);
}
Expand All @@ -216,19 +243,18 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
* Creates a callable chain to handle ReadRows RPCs. The chain will:
*
* <ul>
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and
* dispatch the RPC.
* <li>Dispatch the RPC with {@link ReadRowsRequest}.
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured in by the {@code rowAdapter} parameter.
* implementation can be configured by the {@code rowAdapter} parameter.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
*
* <p>NOTE: the caller is responsible for adding tracing & metrics.
*/
private <RowT> ServerStreamingCallable<Query, RowT> createReadRowsBaseCallable(
ServerStreamingCallSettings<Query, Row> readRowsSettings, RowAdapter<RowT> rowAdapter) {
private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRowsBaseCallable(
ServerStreamingCallSettings<ReqT, Row> readRowsSettings, RowAdapter<RowT> rowAdapter) {

ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
Expand All @@ -249,8 +275,8 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
ServerStreamingCallable<ReadRowsRequest, RowT> merging =
new RowMergingCallable<>(base, rowAdapter);

// Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the outer
// Query -> RowT callable or the inner ReadRowsRequest -> ReadRowsResponse callable).
// Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the inner
// ReadRowsRequest -> ReadRowsResponse callable).
ServerStreamingCallSettings<ReadRowsRequest, RowT> innerSettings =
ServerStreamingCallSettings.<ReadRowsRequest, RowT>newBuilder()
.setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter))
Expand All @@ -270,10 +296,7 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);

FilterMarkerRowsCallable<RowT> filtering =
new FilterMarkerRowsCallable<>(retrying2, rowAdapter);

return new ReadRowsUserCallable<>(filtering, requestContext);
return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}

/**
Expand Down
@@ -0,0 +1,146 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.testing.InProcessServer;
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.v2.RowSet;
import com.google.cloud.bigtable.admin.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.common.collect.Queues;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.StringValue;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class EnhancedBigtableStubTest {

private static final String PROJECT_ID = "fake-project";
private static final String INSTANCE_ID = "fake-instance";
private static final String FAKE_HOST_NAME = "fake-stub-host:123";
private static final String TABLE_NAME =
NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, "fake-table");
private static final String APP_PROFILE_ID = "app-profile-id";

private InProcessServer<?> server;
private FakeDataService fakeDataService;
private EnhancedBigtableStub enhancedBigtableStub;

@Before
public void setUp() throws IOException, IllegalAccessException, InstantiationException {
fakeDataService = new FakeDataService();
server = new InProcessServer<>(fakeDataService, FAKE_HOST_NAME);
server.start();

EnhancedBigtableStubSettings enhancedBigtableStubSettings =
EnhancedBigtableStubSettings.newBuilder()
.setProjectId(PROJECT_ID)
.setInstanceId(INSTANCE_ID)
.setAppProfileId(APP_PROFILE_ID)
.setCredentialsProvider(NoCredentialsProvider.create())
.setEndpoint(FAKE_HOST_NAME)
.setTransportChannelProvider(LocalChannelProvider.create(FAKE_HOST_NAME))
.build();

enhancedBigtableStub = EnhancedBigtableStub.create(enhancedBigtableStubSettings);
}

@After
public void tearDown() {
server.stop();
}

@Test
public void testCreateReadRowsCallable() throws InterruptedException {
ServerStreamingCallable<Query, Row> streamingCallable =
enhancedBigtableStub.createReadRowsCallable(new DefaultRowAdapter());

Query request = Query.create("table-id").rowKey("row-key");
streamingCallable.call(request).iterator().next();
ReadRowsRequest expected =
request.toProto(RequestContext.create(PROJECT_ID, INSTANCE_ID, APP_PROFILE_ID));
assertThat(fakeDataService.popLastRequest()).isEqualTo(expected);
}

@Test
public void testCreateReadRowsRawCallable() throws InterruptedException {
ServerStreamingCallable<ReadRowsRequest, Row> callable =
enhancedBigtableStub.createReadRowsRawCallable(new DefaultRowAdapter());

ReadRowsRequest expectedRequest =
ReadRowsRequest.newBuilder()
.setTableName(TABLE_NAME)
.setAppProfileId("app-profile-1")
.setRows(RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("test-row-key")))
.build();
callable.call(expectedRequest).iterator().next();
assertThat(fakeDataService.popLastRequest()).isEqualTo(expectedRequest);

ReadRowsRequest expectedRequest2 =
ReadRowsRequest.newBuilder()
.setTableName(TABLE_NAME)
.setAppProfileId("app-profile-2")
.build();
callable.call(expectedRequest2).iterator().next();
assertThat(fakeDataService.popLastRequest()).isEqualTo(expectedRequest2);
}

private static class FakeDataService extends BigtableGrpc.BigtableImplBase {
final BlockingQueue<Object> requests = Queues.newLinkedBlockingDeque();

@SuppressWarnings("unchecked")
<T> T popLastRequest() throws InterruptedException {
return (T) requests.poll(1, TimeUnit.SECONDS);
}

@Override
public void readRows(
ReadRowsRequest request, StreamObserver<ReadRowsResponse> responseObserver) {
requests.add(request);
// Dummy row for stream
responseObserver.onNext(
ReadRowsResponse.newBuilder()
.addChunks(
ReadRowsResponse.CellChunk.newBuilder()
.setCommitRow(true)
.setRowKey(ByteString.copyFromUtf8("a"))
.setFamilyName(StringValue.getDefaultInstance())
.setQualifier(BytesValue.getDefaultInstance())
.setValueSize(0))
.build());
responseObserver.onCompleted();
}
}
}

0 comments on commit 394efe4

Please sign in to comment.