From 9b3c6013fef396fcc923e53c13f73a03a48c9c02 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Thu, 3 Jun 2021 12:00:24 -0400 Subject: [PATCH] feat: promote stream wait timeouts to deadlines for point reads (#848) Special case point reads to use grpc's deadlines instead of relying on the watchdog --- .../data/v2/stub/EnhancedBigtableStub.java | 9 +- .../readrows/PointReadTimeoutCallable.java | 86 ++++++++ .../PointReadTimeoutCallableTest.java | 183 ++++++++++++++++++ 3 files changed, 276 insertions(+), 2 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallableTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index c08f0aec1..55e928d59 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -75,6 +75,7 @@ import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable; +import com.google.cloud.bigtable.data.v2.stub.readrows.PointReadTimeoutCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsConvertExceptionCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; @@ -336,7 +337,7 @@ public UnaryCallable createReadRowCallable(RowAdapter private ServerStreamingCallable createReadRowsBaseCallable( ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) { - ServerStreamingCallable base = + final ServerStreamingCallable base = GrpcRawCallableFactory.createServerStreamingCallable( GrpcCallSettings.newBuilder() .setMethodDescriptor(BigtableGrpc.getReadRowsMethod()) @@ -352,11 +353,15 @@ public Map extract(ReadRowsRequest readRowsRequest) { .build(), readRowsSettings.getRetryableCodes()); + // Promote streamWaitTimeout to deadline for point reads + ServerStreamingCallable withPointTimeouts = + new PointReadTimeoutCallable<>(base); + // Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code // which by default is not retryable. Convert the exception so it can be retried in the client. ServerStreamingCallable convertException = - new ReadRowsConvertExceptionCallable<>(base); + new ReadRowsConvertExceptionCallable<>(withPointTimeouts); ServerStreamingCallable merging = new RowMergingCallable<>(convertException, rowAdapter); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable.java new file mode 100644 index 000000000..7ce0e8b7c --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable.java @@ -0,0 +1,86 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.readrows; + +import com.google.api.core.InternalApi; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.bigtable.v2.ReadRowsRequest; +import javax.annotation.Nullable; +import org.threeten.bp.Duration; + +/** + * Specialization of ReadRows streams for point reads. + * + *

Under normal circumstances, the ReadRows RPC can't make any assumptions about deadlines. In + * general case the end user can be issuing a full table scan. However, when dealing with point + * reads, the client can make assumptions and promote the per row timeout to be a per attempt + * timeout. + * + *

This callable will check if the request is a point read and promote the timeout to be a + * deadline. + */ +@InternalApi +public class PointReadTimeoutCallable + extends ServerStreamingCallable { + private final ServerStreamingCallable inner; + + public PointReadTimeoutCallable(ServerStreamingCallable inner) { + this.inner = inner; + } + + @Override + public void call(ReadRowsRequest request, ResponseObserver observer, ApiCallContext ctx) { + if (isPointRead(request)) { + Duration effectiveTimeout = getEffectivePointReadTimeout(ctx); + if (effectiveTimeout != null) { + ctx = ctx.withTimeout(effectiveTimeout); + } + } + inner.call(request, observer, ctx); + } + + private boolean isPointRead(ReadRowsRequest request) { + if (request.getRowsLimit() == 1) { + return true; + } + if (!request.getRows().getRowRangesList().isEmpty()) { + return false; + } + return request.getRows().getRowKeysCount() == 1; + } + + /** + * Extracts the effective timeout for a point read. + * + *

The effective time is the minimum of a streamWaitTimeout and a user set attempt timeout. + */ + @Nullable + private Duration getEffectivePointReadTimeout(ApiCallContext ctx) { + Duration streamWaitTimeout = ctx.getStreamWaitTimeout(); + Duration attemptTimeout = ctx.getTimeout(); + + if (streamWaitTimeout == null) { + return attemptTimeout; + } + + if (attemptTimeout == null) { + return streamWaitTimeout; + } + return (attemptTimeout.compareTo(streamWaitTimeout) <= 0) ? attemptTimeout : streamWaitTimeout; + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallableTest.java new file mode 100644 index 000000000..a3941cd5c --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallableTest.java @@ -0,0 +1,183 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.readrows; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.RowSet; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import java.util.Arrays; +import java.util.List; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class PointReadTimeoutCallableTest { + @Rule public final MockitoRule moo = MockitoJUnit.rule(); + + @Mock private ServerStreamingCallable inner; + private ArgumentCaptor ctxCaptor; + @Mock private ResponseObserver responseObserver; + + @Before + public void setUp() throws Exception { + ctxCaptor = ArgumentCaptor.forClass(ApiCallContext.class); + + Mockito.doNothing() + .when(inner) + .call( + Mockito.isA(ReadRowsRequest.class), + Mockito.any(ResponseObserver.class), + ctxCaptor.capture()); + } + + @Test + public void promotesStreamWaitTimeout() { + Duration duration = Duration.ofMillis(100); + PointReadTimeoutCallable callable = new PointReadTimeoutCallable<>(inner); + + for (ReadRowsRequest req : createPointReadRequests()) { + callable.call( + req, responseObserver, GrpcCallContext.createDefault().withStreamWaitTimeout(duration)); + + assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(duration); + } + } + + @Test + public void promotesStreamWaitTimeoutForRowLimit() { + Duration duration = Duration.ofMillis(100); + PointReadTimeoutCallable callable = new PointReadTimeoutCallable<>(inner); + + for (ReadRowsRequest req : createPointReadRequests()) { + callable.call( + createRowsLimitRequest(), + responseObserver, + GrpcCallContext.createDefault().withStreamWaitTimeout(duration)); + + assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(duration); + } + } + + @Test + public void respectsExistingTimeout() { + Duration duration = Duration.ofMillis(100); + PointReadTimeoutCallable callable = new PointReadTimeoutCallable<>(inner); + + List requests = + ImmutableList.builder() + .addAll(createPointReadRequests()) + .add(ReadRowsRequest.getDefaultInstance()) + .build(); + + for (ReadRowsRequest req : requests) { + callable.call(req, responseObserver, GrpcCallContext.createDefault().withTimeout(duration)); + assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(duration); + } + } + + @Test + public void usesMinimum1() { + Duration attemptTimeout = Duration.ofMillis(100); + Duration streamTimeout = Duration.ofMillis(200); + PointReadTimeoutCallable callable = new PointReadTimeoutCallable<>(inner); + + for (ReadRowsRequest req : createPointReadRequests()) { + GrpcCallContext ctx = + GrpcCallContext.createDefault() + .withTimeout(attemptTimeout) + .withStreamWaitTimeout(streamTimeout); + callable.call(req, responseObserver, ctx); + + assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(attemptTimeout); + } + } + + @Test + public void usesMinimum2() { + Duration attemptTimeout = Duration.ofMillis(200); + Duration streamTimeout = Duration.ofMillis(100); + PointReadTimeoutCallable callable = new PointReadTimeoutCallable<>(inner); + + for (ReadRowsRequest req : createPointReadRequests()) { + GrpcCallContext ctx = + GrpcCallContext.createDefault() + .withTimeout(attemptTimeout) + .withStreamWaitTimeout(streamTimeout); + + callable.call(req, responseObserver, ctx); + + assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(streamTimeout); + } + } + + @Test + public void nonPointReadsAreUntouched() { + Duration streamTimeout = Duration.ofMillis(100); + PointReadTimeoutCallable callable = new PointReadTimeoutCallable<>(inner); + + List requests = + Arrays.asList( + ReadRowsRequest.getDefaultInstance(), + ReadRowsRequest.newBuilder() + .setRows( + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("a")) + .addRowKeys(ByteString.copyFromUtf8("ab"))) + .build(), + ReadRowsRequest.newBuilder() + .setRows(RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance())) + .build()); + + for (ReadRowsRequest req : requests) { + callable.call( + req, + responseObserver, + GrpcCallContext.createDefault().withStreamWaitTimeout(streamTimeout)); + assertThat(ctxCaptor.getValue().getTimeout()).isNull(); + } + } + + private List createPointReadRequests() { + return Arrays.asList(createRowsLimitRequest(), createRowKeyRequest()); + } + + private ReadRowsRequest createRowsLimitRequest() { + return ReadRowsRequest.newBuilder().setRowsLimit(1).build(); + } + + private ReadRowsRequest createRowKeyRequest() { + return ReadRowsRequest.newBuilder() + .setRows(RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("key"))) + .build(); + } +}