Skip to content

Commit

Permalink
feat: promote stream wait timeouts to deadlines for point reads (#848)
Browse files Browse the repository at this point in the history
Special case point reads to use grpc's deadlines instead of relying on the watchdog
  • Loading branch information
igorbernstein2 committed Jun 3, 2021
1 parent 66a9c9e commit 9b3c601
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 2 deletions.
Expand Up @@ -75,6 +75,7 @@
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.PointReadTimeoutCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsConvertExceptionCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
Expand Down Expand Up @@ -336,7 +337,7 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRowsBaseCallable(
ServerStreamingCallSettings<ReqT, Row> readRowsSettings, RowAdapter<RowT> rowAdapter) {

ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> base =
final ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<ReadRowsRequest, ReadRowsResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getReadRowsMethod())
Expand All @@ -352,11 +353,15 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
.build(),
readRowsSettings.getRetryableCodes());

// Promote streamWaitTimeout to deadline for point reads
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> withPointTimeouts =
new PointReadTimeoutCallable<>(base);

// Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> convertException =
new ReadRowsConvertExceptionCallable<>(base);
new ReadRowsConvertExceptionCallable<>(withPointTimeouts);

ServerStreamingCallable<ReadRowsRequest, RowT> merging =
new RowMergingCallable<>(convertException, rowAdapter);
Expand Down
@@ -0,0 +1,86 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.readrows;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.bigtable.v2.ReadRowsRequest;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* Specialization of ReadRows streams for point reads.
*
* <p>Under normal circumstances, the ReadRows RPC can't make any assumptions about deadlines. In
* general case the end user can be issuing a full table scan. However, when dealing with point
* reads, the client can make assumptions and promote the per row timeout to be a per attempt
* timeout.
*
* <p>This callable will check if the request is a point read and promote the timeout to be a
* deadline.
*/
@InternalApi
public class PointReadTimeoutCallable<RespT>
extends ServerStreamingCallable<ReadRowsRequest, RespT> {
private final ServerStreamingCallable<ReadRowsRequest, RespT> inner;

public PointReadTimeoutCallable(ServerStreamingCallable<ReadRowsRequest, RespT> inner) {
this.inner = inner;
}

@Override
public void call(ReadRowsRequest request, ResponseObserver<RespT> observer, ApiCallContext ctx) {
if (isPointRead(request)) {
Duration effectiveTimeout = getEffectivePointReadTimeout(ctx);
if (effectiveTimeout != null) {
ctx = ctx.withTimeout(effectiveTimeout);
}
}
inner.call(request, observer, ctx);
}

private boolean isPointRead(ReadRowsRequest request) {
if (request.getRowsLimit() == 1) {
return true;
}
if (!request.getRows().getRowRangesList().isEmpty()) {
return false;
}
return request.getRows().getRowKeysCount() == 1;
}

/**
* Extracts the effective timeout for a point read.
*
* <p>The effective time is the minimum of a streamWaitTimeout and a user set attempt timeout.
*/
@Nullable
private Duration getEffectivePointReadTimeout(ApiCallContext ctx) {
Duration streamWaitTimeout = ctx.getStreamWaitTimeout();
Duration attemptTimeout = ctx.getTimeout();

if (streamWaitTimeout == null) {
return attemptTimeout;
}

if (attemptTimeout == null) {
return streamWaitTimeout;
}
return (attemptTimeout.compareTo(streamWaitTimeout) <= 0) ? attemptTimeout : streamWaitTimeout;
}
}
@@ -0,0 +1,183 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.readrows;

