From b09a21c1dd1a05b68bfd3a0134089ba32dca1774 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 27 Jan 2021 12:45:18 -0500 Subject: [PATCH] fix: Retry "received rst stream" (#586) * fix: Retry "received rst stream" * Check if exception is InternalException instead of using fromThrowable --- .../data/v2/stub/EnhancedBigtableStub.java | 9 +- .../ReadRowsConvertExceptionCallable.java | 87 +++++++++++++++++++ .../v2/stub/readrows/ReadRowsRetryTest.java | 37 ++++++++ 3 files changed, 132 insertions(+), 1 deletion(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 448390396..8f2505c58 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -75,6 +75,7 @@ import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor; +import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsConvertExceptionCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable; import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable; @@ -345,8 +346,14 @@ public Map extract(ReadRowsRequest readRowsRequest) { .build(), readRowsSettings.getRetryableCodes()); + // Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and + // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code + // which by default is not retryable. Convert the exception so it can be retried in the client. + ServerStreamingCallable convertException = + new ReadRowsConvertExceptionCallable<>(base); + ServerStreamingCallable merging = - new RowMergingCallable<>(base, rowAdapter); + new RowMergingCallable<>(convertException, rowAdapter); // Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the inner // ReadRowsRequest -> ReadRowsResponse callable). diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable.java new file mode 100644 index 000000000..69dd2b5b8 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable.java @@ -0,0 +1,87 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.readrows; + +import com.google.api.core.InternalApi; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.InternalException; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; + +/** + * This callable converts the "Received rst stream" exception into a retryable {@link ApiException}. + */ +@InternalApi +public final class ReadRowsConvertExceptionCallable + extends ServerStreamingCallable { + + private final ServerStreamingCallable innerCallable; + + public ReadRowsConvertExceptionCallable( + ServerStreamingCallable innerCallable) { + this.innerCallable = innerCallable; + } + + @Override + public void call( + ReadRowsRequest request, ResponseObserver responseObserver, ApiCallContext context) { + ReadRowsConvertExceptionResponseObserver observer = + new ReadRowsConvertExceptionResponseObserver<>(responseObserver); + innerCallable.call(request, observer, context); + } + + private class ReadRowsConvertExceptionResponseObserver implements ResponseObserver { + + private final ResponseObserver outerObserver; + + ReadRowsConvertExceptionResponseObserver(ResponseObserver outerObserver) { + this.outerObserver = outerObserver; + } + + @Override + public void onStart(StreamController controller) { + outerObserver.onStart(controller); + } + + @Override + public void onResponse(RowT response) { + outerObserver.onResponse(response); + } + + @Override + public void onError(Throwable t) { + outerObserver.onError(convertException(t)); + } + + @Override + public void onComplete() { + outerObserver.onComplete(); + } + } + + private Throwable convertException(Throwable t) { + // Long lived connections sometimes are disconnected via an RST frame. This error is + // transient and should be retried. + if (t instanceof InternalException) { + if (t.getMessage() != null && t.getMessage().contains("Received Rst stream")) { + return new InternalException(t, ((InternalException) t).getStatusCode(), true); + } + } + return t; + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java index 96a22f518..1536b01e0 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsRetryTest.java @@ -16,8 +16,11 @@ package com.google.cloud.bigtable.data.v2.stub.readrows; import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.ServerStream; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.ReadRowsRequest; @@ -39,6 +42,7 @@ import com.google.protobuf.StringValue; import io.grpc.Status; import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcServerRule; import java.io.IOException; @@ -260,6 +264,30 @@ public void retryWithLastScannedKeyTest() { Truth.assertThat(actualResults).containsExactly("r7").inOrder(); } + @Test + public void retryRstStreamExceptionTest() { + ApiException exception = + new InternalException( + new StatusRuntimeException( + Status.INTERNAL.withDescription( + "HTTP/2 error code: INTERNAL_ERROR\nReceived Rst stream")), + GrpcStatusCode.of(Code.INTERNAL), + false); + service.expectations.add( + RpcExpectation.create() + .expectRequest("k1") + .expectRequest(Range.closedOpen("r1", "r3")) + .respondWithException(Code.INTERNAL, exception)); + service.expectations.add( + RpcExpectation.create() + .expectRequest("k1") + .expectRequest(Range.closedOpen("r1", "r3")) + .respondWith("k1", "r1", "r2")); + + List actualResults = getResults(Query.create(TABLE_ID).rowKey("k1").range("r1", "r3")); + Truth.assertThat(actualResults).containsExactly("k1", "r1", "r2").inOrder(); + } + private List getResults(Query query) { ServerStream actualRows = client.readRows(query); List actualValues = Lists.newArrayList(); @@ -292,6 +320,8 @@ public void readRows( } if (expectedRpc.statusCode.toStatus().isOk()) { responseObserver.onCompleted(); + } else if (expectedRpc.exception != null) { + responseObserver.onError(expectedRpc.exception); } else { responseObserver.onError(expectedRpc.statusCode.toStatus().asRuntimeException()); } @@ -301,6 +331,7 @@ public void readRows( private static class RpcExpectation { ReadRowsRequest.Builder requestBuilder; Status.Code statusCode; + ApiException exception; List responses; private RpcExpectation() { @@ -370,6 +401,12 @@ RpcExpectation respondWithStatus(Status.Code code) { return this; } + RpcExpectation respondWithException(Status.Code code, ApiException exception) { + this.statusCode = code; + this.exception = exception; + return this; + } + RpcExpectation respondWith(String... responses) { for (String response : responses) { this.responses.add(