Skip to content

Commit

Permalink
feat: all setting timeouts for batchers + fix handling of timeouts fo…
Browse files Browse the repository at this point in the history
…r point reads (#861)

* feat: all setting timeouts for batchers + fix handling of timeouts for point reads

This introduces 2 new variants of new*Batcher that accept a GrpcCallContext. This context will be used for batch RPCs generated by the batcher instance.
Also fixes handlings of timeout overrides for point reads and bukmutations. If a user set a timeout, don't override it

* fix incorrect test expectations

* address feedback & fix clirr

* clirr

* format
  • Loading branch information
igorbernstein2 committed Jun 16, 2021
1 parent bbeafb9 commit c145ceb
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 69 deletions.
8 changes: 7 additions & 1 deletion google-cloud-bigtable/clirr-ignored-differences.xml
Expand Up @@ -23,4 +23,10 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/gaxx/tracing/WrappedTracerFactory*</className>
</difference>
</differences>
<difference>
<!-- change method args is ok because EnhancedBigtableStub is InternalApi -->
<differenceType>7004</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub</className>
<method>*</method>
</difference>
</differences>
Expand Up @@ -23,6 +23,7 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiExceptions;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
Expand Down Expand Up @@ -1073,7 +1074,40 @@ public void bulkMutateRows(BulkMutation mutation) {
*/
@BetaApi("This surface is likely to change as the batching surface evolves.")
public Batcher<RowMutationEntry, Void> newBulkMutationBatcher(@Nonnull String tableId) {
return stub.newMutateRowsBatcher(tableId);
return newBulkMutationBatcher(tableId, null);
}

/**
* Mutates multiple rows in a batch. Each individual row is mutated atomically as in MutateRow,
* but the entire batch is not executed atomically. The returned Batcher instance is not
* threadsafe, it can only be used from single thread. This method allows customization of the
* underlying RPCs by passing in a {@link com.google.api.gax.grpc.GrpcCallContext}. The same
* context will be reused for all batches. This can be used to customize things like per attempt
* timeouts.
*
* <p>Sample Code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* try (Batcher<RowMutationEntry, Void> batcher = bigtableDataClient.newBulkMutationBatcher("[TABLE]", GrpcCallContext.createDefault().withTimeout(Duration.ofSeconds(10)))) {
* for (String someValue : someCollection) {
* ApiFuture<Void> entryFuture =
* batcher.add(
* RowMutationEntry.create("[ROW KEY]")
* .setCell("[FAMILY NAME]", "[QUALIFIER]", "[VALUE]"));
* }
*
* // Blocks until mutations are applied on all submitted row entries.
* batcher.flush();
* }
* // Before `batcher` is closed, all remaining(If any) mutations are applied.
* }
* }</pre>
*/
@BetaApi("This surface is likely to change as the batching surface evolves.")
public Batcher<RowMutationEntry, Void> newBulkMutationBatcher(
@Nonnull String tableId, @Nullable GrpcCallContext ctx) {
return stub.newMutateRowsBatcher(tableId, ctx);
}

/**
Expand Down Expand Up @@ -1159,11 +1193,61 @@ public Batcher<ByteString, Row> newBulkReadRowsBatcher(String tableId) {
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
String tableId, @Nullable Filters.Filter filter) {
return newBulkReadRowsBatcher(tableId, filter, null);
}

/**
* Reads rows for given tableId and filter criteria in a batch. If the row does not exist, the
* value will be null. The returned Batcher instance is not threadsafe, it can only be used from a
* single thread. This method allows customization of the underlying RPCs by passing in a {@link
* com.google.api.gax.grpc.GrpcCallContext}. The same context will be reused for all batches. This
* can be used to customize things like per attempt timeouts.
*
* <p>Performance notice: The ReadRows protocol requires that rows are sent in ascending key
* order, which means that the keys are processed sequentially on the server-side, so batching
* allows improving throughput but not latency. Lower latencies can be achieved by sending smaller
* requests concurrently.
*
* <p>Sample Code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
*
* // Build the filter expression
* Filter filter = FILTERS.chain()
* .filter(FILTERS.key().regex("prefix.*"))
* .filter(FILTERS.limit().cellsPerRow(10));
*
* List<ApiFuture<Row>> rows = new ArrayList<>();
*
* try (Batcher<ByteString, Row> batcher = bigtableDataClient.newBulkReadRowsBatcher(
* "[TABLE]", filter, GrpcCallContext.createDefault().withTimeout(Duration.ofSeconds(10)))) {
* for (String someValue : someCollection) {
* ApiFuture<Row> rowFuture =
* batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
* rows.add(rowFuture);
* }
*
* // [Optional] Sends collected elements for batching asynchronously.
* batcher.sendOutstanding();
*
* // [Optional] Invokes sendOutstanding() and awaits until all pending entries are resolved.
* batcher.flush();
* }
* // batcher.close() invokes `flush()` which will in turn invoke `sendOutstanding()` with await for
* pending batches until its resolved.
*
* List<Row> actualRows = ApiFutures.allAsList(rows).get();
* }
* }</pre>
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
String tableId, @Nullable Filters.Filter filter, @Nullable GrpcCallContext ctx) {
Query query = Query.create(tableId);
if (filter != null) {
query = query.filter(filter);
query.filter(filter);
}
return stub.newBulkReadRowsBatcher(query);
return stub.newBulkReadRowsBatcher(query, ctx);
}

/**
Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcRawCallableFactory;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
Expand Down Expand Up @@ -98,6 +99,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* The core client that converts method calls to RPCs.
Expand Down Expand Up @@ -536,10 +538,15 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
* <li>Split the responses using {@link MutateRowsBatchingDescriptor}.
* </ul>
*/
public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(@Nonnull String tableId) {
public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
@Nonnull String tableId, @Nullable GrpcCallContext ctx) {
UnaryCallable<BulkMutation, Void> callable = this.bulkMutateRowsCallable;
if (ctx != null) {
callable = callable.withDefaultCallContext(ctx);
}
return new BatcherImpl<>(
settings.bulkMutateRowsSettings().getBatchingDescriptor(),
bulkMutateRowsCallable,
callable,
BulkMutation.create(tableId),
settings.bulkMutateRowsSettings().getBatchingSettings(),
clientContext.getExecutor(),
Expand All @@ -561,11 +568,16 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(@Nonnull String tabl
* <li>Split the responses using {@link ReadRowsBatchingDescriptor}.
* </ul>
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(@Nonnull Query query) {
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
@Nonnull Query query, @Nullable GrpcCallContext ctx) {
Preconditions.checkNotNull(query, "query cannot be null");
UnaryCallable<Query, List<Row>> callable = readRowsCallable().all();
if (ctx != null) {
callable = callable.withDefaultCallContext(ctx);
}
return new BatcherImpl<>(
settings.bulkReadRowsSettings().getBatchingDescriptor(),
readRowsCallable().all(),
callable,
query,
settings.bulkReadRowsSettings().getBatchingSettings(),
clientContext.getExecutor());
Expand Down
Expand Up @@ -176,7 +176,8 @@ public Void call() {

// Configure the deadline
ApiCallContext currentCallContext = callContext;
if (!externalFuture.getAttemptSettings().getRpcTimeout().isZero()) {
if (currentCallContext.getTimeout() == null
&& !externalFuture.getAttemptSettings().getRpcTimeout().isZero()) {
currentCallContext =
currentCallContext.withTimeout(externalFuture.getAttemptSettings().getRpcTimeout());
}
Expand Down
Expand Up @@ -20,7 +20,6 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.bigtable.v2.ReadRowsRequest;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
Expand All @@ -46,9 +45,9 @@ public PointReadTimeoutCallable(ServerStreamingCallable<ReadRowsRequest, RespT>
@Override
public void call(ReadRowsRequest request, ResponseObserver<RespT> observer, ApiCallContext ctx) {
if (isPointRead(request)) {
Duration effectiveTimeout = getEffectivePointReadTimeout(ctx);
if (effectiveTimeout != null) {
ctx = ctx.withTimeout(effectiveTimeout);
Duration streamWaitTimeout = ctx.getStreamWaitTimeout();
if (ctx.getTimeout() == null && streamWaitTimeout != null) {
ctx = ctx.withTimeout(streamWaitTimeout);
}
}
inner.call(request, observer, ctx);
Expand All @@ -63,24 +62,4 @@ private boolean isPointRead(ReadRowsRequest request) {
}
return request.getRows().getRowKeysCount() == 1;
}

/**
* Extracts the effective timeout for a point read.
*
* <p>The effective time is the minimum of a streamWaitTimeout and a user set attempt timeout.
*/
@Nullable
private Duration getEffectivePointReadTimeout(ApiCallContext ctx) {
Duration streamWaitTimeout = ctx.getStreamWaitTimeout();
Duration attemptTimeout = ctx.getTimeout();

if (streamWaitTimeout == null) {
return attemptTimeout;
}

if (attemptTimeout == null) {
return streamWaitTimeout;
}
return (attemptTimeout.compareTo(streamWaitTimeout) <= 0) ? attemptTimeout : streamWaitTimeout;
}
}
Expand Up @@ -35,8 +35,11 @@
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import io.grpc.Attributes;
import io.grpc.BindableService;
import io.grpc.ServerInterceptor;
import io.grpc.ServerTransportFilter;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
Expand Down Expand Up @@ -95,7 +98,11 @@ public void transportTerminated(Attributes transportAttrs) {
terminateAttributes.add(transportAttrs);
}
};
serviceHelper = new FakeServiceHelper(null, transportFilter, service);
serviceHelper =
new FakeServiceHelper(
ImmutableList.<ServerInterceptor>of(),
transportFilter,
ImmutableList.<BindableService>of(service));
port = serviceHelper.getPort();
serviceHelper.start();

Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
Expand Down Expand Up @@ -80,9 +81,13 @@ public void setUp() {
Mockito.when(mockStub.bulkMutateRowsCallable()).thenReturn(mockBulkMutateRowsCallable);
Mockito.when(mockStub.checkAndMutateRowCallable()).thenReturn(mockCheckAndMutateRowCallable);
Mockito.when(mockStub.readModifyWriteRowCallable()).thenReturn(mockReadModifyWriteRowCallable);
Mockito.when(mockStub.newMutateRowsBatcher(Mockito.any(String.class)))
Mockito.when(
mockStub.newMutateRowsBatcher(
Mockito.any(String.class), Mockito.any(GrpcCallContext.class)))
.thenReturn(mockBulkMutationBatcher);
Mockito.when(mockStub.newBulkReadRowsBatcher(Mockito.any(Query.class)))
Mockito.when(
mockStub.newBulkReadRowsBatcher(
Mockito.any(Query.class), Mockito.any(GrpcCallContext.class)))
.thenReturn(mockBulkReadRowsBatcher);
}

Expand Down Expand Up @@ -374,7 +379,8 @@ public void proxyNewBulkMutationBatcherTest() {
ApiFuture<Void> actualRes = batcher.add(request);
assertThat(actualRes).isSameInstanceAs(expectedResponse);

Mockito.verify(mockStub).newMutateRowsBatcher(Mockito.any(String.class));
Mockito.verify(mockStub)
.newMutateRowsBatcher(Mockito.any(String.class), Mockito.any(GrpcCallContext.class));
}

@Test
Expand All @@ -390,7 +396,8 @@ public void proxyNewBulkReadRowsTest() {
ApiFuture<Row> actualResponse = batcher.add(request);
assertThat(actualResponse).isSameInstanceAs(expectedResponse);

Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class));
Mockito.verify(mockStub)
.newBulkReadRowsBatcher(Mockito.any(Query.class), Mockito.any(GrpcCallContext.class));
}

@Test
Expand All @@ -407,7 +414,8 @@ public void proxyNewBulkReadRowsWithFilterTest() {
ApiFuture<Row> actualResponse = batcher.add(request);
assertThat(actualResponse).isSameInstanceAs(expectedResponse);

Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class));
Mockito.verify(mockStub)
.newBulkReadRowsBatcher(Mockito.any(Query.class), Mockito.any(GrpcCallContext.class));
}

@Test
Expand Down
Expand Up @@ -15,40 +15,43 @@
*/
package com.google.cloud.bigtable.data.v2;

import com.google.common.collect.ImmutableList;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerTransportFilter;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.List;

/** Utility class to setup a fake grpc server on a random port. */
public class FakeServiceHelper {
private final int port;
private final Server server;

public FakeServiceHelper(BindableService... services) throws IOException {
this(null, services);
this(ImmutableList.<ServerInterceptor>of(), null, ImmutableList.copyOf(services));
}

public FakeServiceHelper(ServerInterceptor interceptor, BindableService... services)
throws IOException {
this(interceptor, null, services);
this(ImmutableList.of(interceptor), null, ImmutableList.copyOf(services));
}

public FakeServiceHelper(
ServerInterceptor interceptor,
List<ServerInterceptor> interceptors,
ServerTransportFilter transportFilter,
BindableService... services)
List<BindableService> services)
throws IOException {
try (ServerSocket ss = new ServerSocket(0)) {
port = ss.getLocalPort();
}
ServerBuilder builder = ServerBuilder.forPort(port);
if (interceptor != null) {
for (ServerInterceptor interceptor : interceptors) {
builder = builder.intercept(interceptor);
}

if (transportFilter != null) {
builder = builder.addTransportFilter(transportFilter);
}
Expand Down

0 comments on commit c145ceb

Please sign in to comment.