feat: allow setting timeouts for batchers + fix handling of timeouts for point reads #861

Merged · 5 commits · Jun 16, 2021

Changes from 2 commits
@@ -23,6 +23,7 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiExceptions;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
@@ -1073,7 +1074,39 @@ public void bulkMutateRows(BulkMutation mutation) {
*/
@BetaApi("This surface is likely to change as the batching surface evolves.")
public Batcher<RowMutationEntry, Void> newBulkMutationBatcher(@Nonnull String tableId) {
-   return stub.newMutateRowsBatcher(tableId);
+   return newBulkMutationBatcher(tableId, null);
}

/**
* Mutates multiple rows in a batch. Each individual row is mutated atomically as in MutateRow,
* but the entire batch is not executed atomically. The returned Batcher instance is not
* threadsafe; it can only be used from a single thread. This method allows customization of the
* underlying RPCs by passing in a {@link com.google.api.gax.grpc.GrpcCallContext}. The same
* context will be reused for all batches.
*
* <p>Sample Code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* try (Batcher<RowMutationEntry, Void> batcher = bigtableDataClient.newBulkMutationBatcher("[TABLE]", GrpcCallContext.createDefault().withTimeout(Duration.ofSeconds(10)))) {
* for (String someValue : someCollection) {
* ApiFuture<Void> entryFuture =
* batcher.add(
* RowMutationEntry.create("[ROW KEY]")
* .setCell("[FAMILY NAME]", "[QUALIFIER]", "[VALUE]"));
* }
*
* // Blocks until mutations are applied on all submitted row entries.
* batcher.flush();
* }
* // When `batcher` is closed, any remaining mutations are applied.
* }
* }</pre>
*/
@BetaApi("This surface is likely to change as the batching surface evolves.")
public Batcher<RowMutationEntry, Void> newBulkMutationBatcher(
@Nonnull String tableId, @Nullable GrpcCallContext ctx) {
return stub.newMutateRowsBatcher(tableId, ctx);
}
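Illustratively, a caller can now bound each batch RPC with its own timeout. A minimal runnable sketch of the new overload; the project, instance, table, and cell names are placeholders and the 10-second value is arbitrary:

```java
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import org.threeten.bp.Duration;

public class BatcherTimeoutSketch {
  public static void main(String[] args) throws Exception {
    try (BigtableDataClient client = BigtableDataClient.create("my-project", "my-instance")) {
      // Every batch RPC issued by this batcher carries a 10s attempt timeout;
      // passing null instead keeps the client-wide defaults (the old behavior).
      GrpcCallContext ctx = GrpcCallContext.createDefault().withTimeout(Duration.ofSeconds(10));
      try (Batcher<RowMutationEntry, Void> batcher =
          client.newBulkMutationBatcher("my-table", ctx)) {
        batcher.add(RowMutationEntry.create("row-key").setCell("cf", "q", "value"));
        batcher.flush(); // blocks until the submitted entries are applied
      }
    }
  }
}
```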

/**
@@ -1159,11 +1192,59 @@ public Batcher<ByteString, Row> newBulkReadRowsBatcher(String tableId) {
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
String tableId, @Nullable Filters.Filter filter) {
return newBulkReadRowsBatcher(tableId, filter, null);
}

/**
* Reads rows for a given tableId and filter criteria in a batch. If a row does not exist, its
* value will be null. The returned Batcher instance is not threadsafe; it can only be used from a
* single thread. This method allows customization of the underlying RPCs by passing in a {@link
* com.google.api.gax.grpc.GrpcCallContext}. The same context will be reused for all batches.
*
* <p>Performance notice: The ReadRows protocol requires that rows are sent in ascending key
* order, which means that the keys are processed sequentially on the server side, so batching
* improves throughput but not latency. Lower latencies can be achieved by sending smaller
* requests concurrently.
*
* <p>Sample Code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
*
* // Build the filter expression
* Filter filter = FILTERS.chain()
* .filter(FILTERS.key().regex("prefix.*"))
* .filter(FILTERS.limit().cellsPerRow(10));
*
* List<ApiFuture<Row>> rows = new ArrayList<>();
*
* try (Batcher<ByteString, Row> batcher = bigtableDataClient.newBulkReadRowsBatcher("[TABLE]", filter)) {
Review comment (Contributor): Should we include GrpcCallContext in the example here?
* for (String someValue : someCollection) {
* ApiFuture<Row> rowFuture =
* batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
* rows.add(rowFuture);
* }
*
* // [Optional] Sends collected elements for batching asynchronously.
* batcher.sendOutstanding();
*
* // [Optional] Invokes sendOutstanding() and awaits until all pending entries are resolved.
* batcher.flush();
* }
* // batcher.close() invokes flush(), which in turn invokes sendOutstanding() and
* // awaits until all pending batches are resolved.
*
* List<Row> actualRows = ApiFutures.allAsList(rows).get();
* }
* }</pre>
*/
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
String tableId, @Nullable Filters.Filter filter, @Nullable GrpcCallContext ctx) {
Query query = Query.create(tableId);
if (filter != null) {
-     query = query.filter(filter);
+     query.filter(filter);
}
-   return stub.newBulkReadRowsBatcher(query);
+   return stub.newBulkReadRowsBatcher(query, ctx);
}
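Answering the reviewer's question above in sketch form: the context-aware overload slots into the existing read-batcher sample naturally. The names `my-project`, `my-instance`, and `my-table` are placeholders, and the 5-minute timeout is arbitrary:

```java
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.protobuf.ByteString;
import org.threeten.bp.Duration;

public class ReadBatcherContextSketch {
  public static void main(String[] args) throws Exception {
    try (BigtableDataClient client = BigtableDataClient.create("my-project", "my-instance")) {
      Filters.Filter filter = Filters.FILTERS.limit().cellsPerRow(10);
      // One shared context, reused for every batch this batcher sends.
      GrpcCallContext ctx = GrpcCallContext.createDefault().withTimeout(Duration.ofMinutes(5));
      try (Batcher<ByteString, Row> batcher =
          client.newBulkReadRowsBatcher("my-table", filter, ctx)) {
        batcher.add(ByteString.copyFromUtf8("row-key")); // future resolves to null if absent
        batcher.flush();
      }
    }
  }
}
```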

/**
@@ -23,6 +23,7 @@
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcCallSettings;
import com.google.api.gax.grpc.GrpcRawCallableFactory;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
@@ -98,6 +99,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* The core client that converts method calls to RPCs.
@@ -536,10 +538,15 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
* <li>Split the responses using {@link MutateRowsBatchingDescriptor}.
* </ul>
*/
- public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(@Nonnull String tableId) {
+ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
+     @Nonnull String tableId, @Nullable GrpcCallContext ctx) {
UnaryCallable<BulkMutation, Void> callable = this.bulkMutateRowsCallable;
if (ctx != null) {
callable = callable.withDefaultCallContext(ctx);
}
return new BatcherImpl<>(
settings.bulkMutateRowsSettings().getBatchingDescriptor(),
-       bulkMutateRowsCallable,
+       callable,
BulkMutation.create(tableId),
settings.bulkMutateRowsSettings().getBatchingSettings(),
clientContext.getExecutor(),
@@ -561,11 +568,16 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(@Nonnull String tabl
* <li>Split the responses using {@link ReadRowsBatchingDescriptor}.
* </ul>
*/
- public Batcher<ByteString, Row> newBulkReadRowsBatcher(@Nonnull Query query) {
+ public Batcher<ByteString, Row> newBulkReadRowsBatcher(
+     @Nonnull Query query, @Nullable GrpcCallContext ctx) {
Preconditions.checkNotNull(query, "query cannot be null");
UnaryCallable<Query, List<Row>> callable = readRowsCallable().all();
if (ctx != null) {
callable = callable.withDefaultCallContext(ctx);
}
return new BatcherImpl<>(
settings.bulkReadRowsSettings().getBatchingDescriptor(),
-       readRowsCallable().all(),
+       callable,
query,
settings.bulkReadRowsSettings().getBatchingSettings(),
clientContext.getExecutor());
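Both factory methods use the same null check. A hypothetical helper (not in the PR; the name is illustrative) restating the pattern, to make the semantics explicit: when no context is given the callable keeps the client's defaults, otherwise withDefaultCallContext installs the supplied context as the base that per-call contexts merge into.

```java
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import javax.annotation.Nullable;

final class CallableContextUtil {
  private CallableContextUtil() {}

  /** Wraps the callable only when the caller actually supplied a context. */
  static <ReqT, RespT> UnaryCallable<ReqT, RespT> withOptionalContext(
      UnaryCallable<ReqT, RespT> callable, @Nullable GrpcCallContext ctx) {
    // A null ctx preserves the client-wide default call context.
    return ctx == null ? callable : callable.withDefaultCallContext(ctx);
  }
}
```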
@@ -176,7 +176,8 @@ public Void call() {

// Configure the deadline
ApiCallContext currentCallContext = callContext;
- if (!externalFuture.getAttemptSettings().getRpcTimeout().isZero()) {
+ if (currentCallContext.getTimeout() == null
+     && !externalFuture.getAttemptSettings().getRpcTimeout().isZero()) {
currentCallContext =
currentCallContext.withTimeout(externalFuture.getAttemptSettings().getRpcTimeout());
}
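The added getTimeout() == null guard is what makes batcher-level timeouts stick: previously every attempt overwrote the context's timeout with the retry settings' per-attempt RPC timeout. A hypothetical helper (names are illustrative, not part of the diff) restating the new precedence:

```java
import com.google.api.gax.rpc.ApiCallContext;
import org.threeten.bp.Duration;

final class AttemptTimeoutPolicy {
  private AttemptTimeoutPolicy() {}

  /** A user-supplied timeout on the context wins over the retry settings' RPC timeout. */
  static ApiCallContext applyAttemptTimeout(ApiCallContext ctx, Duration rpcTimeout) {
    if (ctx.getTimeout() == null && !rpcTimeout.isZero()) {
      return ctx.withTimeout(rpcTimeout);
    }
    return ctx; // an explicit timeout (e.g. from a batcher's GrpcCallContext) is preserved
  }
}
```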
@@ -20,7 +20,6 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.bigtable.v2.ReadRowsRequest;
- import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
@@ -46,9 +45,9 @@ public PointReadTimeoutCallable(ServerStreamingCallable<ReadRowsRequest, RespT>
@Override
public void call(ReadRowsRequest request, ResponseObserver<RespT> observer, ApiCallContext ctx) {
if (isPointRead(request)) {
-     Duration effectiveTimeout = getEffectivePointReadTimeout(ctx);
-     if (effectiveTimeout != null) {
-       ctx = ctx.withTimeout(effectiveTimeout);
+     Duration streamWaitTimeout = ctx.getStreamWaitTimeout();
+     if (ctx.getTimeout() == null && streamWaitTimeout != null) {
+       ctx = ctx.withTimeout(streamWaitTimeout);
}
}
inner.call(request, observer, ctx);
@@ -63,24 +62,4 @@ private boolean isPointRead(ReadRowsRequest request) {
}
return request.getRows().getRowKeysCount() == 1;
}

- /**
-  * Extracts the effective timeout for a point read.
-  *
-  * <p>The effective time is the minimum of a streamWaitTimeout and a user set attempt timeout.
-  */
- @Nullable
- private Duration getEffectivePointReadTimeout(ApiCallContext ctx) {
-   Duration streamWaitTimeout = ctx.getStreamWaitTimeout();
-   Duration attemptTimeout = ctx.getTimeout();
-
-   if (streamWaitTimeout == null) {
-     return attemptTimeout;
-   }
-
-   if (attemptTimeout == null) {
-     return streamWaitTimeout;
-   }
-   return (attemptTimeout.compareTo(streamWaitTimeout) <= 0) ? attemptTimeout : streamWaitTimeout;
- }
}
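Net effect of this file's change: for a point read (a request naming exactly one row key), the stream-wait timeout is promoted to an attempt timeout only when no explicit timeout was set, instead of taking the minimum of the two. A runnable sketch of the new rule (not from the PR; values are arbitrary):

```java
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import org.threeten.bp.Duration;

public class PointReadTimeoutSketch {
  public static void main(String[] args) {
    ApiCallContext ctx =
        GrpcCallContext.createDefault().withStreamWaitTimeout(Duration.ofSeconds(30));

    // No explicit attempt timeout: the 30s stream-wait timeout is promoted.
    if (ctx.getTimeout() == null && ctx.getStreamWaitTimeout() != null) {
      ctx = ctx.withTimeout(ctx.getStreamWaitTimeout());
    }
    System.out.println(ctx.getTimeout()); // PT30S

    // With an explicit timeout the context is left untouched, even if the
    // stream-wait timeout is shorter; the caller's choice now wins.
    ApiCallContext explicit =
        GrpcCallContext.createDefault()
            .withStreamWaitTimeout(Duration.ofSeconds(30))
            .withTimeout(Duration.ofSeconds(5));
    System.out.println(explicit.getTimeout()); // PT5S
  }
}
```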
@@ -35,8 +35,11 @@
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import io.grpc.Attributes;
import io.grpc.BindableService;
import io.grpc.ServerInterceptor;
import io.grpc.ServerTransportFilter;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
@@ -95,7 +98,11 @@ public void transportTerminated(Attributes transportAttrs) {
terminateAttributes.add(transportAttrs);
}
};
- serviceHelper = new FakeServiceHelper(null, transportFilter, service);
+ serviceHelper =
+     new FakeServiceHelper(
+         ImmutableList.<ServerInterceptor>of(),
+         transportFilter,
+         ImmutableList.<BindableService>of(service));
port = serviceHelper.getPort();
serviceHelper.start();

@@ -22,6 +22,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
@@ -80,9 +81,13 @@ public void setUp() {
Mockito.when(mockStub.bulkMutateRowsCallable()).thenReturn(mockBulkMutateRowsCallable);
Mockito.when(mockStub.checkAndMutateRowCallable()).thenReturn(mockCheckAndMutateRowCallable);
Mockito.when(mockStub.readModifyWriteRowCallable()).thenReturn(mockReadModifyWriteRowCallable);
- Mockito.when(mockStub.newMutateRowsBatcher(Mockito.any(String.class)))
+ Mockito.when(
+         mockStub.newMutateRowsBatcher(
+             Mockito.any(String.class), Mockito.any(GrpcCallContext.class)))
.thenReturn(mockBulkMutationBatcher);
- Mockito.when(mockStub.newBulkReadRowsBatcher(Mockito.any(Query.class)))
+ Mockito.when(
+         mockStub.newBulkReadRowsBatcher(
+             Mockito.any(Query.class), Mockito.any(GrpcCallContext.class)))
.thenReturn(mockBulkReadRowsBatcher);
}

@@ -374,7 +379,8 @@ public void proxyNewBulkMutationBatcherTest() {
ApiFuture<Void> actualRes = batcher.add(request);
assertThat(actualRes).isSameInstanceAs(expectedResponse);

- Mockito.verify(mockStub).newMutateRowsBatcher(Mockito.any(String.class));
+ Mockito.verify(mockStub)
+     .newMutateRowsBatcher(Mockito.any(String.class), Mockito.any(GrpcCallContext.class));
}

@Test
@@ -390,7 +396,8 @@ public void proxyNewBulkReadRowsTest() {
ApiFuture<Row> actualResponse = batcher.add(request);
assertThat(actualResponse).isSameInstanceAs(expectedResponse);

- Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class));
+ Mockito.verify(mockStub)
+     .newBulkReadRowsBatcher(Mockito.any(Query.class), Mockito.any(GrpcCallContext.class));
}

@Test
@@ -407,7 +414,8 @@ public void proxyNewBulkReadRowsWithFilterTest() {
ApiFuture<Row> actualResponse = batcher.add(request);
assertThat(actualResponse).isSameInstanceAs(expectedResponse);

- Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class));
+ Mockito.verify(mockStub)
+     .newBulkReadRowsBatcher(Mockito.any(Query.class), Mockito.any(GrpcCallContext.class));
}

@Test
@@ -15,40 +15,43 @@
*/
package com.google.cloud.bigtable.data.v2;

import com.google.common.collect.ImmutableList;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerTransportFilter;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.List;

/** Utility class to setup a fake grpc server on a random port. */
public class FakeServiceHelper {
private final int port;
private final Server server;

public FakeServiceHelper(BindableService... services) throws IOException {
-   this(null, services);
+   this(ImmutableList.<ServerInterceptor>of(), null, ImmutableList.copyOf(services));
}

public FakeServiceHelper(ServerInterceptor interceptor, BindableService... services)
throws IOException {
-   this(interceptor, null, services);
+   this(ImmutableList.of(interceptor), null, ImmutableList.copyOf(services));
}

public FakeServiceHelper(
-     ServerInterceptor interceptor,
+     List<ServerInterceptor> interceptors,
ServerTransportFilter transportFilter,
-     BindableService... services)
+     List<BindableService> services)
throws IOException {
try (ServerSocket ss = new ServerSocket(0)) {
port = ss.getLocalPort();
}
ServerBuilder builder = ServerBuilder.forPort(port);
-   if (interceptor != null) {
+   for (ServerInterceptor interceptor : interceptors) {
builder = builder.intercept(interceptor);
}

if (transportFilter != null) {
builder = builder.addTransportFilter(transportFilter);
}
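For context, a hypothetical call site for the refactored list-based constructor; the interceptor names and the wrapper class are placeholders, not code from this PR:

```java
import com.google.common.collect.ImmutableList;
import io.grpc.BindableService;
import io.grpc.ServerInterceptor;
import java.io.IOException;

final class FakeServiceHelperUsage {
  /** Registers two interceptors and one fake service; no transport filter. */
  static FakeServiceHelper create(
      ServerInterceptor headerInterceptor, ServerInterceptor watchdog, BindableService service)
      throws IOException {
    return new FakeServiceHelper(
        ImmutableList.of(headerInterceptor, watchdog),
        /* transportFilter= */ null,
        ImmutableList.of(service));
  }
}
```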