From 50c0a5162faa0cb74c119635aa06c6315e080a16 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Thu, 17 Jun 2021 11:02:40 -0400 Subject: [PATCH] Revert "feat: all setting timeouts for batchers + fix handling of timeouts for point reads (#861)" (#875) This reverts commit c145ceb592f04f94a09be815feff87e0c64e8e7d. --- .../clirr-ignored-differences.xml | 8 +- .../bigtable/data/v2/BigtableDataClient.java | 90 +------------- .../data/v2/stub/EnhancedBigtableStub.java | 20 +-- .../mutaterows/MutateRowsAttemptCallable.java | 3 +- .../readrows/PointReadTimeoutCallable.java | 27 +++- .../v2/BigtableDataClientFactoryTest.java | 9 +- .../data/v2/BigtableDataClientTest.java | 18 +-- .../bigtable/data/v2/FakeServiceHelper.java | 13 +- .../v2/stub/EnhancedBigtableStubTest.java | 116 +----------------- .../PointReadTimeoutCallableTest.java | 20 ++- 10 files changed, 69 insertions(+), 255 deletions(-) diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml index 9391d2ea8a..ab921a973f 100644 --- a/google-cloud-bigtable/clirr-ignored-differences.xml +++ b/google-cloud-bigtable/clirr-ignored-differences.xml @@ -23,10 +23,4 @@ 8001 com/google/cloud/bigtable/gaxx/tracing/WrappedTracerFactory* - - - 7004 - com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub - * - - + \ No newline at end of file diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java index ce9a57fa7e..04e1b15987 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java @@ -23,7 +23,6 @@ import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; import com.google.api.gax.batching.Batcher; -import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.ApiExceptions; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStream; @@ -1074,40 +1073,7 @@ public void bulkMutateRows(BulkMutation mutation) { */ @BetaApi("This surface is likely to change as the batching surface evolves.") public Batcher newBulkMutationBatcher(@Nonnull String tableId) { - return newBulkMutationBatcher(tableId, null); - } - - /** - * Mutates multiple rows in a batch. Each individual row is mutated atomically as in MutateRow, - * but the entire batch is not executed atomically. The returned Batcher instance is not - * threadsafe, it can only be used from single thread. This method allows customization of the - * underlying RPCs by passing in a {@link com.google.api.gax.grpc.GrpcCallContext}. The same - * context will be reused for all batches. This can be used to customize things like per attempt - * timeouts. - * - *

Sample Code: - * - *

{@code
-   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
-   *   try (Batcher batcher = bigtableDataClient.newBulkMutationBatcher("[TABLE]", GrpcCallContext.createDefault().withTimeout(Duration.ofSeconds(10)))) {
-   *     for (String someValue : someCollection) {
-   *       ApiFuture entryFuture =
-   *           batcher.add(
-   *               RowMutationEntry.create("[ROW KEY]")
-   *                   .setCell("[FAMILY NAME]", "[QUALIFIER]", "[VALUE]"));
-   *     }
-   *
-   *     // Blocks until mutations are applied on all submitted row entries.
-   *     batcher.flush();
-   *   }
-   *   // Before `batcher` is closed, all remaining(If any) mutations are applied.
-   * }
-   * }
- */ - @BetaApi("This surface is likely to change as the batching surface evolves.") - public Batcher newBulkMutationBatcher( - @Nonnull String tableId, @Nullable GrpcCallContext ctx) { - return stub.newMutateRowsBatcher(tableId, ctx); + return stub.newMutateRowsBatcher(tableId); } /** @@ -1193,61 +1159,11 @@ public Batcher newBulkReadRowsBatcher(String tableId) { */ public Batcher newBulkReadRowsBatcher( String tableId, @Nullable Filters.Filter filter) { - return newBulkReadRowsBatcher(tableId, filter, null); - } - - /** - * Reads rows for given tableId and filter criteria in a batch. If the row does not exist, the - * value will be null. The returned Batcher instance is not threadsafe, it can only be used from a - * single thread. This method allows customization of the underlying RPCs by passing in a {@link - * com.google.api.gax.grpc.GrpcCallContext}. The same context will be reused for all batches. This - * can be used to customize things like per attempt timeouts. - * - *

Performance notice: The ReadRows protocol requires that rows are sent in ascending key - * order, which means that the keys are processed sequentially on the server-side, so batching - * allows improving throughput but not latency. Lower latencies can be achieved by sending smaller - * requests concurrently. - * - *

Sample Code: - * - *

{@code
-   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
-   *
-   *  // Build the filter expression
-   *  Filter filter = FILTERS.chain()
-   *         .filter(FILTERS.key().regex("prefix.*"))
-   *         .filter(FILTERS.limit().cellsPerRow(10));
-   *
-   *   List> rows = new ArrayList<>();
-   *
-   *   try (Batcher batcher = bigtableDataClient.newBulkReadRowsBatcher(
-   *    "[TABLE]", filter, GrpcCallContext.createDefault().withTimeout(Duration.ofSeconds(10)))) {
-   *     for (String someValue : someCollection) {
-   *       ApiFuture rowFuture =
-   *           batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
-   *       rows.add(rowFuture);
-   *     }
-   *
-   *     // [Optional] Sends collected elements for batching asynchronously.
-   *     batcher.sendOutstanding();
-   *
-   *     // [Optional] Invokes sendOutstanding() and awaits until all pending entries are resolved.
-   *     batcher.flush();
-   *   }
-   *   // batcher.close() invokes `flush()` which will in turn invoke `sendOutstanding()` with await for
-   *   pending batches until its resolved.
-   *
-   *   List actualRows = ApiFutures.allAsList(rows).get();
-   * }
-   * }
- */ - public Batcher newBulkReadRowsBatcher( - String tableId, @Nullable Filters.Filter filter, @Nullable GrpcCallContext ctx) { Query query = Query.create(tableId); if (filter != null) { - query.filter(filter); + query = query.filter(filter); } - return stub.newBulkReadRowsBatcher(query, ctx); + return stub.newBulkReadRowsBatcher(query); } /** diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index b2e63e559c..55e928d59f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -23,7 +23,6 @@ import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.grpc.GaxGrpcProperties; -import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcCallSettings; import com.google.api.gax.grpc.GrpcRawCallableFactory; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; @@ -99,7 +98,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; -import javax.annotation.Nullable; /** * The core client that converts method calls to RPCs. @@ -538,15 +536,10 @@ private UnaryCallable createBulkMutateRowsCallable() { *
  • Split the responses using {@link MutateRowsBatchingDescriptor}. * */ - public Batcher newMutateRowsBatcher( - @Nonnull String tableId, @Nullable GrpcCallContext ctx) { - UnaryCallable callable = this.bulkMutateRowsCallable; - if (ctx != null) { - callable = callable.withDefaultCallContext(ctx); - } + public Batcher newMutateRowsBatcher(@Nonnull String tableId) { return new BatcherImpl<>( settings.bulkMutateRowsSettings().getBatchingDescriptor(), - callable, + bulkMutateRowsCallable, BulkMutation.create(tableId), settings.bulkMutateRowsSettings().getBatchingSettings(), clientContext.getExecutor(), @@ -568,16 +561,11 @@ public Batcher newMutateRowsBatcher( *
  • Split the responses using {@link ReadRowsBatchingDescriptor}. * */ - public Batcher newBulkReadRowsBatcher( - @Nonnull Query query, @Nullable GrpcCallContext ctx) { + public Batcher newBulkReadRowsBatcher(@Nonnull Query query) { Preconditions.checkNotNull(query, "query cannot be null"); - UnaryCallable> callable = readRowsCallable().all(); - if (ctx != null) { - callable = callable.withDefaultCallContext(ctx); - } return new BatcherImpl<>( settings.bulkReadRowsSettings().getBatchingDescriptor(), - callable, + readRowsCallable().all(), query, settings.bulkReadRowsSettings().getBatchingSettings(), clientContext.getExecutor()); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java index de2bf6224f..e85270f619 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java @@ -176,8 +176,7 @@ public Void call() { // Configure the deadline ApiCallContext currentCallContext = callContext; - if (currentCallContext.getTimeout() == null - && !externalFuture.getAttemptSettings().getRpcTimeout().isZero()) { + if (!externalFuture.getAttemptSettings().getRpcTimeout().isZero()) { currentCallContext = currentCallContext.withTimeout(externalFuture.getAttemptSettings().getRpcTimeout()); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable.java index 4f1581b3e7..7ce0e8b7c6 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable.java @@ -20,6 +20,7 @@ import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.bigtable.v2.ReadRowsRequest; +import javax.annotation.Nullable; import org.threeten.bp.Duration; /** @@ -45,9 +46,9 @@ public PointReadTimeoutCallable(ServerStreamingCallable @Override public void call(ReadRowsRequest request, ResponseObserver observer, ApiCallContext ctx) { if (isPointRead(request)) { - Duration streamWaitTimeout = ctx.getStreamWaitTimeout(); - if (ctx.getTimeout() == null && streamWaitTimeout != null) { - ctx = ctx.withTimeout(streamWaitTimeout); + Duration effectiveTimeout = getEffectivePointReadTimeout(ctx); + if (effectiveTimeout != null) { + ctx = ctx.withTimeout(effectiveTimeout); } } inner.call(request, observer, ctx); @@ -62,4 +63,24 @@ private boolean isPointRead(ReadRowsRequest request) { } return request.getRows().getRowKeysCount() == 1; } + + /** + * Extracts the effective timeout for a point read. + * + *

    The effective time is the minimum of a streamWaitTimeout and a user set attempt timeout. + */ + @Nullable + private Duration getEffectivePointReadTimeout(ApiCallContext ctx) { + Duration streamWaitTimeout = ctx.getStreamWaitTimeout(); + Duration attemptTimeout = ctx.getTimeout(); + + if (streamWaitTimeout == null) { + return attemptTimeout; + } + + if (attemptTimeout == null) { + return streamWaitTimeout; + } + return (attemptTimeout.compareTo(streamWaitTimeout) <= 0) ? attemptTimeout : streamWaitTimeout; + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java index 949f5139b7..9e733640f6 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java @@ -35,11 +35,8 @@ import com.google.cloud.bigtable.data.v2.internal.NameUtil; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import io.grpc.Attributes; -import io.grpc.BindableService; -import io.grpc.ServerInterceptor; import io.grpc.ServerTransportFilter; import io.grpc.stub.StreamObserver; import java.io.IOException; @@ -98,11 +95,7 @@ public void transportTerminated(Attributes transportAttrs) { terminateAttributes.add(transportAttrs); } }; - serviceHelper = - new FakeServiceHelper( - ImmutableList.of(), - transportFilter, - ImmutableList.of(service)); + serviceHelper = new FakeServiceHelper(null, transportFilter, service); port = serviceHelper.getPort(); serviceHelper.start(); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java index 67befad2a4..c3bf52b63f 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java @@ -22,7 +22,6 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.gax.batching.Batcher; -import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; @@ -81,13 +80,9 @@ public void setUp() { Mockito.when(mockStub.bulkMutateRowsCallable()).thenReturn(mockBulkMutateRowsCallable); Mockito.when(mockStub.checkAndMutateRowCallable()).thenReturn(mockCheckAndMutateRowCallable); Mockito.when(mockStub.readModifyWriteRowCallable()).thenReturn(mockReadModifyWriteRowCallable); - Mockito.when( - mockStub.newMutateRowsBatcher( - Mockito.any(String.class), Mockito.any(GrpcCallContext.class))) + Mockito.when(mockStub.newMutateRowsBatcher(Mockito.any(String.class))) .thenReturn(mockBulkMutationBatcher); - Mockito.when( - mockStub.newBulkReadRowsBatcher( - Mockito.any(Query.class), Mockito.any(GrpcCallContext.class))) + Mockito.when(mockStub.newBulkReadRowsBatcher(Mockito.any(Query.class))) .thenReturn(mockBulkReadRowsBatcher); } @@ -379,8 +374,7 @@ public void proxyNewBulkMutationBatcherTest() { ApiFuture actualRes = batcher.add(request); assertThat(actualRes).isSameInstanceAs(expectedResponse); - Mockito.verify(mockStub) - .newMutateRowsBatcher(Mockito.any(String.class), Mockito.any(GrpcCallContext.class)); + Mockito.verify(mockStub).newMutateRowsBatcher(Mockito.any(String.class)); } @Test @@ -396,8 +390,7 @@ public void proxyNewBulkReadRowsTest() { ApiFuture actualResponse = batcher.add(request); assertThat(actualResponse).isSameInstanceAs(expectedResponse); - Mockito.verify(mockStub) - .newBulkReadRowsBatcher(Mockito.any(Query.class), Mockito.any(GrpcCallContext.class)); + Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class)); } @Test @@ -414,8 +407,7 @@ public void proxyNewBulkReadRowsWithFilterTest() { ApiFuture actualResponse = batcher.add(request); assertThat(actualResponse).isSameInstanceAs(expectedResponse); - Mockito.verify(mockStub) - .newBulkReadRowsBatcher(Mockito.any(Query.class), Mockito.any(GrpcCallContext.class)); + Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class)); } @Test diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/FakeServiceHelper.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/FakeServiceHelper.java index f0dd2f8809..9ec5e59cb7 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/FakeServiceHelper.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/FakeServiceHelper.java @@ -15,7 +15,6 @@ */ package com.google.cloud.bigtable.data.v2; -import com.google.common.collect.ImmutableList; import io.grpc.BindableService; import io.grpc.Server; import io.grpc.ServerBuilder; @@ -23,7 +22,6 @@ import io.grpc.ServerTransportFilter; import java.io.IOException; import java.net.ServerSocket; -import java.util.List; /** Utility class to setup a fake grpc server on a random port. */ public class FakeServiceHelper { @@ -31,27 +29,26 @@ public class FakeServiceHelper { private final Server server; public FakeServiceHelper(BindableService... services) throws IOException { - this(ImmutableList.of(), null, ImmutableList.copyOf(services)); + this(null, services); } public FakeServiceHelper(ServerInterceptor interceptor, BindableService... services) throws IOException { - this(ImmutableList.of(interceptor), null, ImmutableList.copyOf(services)); + this(interceptor, null, services); } public FakeServiceHelper( - List interceptors, + ServerInterceptor interceptor, ServerTransportFilter transportFilter, - List services) + BindableService... services) throws IOException { try (ServerSocket ss = new ServerSocket(0)) { port = ss.getLocalPort(); } ServerBuilder builder = ServerBuilder.forPort(port); - for (ServerInterceptor interceptor : interceptors) { + if (interceptor != null) { builder = builder.intercept(interceptor); } - if (transportFilter != null) { builder = builder.addTransportFilter(transportFilter); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index 8cb82359ad..b66596fb1a 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -17,18 +17,14 @@ import static com.google.common.truth.Truth.assertThat; -import com.google.api.gax.batching.Batcher; import com.google.api.gax.batching.BatcherImpl; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.GaxGrpcProperties; -import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.bigtable.v2.BigtableGrpc; -import com.google.bigtable.v2.MutateRowsRequest; -import com.google.bigtable.v2.MutateRowsResponse; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.RowSet; @@ -40,15 +36,10 @@ import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter; import com.google.cloud.bigtable.data.v2.models.Query; import com.google.cloud.bigtable.data.v2.models.Row; -import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Queues; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.StringValue; -import io.grpc.BindableService; -import io.grpc.Context; -import io.grpc.Deadline; import io.grpc.Metadata; import io.grpc.ServerCall; import io.grpc.ServerCall.Listener; @@ -66,14 +57,12 @@ import java.util.Collection; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.threeten.bp.Duration; @RunWith(JUnit4.class) public class EnhancedBigtableStubTest { @@ -86,7 +75,6 @@ public class EnhancedBigtableStubTest { FakeServiceHelper serviceHelper; private MetadataInterceptor metadataInterceptor; - private ContextInterceptor contextInterceptor; private FakeDataService fakeDataService; private EnhancedBigtableStubSettings defaultSettings; private EnhancedBigtableStub enhancedBigtableStub; @@ -94,14 +82,8 @@ public class EnhancedBigtableStubTest { @Before public void setUp() throws IOException, IllegalAccessException, InstantiationException { metadataInterceptor = new MetadataInterceptor(); - contextInterceptor = new ContextInterceptor(); fakeDataService = new FakeDataService(); - - serviceHelper = - new FakeServiceHelper( - ImmutableList.of(contextInterceptor, metadataInterceptor), - null, - ImmutableList.of(fakeDataService)); + serviceHelper = new FakeServiceHelper(metadataInterceptor, fakeDataService); serviceHelper.start(); defaultSettings = @@ -273,8 +255,8 @@ public void testBulkMutationFlowControllerConfigured() throws Exception { // Creating 2 batchers from the same stub, they should share the same FlowController and // FlowControlEventStats - try (BatcherImpl batcher1 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table1", null); - BatcherImpl batcher2 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table2", null)) { + try (BatcherImpl batcher1 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table1"); + BatcherImpl batcher2 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table2")) { assertThat(batcher1.getFlowController()).isNotNull(); assertThat(batcher1.getFlowController().getFlowControlEventStats()).isNotNull(); assertThat(batcher1).isNotSameInstanceAs(batcher2); @@ -298,8 +280,8 @@ public void testBulkMutationFlowControllerConfigured() throws Exception { // Creating 2 batchers from different stubs, they should not share the same FlowController and // FlowControlEventStats - try (BatcherImpl batcher1 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table1", null); - BatcherImpl batcher2 = (BatcherImpl) stub2.newMutateRowsBatcher("my-table2", null)) { + try (BatcherImpl batcher1 = (BatcherImpl) stub1.newMutateRowsBatcher("my-table1"); + BatcherImpl batcher2 = (BatcherImpl) stub2.newMutateRowsBatcher("my-table2")) { assertThat(batcher1.getFlowController()).isNotNull(); assertThat(batcher1.getFlowController().getFlowControlEventStats()).isNotNull(); assertThat(batcher1.getFlowController()).isNotSameInstanceAs(batcher2.getFlowController()); @@ -316,7 +298,7 @@ public void testBulkMutationFlowControllerConfigured() throws Exception { .build() .getStubSettings()); ) { - try (BatcherImpl batcher = (BatcherImpl) stub2.newMutateRowsBatcher("my-table", null)) { + try (BatcherImpl batcher = (BatcherImpl) stub2.newMutateRowsBatcher("my-table")) { assertThat(batcher.getFlowController().getMaxElementCountLimit()).isEqualTo(100L); assertThat(batcher.getFlowController().getCurrentElementCountLimit()).isEqualTo(100L); assertThat(batcher.getFlowController().getMinElementCountLimit()).isEqualTo(100L); @@ -324,68 +306,6 @@ public void testBulkMutationFlowControllerConfigured() throws Exception { } } - @Test - public void testCallContextPropagatedInMutationBatcher() - throws IOException, InterruptedException, ExecutionException { - EnhancedBigtableStubSettings settings = - defaultSettings - .toBuilder() - .setRefreshingChannel(true) - .setPrimedTableIds("table1", "table2") - .build(); - - try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings)) { - // clear the previous contexts - contextInterceptor.contexts.clear(); - - // Override the timeout - GrpcCallContext clientCtx = - GrpcCallContext.createDefault().withTimeout(Duration.ofMinutes(10)); - - // Send a batch - try (Batcher batcher = - stub.newMutateRowsBatcher("table1", clientCtx)) { - batcher.add(RowMutationEntry.create("key").deleteRow()).get(); - } - - // Ensure that the server got the overriden deadline - Context serverCtx = contextInterceptor.contexts.poll(); - assertThat(serverCtx).isNotNull(); - assertThat(serverCtx.getDeadline()).isAtLeast(Deadline.after(8, TimeUnit.MINUTES)); - } - } - - @Test - public void testCallContextPropagatedInReadBatcher() - throws IOException, InterruptedException, ExecutionException { - EnhancedBigtableStubSettings settings = - defaultSettings - .toBuilder() - .setRefreshingChannel(true) - .setPrimedTableIds("table1", "table2") - .build(); - - try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings)) { - // clear the previous contexts - contextInterceptor.contexts.clear(); - - // Override the timeout - GrpcCallContext clientCtx = - GrpcCallContext.createDefault().withTimeout(Duration.ofMinutes(10)); - - // Send a batch - try (Batcher batcher = - stub.newBulkReadRowsBatcher(Query.create("table1"), clientCtx)) { - batcher.add(ByteString.copyFromUtf8("key")).get(); - } - - // Ensure that the server got the overriden deadline - Context serverCtx = contextInterceptor.contexts.poll(); - assertThat(serverCtx).isNotNull(); - assertThat(serverCtx.getDeadline()).isAtLeast(Deadline.after(8, TimeUnit.MINUTES)); - } - } - private static class MetadataInterceptor implements ServerInterceptor { final BlockingQueue headers = Queues.newLinkedBlockingDeque(); @@ -399,19 +319,6 @@ public Listener interceptCall( } } - private static class ContextInterceptor implements ServerInterceptor { - final BlockingQueue contexts = Queues.newLinkedBlockingDeque(); - - @Override - public Listener interceptCall( - ServerCall serverCall, - Metadata metadata, - ServerCallHandler serverCallHandler) { - contexts.add(Context.current()); - return serverCallHandler.startCall(serverCall, metadata); - } - } - private static class FakeDataService extends BigtableGrpc.BigtableImplBase { final BlockingQueue requests = Queues.newLinkedBlockingDeque(); @@ -420,17 +327,6 @@ ReadRowsRequest popLastRequest() throws InterruptedException { return requests.poll(1, TimeUnit.SECONDS); } - @Override - public void mutateRows( - MutateRowsRequest request, StreamObserver responseObserver) { - MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); - for (int i = 0; i < request.getEntriesCount(); i++) { - builder.addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(i).build()); - } - responseObserver.onNext(builder.build()); - responseObserver.onCompleted(); - } - @Override public void readRows( ReadRowsRequest request, StreamObserver responseObserver) { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallableTest.java index b6e72df9d2..a3941cd5c1 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallableTest.java @@ -106,7 +106,7 @@ public void respectsExistingTimeout() { } @Test - public void doesntClobber() { + public void usesMinimum1() { Duration attemptTimeout = Duration.ofMillis(100); Duration streamTimeout = Duration.ofMillis(200); PointReadTimeoutCallable callable = new PointReadTimeoutCallable<>(inner); @@ -122,6 +122,24 @@ public void doesntClobber() { } } + @Test + public void usesMinimum2() { + Duration attemptTimeout = Duration.ofMillis(200); + Duration streamTimeout = Duration.ofMillis(100); + PointReadTimeoutCallable callable = new PointReadTimeoutCallable<>(inner); + + for (ReadRowsRequest req : createPointReadRequests()) { + GrpcCallContext ctx = + GrpcCallContext.createDefault() + .withTimeout(attemptTimeout) + .withStreamWaitTimeout(streamTimeout); + + callable.call(req, responseObserver, ctx); + + assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(streamTimeout); + } + } + @Test public void nonPointReadsAreUntouched() { Duration streamTimeout = Duration.ofMillis(100);