diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java index 39f9adcf6b..3fe6e1099a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java @@ -29,14 +29,19 @@ public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm< // Duration to sleep on if the error is DEADLINE_EXCEEDED. public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + private boolean isRetryableStatus(Status status) { + return status.getCode() == Status.Code.INTERNAL + && status.getDescription() != null + && (status.getDescription().contains("Received unexpected EOS on DATA frame from server") + || status.getDescription().contains("Received Rst Stream")); + } + @Override public TimedAttemptSettings createNextAttempt( Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (status.getCode() == Status.Code.INTERNAL - && status.getDescription() != null - && status.getDescription().equals("Received unexpected EOS on DATA frame from server")) { + if (isRetryableStatus(status)) { return TimedAttemptSettings.newBuilder() .setGlobalSettings(prevSettings.getGlobalSettings()) .setRetryDelay(prevSettings.getRetryDelay()) @@ -54,9 +59,7 @@ public TimedAttemptSettings createNextAttempt( public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (status.getCode() == Status.Code.INTERNAL - && status.getDescription() != null - && status.getDescription().equals("Received unexpected EOS on DATA frame from server")) { + if (isRetryableStatus(status)) { return true; } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java index 9b5a3c0fdd..035a0c813b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java @@ -29,14 +29,19 @@ public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm< // Duration to sleep on if the error is DEADLINE_EXCEEDED. public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + private boolean isRetryableStatus(Status status) { + return status.getCode() == Status.Code.INTERNAL + && status.getDescription() != null + && (status.getDescription().contains("Received unexpected EOS on DATA frame from server") + || status.getDescription().contains("Received Rst Stream")); + } + @Override public TimedAttemptSettings createNextAttempt( Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (status.getCode() == Status.Code.INTERNAL - && status.getDescription() != null - && status.getDescription().equals("Received unexpected EOS on DATA frame from server")) { + if (isRetryableStatus(status)) { return TimedAttemptSettings.newBuilder() .setGlobalSettings(prevSettings.getGlobalSettings()) .setRetryDelay(prevSettings.getRetryDelay()) @@ -54,9 +59,7 @@ public TimedAttemptSettings createNextAttempt( public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (status.getCode() == Status.Code.INTERNAL - && status.getDescription() != null - && status.getDescription().equals("Received unexpected EOS on DATA frame from server")) { + if (isRetryableStatus(status)) { return true; } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java index 63ea4b0391..28d3e165b9 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java @@ -29,14 +29,19 @@ public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm< // Duration to sleep on if the error is DEADLINE_EXCEEDED. public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + private boolean isRetryableStatus(Status status) { + return status.getCode() == Status.Code.INTERNAL + && status.getDescription() != null + && (status.getDescription().contains("Received unexpected EOS on DATA frame from server") + || status.getDescription().contains("Received Rst Stream")); + } + @Override public TimedAttemptSettings createNextAttempt( Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (status.getCode() == Status.Code.INTERNAL - && status.getDescription() != null - && status.getDescription().equals("Received unexpected EOS on DATA frame from server")) { + if (isRetryableStatus(status)) { return TimedAttemptSettings.newBuilder() .setGlobalSettings(prevSettings.getGlobalSettings()) .setRetryDelay(prevSettings.getRetryDelay()) @@ -54,9 +59,7 @@ public TimedAttemptSettings createNextAttempt( public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (status.getCode() == Status.Code.INTERNAL - && status.getDescription() != null - && status.getDescription().equals("Received unexpected EOS on DATA frame from server")) { + if (isRetryableStatus(status)) { return true; } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java index e0285ac76e..df83a5a01a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java @@ -17,16 +17,20 @@ import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.GaxGrpcProperties; +import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; import com.google.api.gax.grpc.testing.MockStreamObserver; import com.google.api.gax.rpc.ApiClientHeaderProvider; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.InvalidArgumentException; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode; import com.google.protobuf.AbstractMessage; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; import java.util.Arrays; @@ -165,10 +169,38 @@ public void readRowsExceptionTest() throws Exception { @Test @SuppressWarnings("all") - public void readRowsRetryingExceptionTest() throws ExecutionException, InterruptedException { - StatusRuntimeException exception = - new StatusRuntimeException( - Status.INTERNAL.withDescription("Received unexpected EOS on DATA frame from server")); + public void readRowsRetryingEOSExceptionTest() throws ExecutionException, InterruptedException { + ApiException exception = + new InternalException( + new StatusRuntimeException( + Status.INTERNAL.withDescription( + "Received unexpected EOS on DATA frame from server")), + GrpcStatusCode.of(Code.INTERNAL), + /* retryable = */ false); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + } + + @Test + @SuppressWarnings("all") + public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, InterruptedException { + ApiException exception = + new InternalException( + new StatusRuntimeException( + Status.INTERNAL.withDescription( + "HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream")), + GrpcStatusCode.of(Code.INTERNAL), + /* retryable = */ false); mockBigQueryRead.addException(exception); long rowCount = 1340416618L; ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java index 4f7ab8f249..9dc725c9a1 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java @@ -17,11 +17,14 @@ import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.GaxGrpcProperties; +import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; import com.google.api.gax.grpc.testing.MockStreamObserver; import com.google.api.gax.rpc.ApiClientHeaderProvider; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.InvalidArgumentException; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode; @@ -40,6 +43,7 @@ import com.google.protobuf.AbstractMessage; import com.google.protobuf.Empty; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; import java.util.Arrays; @@ -295,10 +299,38 @@ public void splitReadStreamExceptionTest() throws Exception { @Test @SuppressWarnings("all") - public void readRowsRetryingExceptionTest() throws ExecutionException, InterruptedException { - StatusRuntimeException exception = - new StatusRuntimeException( - Status.INTERNAL.withDescription("Received unexpected EOS on DATA frame from server")); + public void readRowsRetryingEOSExceptionTest() throws ExecutionException, InterruptedException { + ApiException exception = + new InternalException( + new StatusRuntimeException( + Status.INTERNAL.withDescription( + "Received unexpected EOS on DATA frame from server")), + GrpcStatusCode.of(Code.INTERNAL), + /* retryable = */ false); + mockBigQueryStorage.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryStorage.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + } + + @Test + @SuppressWarnings("all") + public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, InterruptedException { + ApiException exception = + new InternalException( + new StatusRuntimeException( + Status.INTERNAL.withDescription( + "HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream")), + GrpcStatusCode.of(Code.INTERNAL), + /* retryable = */ false); mockBigQueryStorage.addException(exception); long rowCount = 1340416618L; ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java index f5f8b24086..90bea22573 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java @@ -17,16 +17,20 @@ import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.GaxGrpcProperties; +import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; import com.google.api.gax.grpc.testing.MockStreamObserver; import com.google.api.gax.rpc.ApiClientHeaderProvider; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.InvalidArgumentException; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode; import com.google.protobuf.AbstractMessage; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; import java.util.Arrays; @@ -165,10 +169,38 @@ public void readRowsExceptionTest() throws Exception { @Test @SuppressWarnings("all") - public void readRowsRetryingExceptionTest() throws ExecutionException, InterruptedException { - StatusRuntimeException exception = - new StatusRuntimeException( - Status.INTERNAL.withDescription("Received unexpected EOS on DATA frame from server")); + public void readRowsRetryingEOSExceptionTest() throws ExecutionException, InterruptedException { + ApiException exception = + new InternalException( + new StatusRuntimeException( + Status.INTERNAL.withDescription( + "Received unexpected EOS on DATA frame from server")), + GrpcStatusCode.of(Code.INTERNAL), + /* retryable = */ false); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + } + + @Test + @SuppressWarnings("all") + public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, InterruptedException { + ApiException exception = + new InternalException( + new StatusRuntimeException( + Status.INTERNAL.withDescription( + "HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream")), + GrpcStatusCode.of(Code.INTERNAL), + /* retryable = */ false); mockBigQueryRead.addException(exception); long rowCount = 1340416618L; ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build();