From 4614912b6ea76c9057b1a4cbf869eb3145bce18f Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Thu, 17 Jun 2021 13:14:02 -0400 Subject: [PATCH] revert: Revert "feat: promote stream wait timeouts to deadlines for point reads" (#876) Reverts googleapis/java-bigtable#848 --- .../clirr-ignored-differences.xml | 7 +- .../data/v2/stub/EnhancedBigtableStub.java | 9 +- .../readrows/PointReadTimeoutCallable.java | 86 -------- .../PointReadTimeoutCallableTest.java | 183 ------------------ 4 files changed, 8 insertions(+), 277 deletions(-) delete mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable.java delete mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallableTest.java diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml index ab921a973..c7bc6fef5 100644 --- a/google-cloud-bigtable/clirr-ignored-differences.xml +++ b/google-cloud-bigtable/clirr-ignored-differences.xml @@ -23,4 +23,9 @@ 8001 com/google/cloud/bigtable/gaxx/tracing/WrappedTracerFactory* - \ No newline at end of file + + + 8001 + com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable + + diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 55e928d59..c08f0aec1 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -75,7 +75,6 @@ import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable; -import com.google.cloud.bigtable.data.v2.stub.readrows.PointReadTimeoutCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsConvertExceptionCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; @@ -337,7 +336,7 @@ public UnaryCallable createReadRowCallable(RowAdapter private ServerStreamingCallable createReadRowsBaseCallable( ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) { - final ServerStreamingCallable base = + ServerStreamingCallable base = GrpcRawCallableFactory.createServerStreamingCallable( GrpcCallSettings.newBuilder() .setMethodDescriptor(BigtableGrpc.getReadRowsMethod()) @@ -353,15 +352,11 @@ public Map extract(ReadRowsRequest readRowsRequest) { .build(), readRowsSettings.getRetryableCodes()); - // Promote streamWaitTimeout to deadline for point reads - ServerStreamingCallable withPointTimeouts = - new PointReadTimeoutCallable<>(base); - // Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code // which by default is not retryable. Convert the exception so it can be retried in the client. ServerStreamingCallable convertException = - new ReadRowsConvertExceptionCallable<>(withPointTimeouts); + new ReadRowsConvertExceptionCallable<>(base); ServerStreamingCallable merging = new RowMergingCallable<>(convertException, rowAdapter); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable.java deleted file mode 100644 index 7ce0e8b7c..000000000 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.stub.readrows; - -import com.google.api.core.InternalApi; -import com.google.api.gax.rpc.ApiCallContext; -import com.google.api.gax.rpc.ResponseObserver; -import com.google.api.gax.rpc.ServerStreamingCallable; -import com.google.bigtable.v2.ReadRowsRequest; -import javax.annotation.Nullable; -import org.threeten.bp.Duration; - -/** - * Specialization of ReadRows streams for point reads. - * - *

Under normal circumstances, the ReadRows RPC can't make any assumptions about deadlines. In - * general case the end user can be issuing a full table scan. However, when dealing with point - * reads, the client can make assumptions and promote the per row timeout to be a per attempt - * timeout. - * - *

This callable will check if the request is a point read and promote the timeout to be a - * deadline. - */ -@InternalApi -public class PointReadTimeoutCallable - extends ServerStreamingCallable { - private final ServerStreamingCallable inner; - - public PointReadTimeoutCallable(ServerStreamingCallable inner) { - this.inner = inner; - } - - @Override - public void call(ReadRowsRequest request, ResponseObserver observer, ApiCallContext ctx) { - if (isPointRead(request)) { - Duration effectiveTimeout = getEffectivePointReadTimeout(ctx); - if (effectiveTimeout != null) { - ctx = ctx.withTimeout(effectiveTimeout); - } - } - inner.call(request, observer, ctx); - } - - private boolean isPointRead(ReadRowsRequest request) { - if (request.getRowsLimit() == 1) { - return true; - } - if (!request.getRows().getRowRangesList().isEmpty()) { - return false; - } - return request.getRows().getRowKeysCount() == 1; - } - - /** - * Extracts the effective timeout for a point read. - * - *

The effective time is the minimum of a streamWaitTimeout and a user set attempt timeout. - */ - @Nullable - private Duration getEffectivePointReadTimeout(ApiCallContext ctx) { - Duration streamWaitTimeout = ctx.getStreamWaitTimeout(); - Duration attemptTimeout = ctx.getTimeout(); - - if (streamWaitTimeout == null) { - return attemptTimeout; - } - - if (attemptTimeout == null) { - return streamWaitTimeout; - } - return (attemptTimeout.compareTo(streamWaitTimeout) <= 0) ? attemptTimeout : streamWaitTimeout; - } -} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallableTest.java deleted file mode 100644 index a3941cd5c..000000000 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallableTest.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.data.v2.stub.readrows; - -import static com.google.common.truth.Truth.assertThat; - -import com.google.api.gax.grpc.GrpcCallContext; -import com.google.api.gax.rpc.ApiCallContext; -import com.google.api.gax.rpc.ResponseObserver; -import com.google.api.gax.rpc.ServerStreamingCallable; -import com.google.bigtable.v2.ReadRowsRequest; -import com.google.bigtable.v2.RowRange; -import com.google.bigtable.v2.RowSet; -import com.google.common.collect.ImmutableList; -import com.google.protobuf.ByteString; -import java.util.Arrays; -import java.util.List; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; -import org.threeten.bp.Duration; - -@RunWith(JUnit4.class) -public class PointReadTimeoutCallableTest { - @Rule public final MockitoRule moo = MockitoJUnit.rule(); - - @Mock private ServerStreamingCallable inner; - private ArgumentCaptor ctxCaptor; - @Mock private ResponseObserver responseObserver; - - @Before - public void setUp() throws Exception { - ctxCaptor = ArgumentCaptor.forClass(ApiCallContext.class); - - Mockito.doNothing() - .when(inner) - .call( - Mockito.isA(ReadRowsRequest.class), - Mockito.any(ResponseObserver.class), - ctxCaptor.capture()); - } - - @Test - public void promotesStreamWaitTimeout() { - Duration duration = Duration.ofMillis(100); - PointReadTimeoutCallable callable = new PointReadTimeoutCallable<>(inner); - - for (ReadRowsRequest req : createPointReadRequests()) { - callable.call( - req, responseObserver, GrpcCallContext.createDefault().withStreamWaitTimeout(duration)); - - assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(duration); - } - } - - @Test - public void promotesStreamWaitTimeoutForRowLimit() { - Duration duration = Duration.ofMillis(100); - PointReadTimeoutCallable callable = new PointReadTimeoutCallable<>(inner); - - for (ReadRowsRequest req : createPointReadRequests()) { - callable.call( - createRowsLimitRequest(), - responseObserver, - GrpcCallContext.createDefault().withStreamWaitTimeout(duration)); - - assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(duration); - } - } - - @Test - public void respectsExistingTimeout() { - Duration duration = Duration.ofMillis(100); - PointReadTimeoutCallable callable = new PointReadTimeoutCallable<>(inner); - - List requests = - ImmutableList.builder() - .addAll(createPointReadRequests()) - .add(ReadRowsRequest.getDefaultInstance()) - .build(); - - for (ReadRowsRequest req : requests) { - callable.call(req, responseObserver, GrpcCallContext.createDefault().withTimeout(duration)); - assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(duration); - } - } - - @Test - public void usesMinimum1() { - Duration attemptTimeout = Duration.ofMillis(100); - Duration streamTimeout = Duration.ofMillis(200); - PointReadTimeoutCallable callable = new PointReadTimeoutCallable<>(inner); - - for (ReadRowsRequest req : createPointReadRequests()) { - GrpcCallContext ctx = - GrpcCallContext.createDefault() - .withTimeout(attemptTimeout) - .withStreamWaitTimeout(streamTimeout); - callable.call(req, responseObserver, ctx); - - assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(attemptTimeout); - } - } - - @Test - public void usesMinimum2() { - Duration attemptTimeout = Duration.ofMillis(200); - Duration streamTimeout = Duration.ofMillis(100); - PointReadTimeoutCallable callable = new PointReadTimeoutCallable<>(inner); - - for (ReadRowsRequest req : createPointReadRequests()) { - GrpcCallContext ctx = - GrpcCallContext.createDefault() - .withTimeout(attemptTimeout) - .withStreamWaitTimeout(streamTimeout); - - callable.call(req, responseObserver, ctx); - - assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(streamTimeout); - } - } - - @Test - public void nonPointReadsAreUntouched() { - Duration streamTimeout = Duration.ofMillis(100); - PointReadTimeoutCallable callable = new PointReadTimeoutCallable<>(inner); - - List requests = - Arrays.asList( - ReadRowsRequest.getDefaultInstance(), - ReadRowsRequest.newBuilder() - .setRows( - RowSet.newBuilder() - .addRowKeys(ByteString.copyFromUtf8("a")) - .addRowKeys(ByteString.copyFromUtf8("ab"))) - .build(), - ReadRowsRequest.newBuilder() - .setRows(RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance())) - .build()); - - for (ReadRowsRequest req : requests) { - callable.call( - req, - responseObserver, - GrpcCallContext.createDefault().withStreamWaitTimeout(streamTimeout)); - assertThat(ctxCaptor.getValue().getTimeout()).isNull(); - } - } - - private List createPointReadRequests() { - return Arrays.asList(createRowsLimitRequest(), createRowKeyRequest()); - } - - private ReadRowsRequest createRowsLimitRequest() { - return ReadRowsRequest.newBuilder().setRowsLimit(1).build(); - } - - private ReadRowsRequest createRowKeyRequest() { - return ReadRowsRequest.newBuilder() - .setRows(RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("key"))) - .build(); - } -}