newBulkReadRowsBatcher(
String tableId, @Nullable Filters.Filter filter) {
+ return newBulkReadRowsBatcher(tableId, filter, null);
+ }
+
+ /**
+ * Reads rows for given tableId and filter criteria in a batch. If the row does not exist, the
+ * value will be null. The returned Batcher instance is not threadsafe, it can only be used from a
+ * single thread. This method allows customization of the underlying RPCs by passing in a {@link
+ * com.google.api.gax.grpc.GrpcCallContext}. The same context will be reused for all batches. This
+ * can be used to customize things like per attempt timeouts.
+ *
+ * Performance notice: The ReadRows protocol requires that rows are sent in ascending key
+ * order, which means that the keys are processed sequentially on the server-side, so batching
+ * allows improving throughput but not latency. Lower latencies can be achieved by sending smaller
+ * requests concurrently.
+ *
+ *
Sample Code:
+ *
+ *
{@code
+ * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
+ *
+ * // Build the filter expression
+ * Filter filter = FILTERS.chain()
+ * .filter(FILTERS.key().regex("prefix.*"))
+ * .filter(FILTERS.limit().cellsPerRow(10));
+ *
+ * List> rows = new ArrayList<>();
+ *
+ * try (Batcher batcher = bigtableDataClient.newBulkReadRowsBatcher(
+ * "[TABLE]", filter, GrpcCallContext.createDefault().withTimeout(Duration.ofSeconds(10)))) {
+ * for (String someValue : someCollection) {
+ * ApiFuture rowFuture =
+ * batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
+ * rows.add(rowFuture);
+ * }
+ *
+ * // [Optional] Sends collected elements for batching asynchronously.
+ * batcher.sendOutstanding();
+ *
+ * // [Optional] Invokes sendOutstanding() and awaits until all pending entries are resolved.
+ * batcher.flush();
+ * }
+ * // batcher.close() invokes `flush()` which will in turn invoke `sendOutstanding()` with await for
+ * pending batches until its resolved.
+ *
+ * List actualRows = ApiFutures.allAsList(rows).get();
+ * }
+ * }
+ */
+ public Batcher newBulkReadRowsBatcher(
+ String tableId, @Nullable Filters.Filter filter, @Nullable GrpcCallContext ctx) {
Query query = Query.create(tableId);
if (filter != null) {
- query = query.filter(filter);
+ query.filter(filter);
}
- return stub.newBulkReadRowsBatcher(query);
+ return stub.newBulkReadRowsBatcher(query, ctx);
}
/**
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
index c08f0aec1..62619f5bb 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
@@ -23,6 +23,7 @@
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.grpc.GaxGrpcProperties;
+import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcRawCallableFactory;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
@@ -97,6 +98,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
/**
* The core client that converts method calls to RPCs.
@@ -531,10 +533,15 @@ private UnaryCallable createBulkMutateRowsCallable() {
* Split the responses using {@link MutateRowsBatchingDescriptor}.
*
*/
- public Batcher newMutateRowsBatcher(@Nonnull String tableId) {
+ public Batcher newMutateRowsBatcher(
+ @Nonnull String tableId, @Nullable GrpcCallContext ctx) {
+ UnaryCallable callable = this.bulkMutateRowsCallable;
+ if (ctx != null) {
+ callable = callable.withDefaultCallContext(ctx);
+ }
return new BatcherImpl<>(
settings.bulkMutateRowsSettings().getBatchingDescriptor(),
- bulkMutateRowsCallable,
+ callable,
BulkMutation.create(tableId),
settings.bulkMutateRowsSettings().getBatchingSettings(),
clientContext.getExecutor(),
@@ -556,11 +563,16 @@ public Batcher newMutateRowsBatcher(@Nonnull String tabl
* Split the responses using {@link ReadRowsBatchingDescriptor}.
*
*/
- public Batcher newBulkReadRowsBatcher(@Nonnull Query query) {
+ public Batcher newBulkReadRowsBatcher(
+ @Nonnull Query query, @Nullable GrpcCallContext ctx) {
Preconditions.checkNotNull(query, "query cannot be null");
+ UnaryCallable> callable = readRowsCallable().all();
+ if (ctx != null) {
+ callable = callable.withDefaultCallContext(ctx);
+ }
return new BatcherImpl<>(
settings.bulkReadRowsSettings().getBatchingDescriptor(),
- readRowsCallable().all(),
+ callable,
query,
settings.bulkReadRowsSettings().getBatchingSettings(),
clientContext.getExecutor());
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java
index e85270f61..de2bf6224 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java
@@ -176,7 +176,8 @@ public Void call() {
// Configure the deadline
ApiCallContext currentCallContext = callContext;
- if (!externalFuture.getAttemptSettings().getRpcTimeout().isZero()) {
+ if (currentCallContext.getTimeout() == null
+ && !externalFuture.getAttemptSettings().getRpcTimeout().isZero()) {
currentCallContext =
currentCallContext.withTimeout(externalFuture.getAttemptSettings().getRpcTimeout());
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java
index 9e733640f..949f5139b 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java
@@ -35,8 +35,11 @@
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import io.grpc.Attributes;
+import io.grpc.BindableService;
+import io.grpc.ServerInterceptor;
import io.grpc.ServerTransportFilter;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
@@ -95,7 +98,11 @@ public void transportTerminated(Attributes transportAttrs) {
terminateAttributes.add(transportAttrs);
}
};
- serviceHelper = new FakeServiceHelper(null, transportFilter, service);
+ serviceHelper =
+ new FakeServiceHelper(
+ ImmutableList.of(),
+ transportFilter,
+ ImmutableList.of(service));
port = serviceHelper.getPort();
serviceHelper.start();
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java
index c3bf52b63..67befad2a 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java
@@ -22,6 +22,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.Batcher;
+import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
@@ -80,9 +81,13 @@ public void setUp() {
Mockito.when(mockStub.bulkMutateRowsCallable()).thenReturn(mockBulkMutateRowsCallable);
Mockito.when(mockStub.checkAndMutateRowCallable()).thenReturn(mockCheckAndMutateRowCallable);
Mockito.when(mockStub.readModifyWriteRowCallable()).thenReturn(mockReadModifyWriteRowCallable);
- Mockito.when(mockStub.newMutateRowsBatcher(Mockito.any(String.class)))
+ Mockito.when(
+ mockStub.newMutateRowsBatcher(
+ Mockito.any(String.class), Mockito.any(GrpcCallContext.class)))
.thenReturn(mockBulkMutationBatcher);
- Mockito.when(mockStub.newBulkReadRowsBatcher(Mockito.any(Query.class)))
+ Mockito.when(
+ mockStub.newBulkReadRowsBatcher(
+ Mockito.any(Query.class), Mockito.any(GrpcCallContext.class)))
.thenReturn(mockBulkReadRowsBatcher);
}
@@ -374,7 +379,8 @@ public void proxyNewBulkMutationBatcherTest() {
ApiFuture actualRes = batcher.add(request);
assertThat(actualRes).isSameInstanceAs(expectedResponse);
- Mockito.verify(mockStub).newMutateRowsBatcher(Mockito.any(String.class));
+ Mockito.verify(mockStub)
+ .newMutateRowsBatcher(Mockito.any(String.class), Mockito.any(GrpcCallContext.class));
}
@Test
@@ -390,7 +396,8 @@ public void proxyNewBulkReadRowsTest() {
ApiFuture actualResponse = batcher.add(request);
assertThat(actualResponse).isSameInstanceAs(expectedResponse);
- Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class));
+ Mockito.verify(mockStub)
+ .newBulkReadRowsBatcher(Mockito.any(Query.class), Mockito.any(GrpcCallContext.class));
}
@Test
@@ -407,7 +414,8 @@ public void proxyNewBulkReadRowsWithFilterTest() {
ApiFuture actualResponse = batcher.add(request);
assertThat(actualResponse).isSameInstanceAs(expectedResponse);
- Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class));
+ Mockito.verify(mockStub)
+ .newBulkReadRowsBatcher(Mockito.any(Query.class), Mockito.any(GrpcCallContext.class));
}
@Test
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/FakeServiceHelper.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/FakeServiceHelper.java
index 9ec5e59cb..f0dd2f880 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/FakeServiceHelper.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/FakeServiceHelper.java
@@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2;
+import com.google.common.collect.ImmutableList;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
@@ -22,6 +23,7 @@
import io.grpc.ServerTransportFilter;
import java.io.IOException;
import java.net.ServerSocket;
+import java.util.List;
/** Utility class to setup a fake grpc server on a random port. */
public class FakeServiceHelper {
@@ -29,26 +31,27 @@ public class FakeServiceHelper {
private final Server server;
public FakeServiceHelper(BindableService... services) throws IOException {
- this(null, services);
+ this(ImmutableList.of(), null, ImmutableList.copyOf(services));
}
public FakeServiceHelper(ServerInterceptor interceptor, BindableService... services)
throws IOException {
- this(interceptor, null, services);
+ this(ImmutableList.of(interceptor), null, ImmutableList.copyOf(services));
}
public FakeServiceHelper(
- ServerInterceptor interceptor,
+ List interceptors,
ServerTransportFilter transportFilter,
- BindableService... services)
+ List services)
throws IOException {
try (ServerSocket ss = new ServerSocket(0)) {
port = ss.getLocalPort();
}
ServerBuilder builder = ServerBuilder.forPort(port);
- if (interceptor != null) {
+ for (ServerInterceptor interceptor : interceptors) {
builder = builder.intercept(interceptor);
}
+
if (transportFilter != null) {
builder = builder.addTransportFilter(transportFilter);
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
index b66596fb1..8cb82359a 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java
@@ -17,14 +17,18 @@
import static com.google.common.truth.Truth.assertThat;
+import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatcherImpl;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GaxGrpcProperties;
+import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.bigtable.v2.BigtableGrpc;
+import com.google.bigtable.v2.MutateRowsRequest;
+import com.google.bigtable.v2.MutateRowsResponse;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.v2.RowSet;
@@ -36,10 +40,15 @@
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.StringValue;
+import io.grpc.BindableService;
+import io.grpc.Context;
+import io.grpc.Deadline;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
@@ -57,12 +66,14 @@
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.threeten.bp.Duration;
@RunWith(JUnit4.class)
public class EnhancedBigtableStubTest {
@@ -75,6 +86,7 @@ public class EnhancedBigtableStubTest {
FakeServiceHelper serviceHelper;
private MetadataInterceptor metadataInterceptor;
+ private ContextInterceptor contextInterceptor;
private FakeDataService fakeDataService;
private EnhancedBigtableStubSettings defaultSettings;
private EnhancedBigtableStub enhancedBigtableStub;
@@ -82,8 +94,14 @@ public class EnhancedBigtableStubTest {
@Before
public void setUp() throws IOException, IllegalAccessException, InstantiationException {
metadataInterceptor = new MetadataInterceptor();
+ contextInterceptor = new ContextInterceptor();
fakeDataService = new FakeDataService();
- serviceHelper = new FakeServiceHelper(metadataInterceptor, fakeDataService);
+
+ serviceHelper =
+ new FakeServiceHelper(
+ ImmutableList.of(contextInterceptor, metadataInterceptor),
+ null,
+ ImmutableList.of(fakeDataService));
serviceHelper.start();
defaultSettings =
@@ -255,8 +273,8 @@ public void testBulkMutationFlowControllerConfigured() throws Exception {
// Creating 2 batchers from the same stub, they should share the same FlowController and
// FlowControlEventStats
- try (BatcherImpl batcher1 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table1");
- BatcherImpl batcher2 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table2")) {
+ try (BatcherImpl batcher1 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table1", null);
+ BatcherImpl batcher2 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table2", null)) {
assertThat(batcher1.getFlowController()).isNotNull();
assertThat(batcher1.getFlowController().getFlowControlEventStats()).isNotNull();
assertThat(batcher1).isNotSameInstanceAs(batcher2);
@@ -280,8 +298,8 @@ public void testBulkMutationFlowControllerConfigured() throws Exception {
// Creating 2 batchers from different stubs, they should not share the same FlowController and
// FlowControlEventStats
- try (BatcherImpl batcher1 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table1");
- BatcherImpl batcher2 = (BatcherImpl) stub2.newMutateRowsBatcher("my-table2")) {
+ try (BatcherImpl batcher1 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table1", null);
+ BatcherImpl batcher2 = (BatcherImpl) stub2.newMutateRowsBatcher("my-table2", null)) {
assertThat(batcher1.getFlowController()).isNotNull();
assertThat(batcher1.getFlowController().getFlowControlEventStats()).isNotNull();
assertThat(batcher1.getFlowController()).isNotSameInstanceAs(batcher2.getFlowController());
@@ -298,7 +316,7 @@ public void testBulkMutationFlowControllerConfigured() throws Exception {
.build()
.getStubSettings()); ) {
- try (BatcherImpl batcher = (BatcherImpl) stub2.newMutateRowsBatcher("my-table")) {
+ try (BatcherImpl batcher = (BatcherImpl) stub2.newMutateRowsBatcher("my-table", null)) {
assertThat(batcher.getFlowController().getMaxElementCountLimit()).isEqualTo(100L);
assertThat(batcher.getFlowController().getCurrentElementCountLimit()).isEqualTo(100L);
assertThat(batcher.getFlowController().getMinElementCountLimit()).isEqualTo(100L);
@@ -306,6 +324,68 @@ public void testBulkMutationFlowControllerConfigured() throws Exception {
}
}
+ @Test
+ public void testCallContextPropagatedInMutationBatcher()
+ throws IOException, InterruptedException, ExecutionException {
+ EnhancedBigtableStubSettings settings =
+ defaultSettings
+ .toBuilder()
+ .setRefreshingChannel(true)
+ .setPrimedTableIds("table1", "table2")
+ .build();
+
+ try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings)) {
+ // clear the previous contexts
+ contextInterceptor.contexts.clear();
+
+ // Override the timeout
+ GrpcCallContext clientCtx =
+ GrpcCallContext.createDefault().withTimeout(Duration.ofMinutes(10));
+
+ // Send a batch
+ try (Batcher batcher =
+ stub.newMutateRowsBatcher("table1", clientCtx)) {
+ batcher.add(RowMutationEntry.create("key").deleteRow()).get();
+ }
+
+ // Ensure that the server got the overriden deadline
+ Context serverCtx = contextInterceptor.contexts.poll();
+ assertThat(serverCtx).isNotNull();
+ assertThat(serverCtx.getDeadline()).isAtLeast(Deadline.after(8, TimeUnit.MINUTES));
+ }
+ }
+
+ @Test
+ public void testCallContextPropagatedInReadBatcher()
+ throws IOException, InterruptedException, ExecutionException {
+ EnhancedBigtableStubSettings settings =
+ defaultSettings
+ .toBuilder()
+ .setRefreshingChannel(true)
+ .setPrimedTableIds("table1", "table2")
+ .build();
+
+ try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings)) {
+ // clear the previous contexts
+ contextInterceptor.contexts.clear();
+
+ // Override the timeout
+ GrpcCallContext clientCtx =
+ GrpcCallContext.createDefault().withTimeout(Duration.ofMinutes(10));
+
+ // Send a batch
+ try (Batcher batcher =
+ stub.newBulkReadRowsBatcher(Query.create("table1"), clientCtx)) {
+ batcher.add(ByteString.copyFromUtf8("key")).get();
+ }
+
+ // Ensure that the server got the overriden deadline
+ Context serverCtx = contextInterceptor.contexts.poll();
+ assertThat(serverCtx).isNotNull();
+ assertThat(serverCtx.getDeadline()).isAtLeast(Deadline.after(8, TimeUnit.MINUTES));
+ }
+ }
+
private static class MetadataInterceptor implements ServerInterceptor {
final BlockingQueue headers = Queues.newLinkedBlockingDeque();
@@ -319,6 +399,19 @@ public Listener interceptCall(
}
}
+ private static class ContextInterceptor implements ServerInterceptor {
+ final BlockingQueue contexts = Queues.newLinkedBlockingDeque();
+
+ @Override
+ public Listener interceptCall(
+ ServerCall serverCall,
+ Metadata metadata,
+ ServerCallHandler serverCallHandler) {
+ contexts.add(Context.current());
+ return serverCallHandler.startCall(serverCall, metadata);
+ }
+ }
+
private static class FakeDataService extends BigtableGrpc.BigtableImplBase {
final BlockingQueue requests = Queues.newLinkedBlockingDeque();
@@ -327,6 +420,17 @@ ReadRowsRequest popLastRequest() throws InterruptedException {
return requests.poll(1, TimeUnit.SECONDS);
}
+ @Override
+ public void mutateRows(
+ MutateRowsRequest request, StreamObserver responseObserver) {
+ MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder();
+ for (int i = 0; i < request.getEntriesCount(); i++) {
+ builder.addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(i).build());
+ }
+ responseObserver.onNext(builder.build());
+ responseObserver.onCompleted();
+ }
+
@Override
public void readRows(
ReadRowsRequest request, StreamObserver responseObserver) {