import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.RowSet;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.util.Arrays;
import java.util.List;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
public class PointReadTimeoutCallableTest {
@Rule public final MockitoRule moo = MockitoJUnit.rule();

@Mock private ServerStreamingCallable<ReadRowsRequest, Object> inner;
private ArgumentCaptor<ApiCallContext> ctxCaptor;
@Mock private ResponseObserver<Object> responseObserver;

@Before
public void setUp() throws Exception {
ctxCaptor = ArgumentCaptor.forClass(ApiCallContext.class);

Mockito.doNothing()
.when(inner)
.call(
Mockito.isA(ReadRowsRequest.class),
Mockito.any(ResponseObserver.class),
ctxCaptor.capture());
}

@Test
public void promotesStreamWaitTimeout() {
Duration duration = Duration.ofMillis(100);
PointReadTimeoutCallable<Object> callable = new PointReadTimeoutCallable<>(inner);

for (ReadRowsRequest req : createPointReadRequests()) {
callable.call(
req, responseObserver, GrpcCallContext.createDefault().withStreamWaitTimeout(duration));

assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(duration);
}
}

@Test
public void promotesStreamWaitTimeoutForRowLimit() {
Duration duration = Duration.ofMillis(100);
PointReadTimeoutCallable<Object> callable = new PointReadTimeoutCallable<>(inner);

for (ReadRowsRequest req : createPointReadRequests()) {
callable.call(
createRowsLimitRequest(),
responseObserver,
GrpcCallContext.createDefault().withStreamWaitTimeout(duration));

assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(duration);
}
}

@Test
public void respectsExistingTimeout() {
Duration duration = Duration.ofMillis(100);
PointReadTimeoutCallable<Object> callable = new PointReadTimeoutCallable<>(inner);

List<ReadRowsRequest> requests =
ImmutableList.<ReadRowsRequest>builder()
.addAll(createPointReadRequests())
.add(ReadRowsRequest.getDefaultInstance())
.build();

for (ReadRowsRequest req : requests) {
callable.call(req, responseObserver, GrpcCallContext.createDefault().withTimeout(duration));
assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(duration);
}
}

@Test
public void usesMinimum1() {
Duration attemptTimeout = Duration.ofMillis(100);
Duration streamTimeout = Duration.ofMillis(200);
PointReadTimeoutCallable<Object> callable = new PointReadTimeoutCallable<>(inner);

for (ReadRowsRequest req : createPointReadRequests()) {
GrpcCallContext ctx =
GrpcCallContext.createDefault()
.withTimeout(attemptTimeout)
.withStreamWaitTimeout(streamTimeout);
callable.call(req, responseObserver, ctx);

assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(attemptTimeout);
}
}

@Test
public void usesMinimum2() {
Duration attemptTimeout = Duration.ofMillis(200);
Duration streamTimeout = Duration.ofMillis(100);
PointReadTimeoutCallable<Object> callable = new PointReadTimeoutCallable<>(inner);

for (ReadRowsRequest req : createPointReadRequests()) {
GrpcCallContext ctx =
GrpcCallContext.createDefault()
.withTimeout(attemptTimeout)
.withStreamWaitTimeout(streamTimeout);

callable.call(req, responseObserver, ctx);

assertThat(ctxCaptor.getValue().getTimeout()).isEqualTo(streamTimeout);
}
}

@Test
public void nonPointReadsAreUntouched() {
Duration streamTimeout = Duration.ofMillis(100);
PointReadTimeoutCallable<Object> callable = new PointReadTimeoutCallable<>(inner);

List<ReadRowsRequest> requests =
Arrays.<ReadRowsRequest>asList(
ReadRowsRequest.getDefaultInstance(),
ReadRowsRequest.newBuilder()
.setRows(
RowSet.newBuilder()
.addRowKeys(ByteString.copyFromUtf8("a"))
.addRowKeys(ByteString.copyFromUtf8("ab")))
.build(),
ReadRowsRequest.newBuilder()
.setRows(RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()))
.build());

for (ReadRowsRequest req : requests) {
callable.call(
req,
responseObserver,
GrpcCallContext.createDefault().withStreamWaitTimeout(streamTimeout));
assertThat(ctxCaptor.getValue().getTimeout()).isNull();
}
}

private List<ReadRowsRequest> createPointReadRequests() {
return Arrays.asList(createRowsLimitRequest(), createRowKeyRequest());
}

private ReadRowsRequest createRowsLimitRequest() {
return ReadRowsRequest.newBuilder().setRowsLimit(1).build();
}

private ReadRowsRequest createRowKeyRequest() {
return ReadRowsRequest.newBuilder()
.setRows(RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("key")))
.build();
}
}

0 comments on commit 9b3c601

Please sign in to comment.