Skip to content

Commit

Permalink
feat: call setting timeouts for batchers (#877)
Browse files Browse the repository at this point in the history
This introduces 2 new variants of new*Batcher that accept a GrpcCallContext. This context will be used for batch RPCs generated by the batcher instance.
Also fixes handlings of timeout overrides for bukmutations. If a user set a timeout, don't override it
  • Loading branch information
igorbernstein2 committed Jun 22, 2021
1 parent c2841f0 commit 0f3654d
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 25 deletions.
6 changes: 6 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Expand Up @@ -28,4 +28,10 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable</className>
</difference>
<difference>
<!-- change method args is ok because EnhancedBigtableStub is InternalApi -->
<differenceType>7004</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub</className>
<method>*</method>
</difference>
</differences>
Expand Up @@ -23,6 +23,7 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiExceptions;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
Expand Down Expand Up @@ -1073,7 +1074,40 @@ public void bulkMutateRows(BulkMutation mutation) {
*/
@BetaApi("This surface is likely to change as the batching surface evolves.")
public Batcher<RowMutationEntry, Void> newBulkMutationBatcher(@Nonnull String tableId) {
return stub.newMutateRowsBatcher(tableId);
return newBulkMutationBatcher(tableId, null);
}

/**
* Mutates multiple rows in a batch. Each individual row is mutated atomically as in MutateRow,
* but the entire batch is not executed atomically. The returned Batcher instance is not
* threadsafe, it can only be used from single thread. This method allows customization of the
* underlying RPCs by passing in a {@link com.google.api.gax.grpc.GrpcCallContext}. The same
* context will be reused for all batches. This can be used to customize things like per attempt
* timeouts.
*
* <p>Sample Code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* try (Batcher<RowMutationEntry, Void> batcher = bigtableDataClient.newBulkMutationBatcher("[TABLE]", GrpcCallContext.createDefault().withTimeout(Duration.ofSeconds(10)))) {
* for (String someValue : someCollection) {
* ApiFuture<Void> entryFuture =
* batcher.add(
* RowMutationEntry.create("[ROW KEY]")
* .setCell("[FAMILY NAME]", "[QUALIFIER]", "[VALUE]"));
* }
*
* // Blocks until mutations are applied on all submitted row entries.
* batcher.flush();
* }
* // Before `batcher` is closed, all remaining(If any) mutations are applied.
* }
* }</pre>
*/
@BetaApi("This surface is likely to change as the batching surface evolves.")
public Batcher<RowMutationEntry, Void> newBulkMutationBatcher(
@Nonnull String tableId, @Nullable GrpcCallContext ctx) {
return stub.newMutateRowsBatcher(tableId, ctx);
}

/**
Expand Down Expand Up @@ -1159,11 +1193,61 @@ public Batcher<ByteString, Row> newBulkReadRowsBatcher(String tableId) {
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
String tableId, @Nullable Filters.Filter filter) {
return newBulkReadRowsBatcher(tableId, filter, null);
}

/**
* Reads rows for given tableId and filter criteria in a batch. If the row does not exist, the
* value will be null. The returned Batcher instance is not threadsafe, it can only be used from a
* single thread. This method allows customization of the underlying RPCs by passing in a {@link
* com.google.api.gax.grpc.GrpcCallContext}. The same context will be reused for all batches. This
* can be used to customize things like per attempt timeouts.
*
* <p>Performance notice: The ReadRows protocol requires that rows are sent in ascending key
* order, which means that the keys are processed sequentially on the server-side, so batching
* allows improving throughput but not latency. Lower latencies can be achieved by sending smaller
* requests concurrently.
*
* <p>Sample Code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
*
* // Build the filter expression
* Filter filter = FILTERS.chain()
* .filter(FILTERS.key().regex("prefix.*"))
* .filter(FILTERS.limit().cellsPerRow(10));
*
* List<ApiFuture<Row>> rows = new ArrayList<>();
*
* try (Batcher<ByteString, Row> batcher = bigtableDataClient.newBulkReadRowsBatcher(
* "[TABLE]", filter, GrpcCallContext.createDefault().withTimeout(Duration.ofSeconds(10)))) {
* for (String someValue : someCollection) {
* ApiFuture<Row> rowFuture =
* batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
* rows.add(rowFuture);
* }
*
* // [Optional] Sends collected elements for batching asynchronously.
* batcher.sendOutstanding();
*
* // [Optional] Invokes sendOutstanding() and awaits until all pending entries are resolved.
* batcher.flush();
* }
* // batcher.close() invokes `flush()` which will in turn invoke `sendOutstanding()` with await for
* pending batches until its resolved.
*
* List<Row> actualRows = ApiFutures.allAsList(rows).get();
* }
* }</pre>
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
String tableId, @Nullable Filters.Filter filter, @Nullable GrpcCallContext ctx) {
Query query = Query.create(tableId);
if (filter != null) {
query = query.filter(filter);
query.filter(filter);
}
return stub.newBulkReadRowsBatcher(query);
return stub.newBulkReadRowsBatcher(query, ctx);
}

/**
Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcRawCallableFactory;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
Expand Down Expand Up @@ -97,6 +98,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* The core client that converts method calls to RPCs.
Expand Down Expand Up @@ -531,10 +533,15 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
* <li>Split the responses using {@link MutateRowsBatchingDescriptor}.
* </ul>
*/
public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(@Nonnull String tableId) {
public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
@Nonnull String tableId, @Nullable GrpcCallContext ctx) {
UnaryCallable<BulkMutation, Void> callable = this.bulkMutateRowsCallable;
if (ctx != null) {
callable = callable.withDefaultCallContext(ctx);
}
return new BatcherImpl<>(
settings.bulkMutateRowsSettings().getBatchingDescriptor(),
bulkMutateRowsCallable,
callable,
BulkMutation.create(tableId),
settings.bulkMutateRowsSettings().getBatchingSettings(),
clientContext.getExecutor(),
Expand All @@ -556,11 +563,16 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(@Nonnull String tabl
* <li>Split the responses using {@link ReadRowsBatchingDescriptor}.
* </ul>
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(@Nonnull Query query) {
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
@Nonnull Query query, @Nullable GrpcCallContext ctx) {
Preconditions.checkNotNull(query, "query cannot be null");
UnaryCallable<Query, List<Row>> callable = readRowsCallable().all();
if (ctx != null) {
callable = callable.withDefaultCallContext(ctx);
}
return new BatcherImpl<>(
settings.bulkReadRowsSettings().getBatchingDescriptor(),
readRowsCallable().all(),
callable,
query,
settings.bulkReadRowsSettings().getBatchingSettings(),
clientContext.getExecutor());
Expand Down
Expand Up @@ -176,7 +176,8 @@ public Void call() {

// Configure the deadline
ApiCallContext currentCallContext = callContext;
if (!externalFuture.getAttemptSettings().getRpcTimeout().isZero()) {
if (currentCallContext.getTimeout() == null
&& !externalFuture.getAttemptSettings().getRpcTimeout().isZero()) {
currentCallContext =
currentCallContext.withTimeout(externalFuture.getAttemptSettings().getRpcTimeout());
}
Expand Down
Expand Up @@ -35,8 +35,11 @@
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import io.grpc.Attributes;
import io.grpc.BindableService;
import io.grpc.ServerInterceptor;
import io.grpc.ServerTransportFilter;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
Expand Down Expand Up @@ -95,7 +98,11 @@ public void transportTerminated(Attributes transportAttrs) {
terminateAttributes.add(transportAttrs);
}
};
serviceHelper = new FakeServiceHelper(null, transportFilter, service);
serviceHelper =
new FakeServiceHelper(
ImmutableList.<ServerInterceptor>of(),
transportFilter,
ImmutableList.<BindableService>of(service));
port = serviceHelper.getPort();
serviceHelper.start();

Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
Expand Down Expand Up @@ -80,9 +81,13 @@ public void setUp() {
Mockito.when(mockStub.bulkMutateRowsCallable()).thenReturn(mockBulkMutateRowsCallable);
Mockito.when(mockStub.checkAndMutateRowCallable()).thenReturn(mockCheckAndMutateRowCallable);
Mockito.when(mockStub.readModifyWriteRowCallable()).thenReturn(mockReadModifyWriteRowCallable);
Mockito.when(mockStub.newMutateRowsBatcher(Mockito.any(String.class)))
Mockito.when(
mockStub.newMutateRowsBatcher(
Mockito.any(String.class), Mockito.any(GrpcCallContext.class)))
.thenReturn(mockBulkMutationBatcher);
Mockito.when(mockStub.newBulkReadRowsBatcher(Mockito.any(Query.class)))
Mockito.when(
mockStub.newBulkReadRowsBatcher(
Mockito.any(Query.class), Mockito.any(GrpcCallContext.class)))
.thenReturn(mockBulkReadRowsBatcher);
}

Expand Down Expand Up @@ -374,7 +379,8 @@ public void proxyNewBulkMutationBatcherTest() {
ApiFuture<Void> actualRes = batcher.add(request);
assertThat(actualRes).isSameInstanceAs(expectedResponse);

Mockito.verify(mockStub).newMutateRowsBatcher(Mockito.any(String.class));
Mockito.verify(mockStub)
.newMutateRowsBatcher(Mockito.any(String.class), Mockito.any(GrpcCallContext.class));
}

@Test
Expand All @@ -390,7 +396,8 @@ public void proxyNewBulkReadRowsTest() {
ApiFuture<Row> actualResponse = batcher.add(request);
assertThat(actualResponse).isSameInstanceAs(expectedResponse);

Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class));
Mockito.verify(mockStub)
.newBulkReadRowsBatcher(Mockito.any(Query.class), Mockito.any(GrpcCallContext.class));
}

@Test
Expand All @@ -407,7 +414,8 @@ public void proxyNewBulkReadRowsWithFilterTest() {
ApiFuture<Row> actualResponse = batcher.add(request);
assertThat(actualResponse).isSameInstanceAs(expectedResponse);

Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class));
Mockito.verify(mockStub)
.newBulkReadRowsBatcher(Mockito.any(Query.class), Mockito.any(GrpcCallContext.class));
}

@Test
Expand Down
Expand Up @@ -15,40 +15,43 @@
*/
package com.google.cloud.bigtable.data.v2;

import com.google.common.collect.ImmutableList;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerTransportFilter;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.List;

/** Utility class to setup a fake grpc server on a random port. */
public class FakeServiceHelper {
private final int port;
private final Server server;

public FakeServiceHelper(BindableService... services) throws IOException {
this(null, services);
this(ImmutableList.<ServerInterceptor>of(), null, ImmutableList.copyOf(services));
}

public FakeServiceHelper(ServerInterceptor interceptor, BindableService... services)
throws IOException {
this(interceptor, null, services);
this(ImmutableList.of(interceptor), null, ImmutableList.copyOf(services));
}

public FakeServiceHelper(
ServerInterceptor interceptor,
List<ServerInterceptor> interceptors,
ServerTransportFilter transportFilter,
BindableService... services)
List<BindableService> services)
throws IOException {
try (ServerSocket ss = new ServerSocket(0)) {
port = ss.getLocalPort();
}
ServerBuilder builder = ServerBuilder.forPort(port);
if (interceptor != null) {
for (ServerInterceptor interceptor : interceptors) {
builder = builder.intercept(interceptor);
}

if (transportFilter != null) {
builder = builder.addTransportFilter(transportFilter);
}
Expand Down

0 comments on commit 0f3654d

Please sign in to comment.