From cf1ab06b4324219d2558bef6d30389dbf5d37ab7 Mon Sep 17 00:00:00 2001 From: Praful Makani Date: Fri, 29 May 2020 21:04:22 +0530 Subject: [PATCH] fix: add retry logic for readrows v1beta2 (#315) --- google-cloud-bigquerystorage/pom.xml | 1 - .../stub/EnhancedBigQueryReadStub.java | 69 +++- .../readrows/ApiResultRetryAlgorithm.java | 65 ++++ .../readrows/ReadRowsAttemptCallable.java | 326 ++++++++++++++++++ .../readrows/ReadRowsRetryingCallable.java | 90 +++++ .../v1beta2/BigQueryReadClientTest.java | 20 ++ 6 files changed, 567 insertions(+), 4 deletions(-) create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ReadRowsAttemptCallable.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ReadRowsRetryingCallable.java diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index 1e5f35c68d..7791c9626a 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -174,7 +174,6 @@ com.google.api.grpc grpc-google-cloud-bigquerystorage-v1beta2 - test com.google.api.grpc diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java index 9452fe62a6..351fd21c4f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java @@ -17,16 +17,31 @@ import com.google.api.core.InternalApi; import com.google.api.gax.core.BackgroundResource; +import com.google.api.gax.grpc.GrpcCallSettings; +import com.google.api.gax.grpc.GrpcRawCallableFactory; +import com.google.api.gax.retrying.ExponentialRetryAlgorithm; +import com.google.api.gax.retrying.ScheduledRetryingExecutor; +import com.google.api.gax.retrying.StreamingRetryAlgorithm; +import com.google.api.gax.rpc.Callables; import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.RequestParamsExtractor; +import com.google.api.gax.rpc.ServerStreamingCallSettings; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.tracing.SpanName; +import com.google.api.gax.tracing.TracedServerStreamingCallable; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc; import com.google.cloud.bigquery.storage.v1beta2.CreateReadSessionRequest; import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1beta2.ReadRowsResponse; import com.google.cloud.bigquery.storage.v1beta2.ReadSession; import com.google.cloud.bigquery.storage.v1beta2.SplitReadStreamRequest; import com.google.cloud.bigquery.storage.v1beta2.SplitReadStreamResponse; +import com.google.cloud.bigquery.storage.v1beta2.stub.readrows.ApiResultRetryAlgorithm; +import com.google.cloud.bigquery.storage.v1beta2.stub.readrows.ReadRowsRetryingCallable; +import com.google.common.collect.ImmutableMap; import java.io.IOException; +import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -35,7 +50,11 @@ *

This class is for advanced usage and reflects the underlying API directly. */ public class EnhancedBigQueryReadStub implements BackgroundResource { + + private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage"; private final GrpcBigQueryReadStub stub; + private final BigQueryReadStubSettings stubSettings; + private final ClientContext context; public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings) throws IOException { @@ -69,12 +88,15 @@ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings s BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build(); ClientContext clientContext = ClientContext.create(baseSettings); GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext); - return new EnhancedBigQueryReadStub(stub); + return new EnhancedBigQueryReadStub(stub, baseSettings, clientContext); } @InternalApi("Visible for testing") - EnhancedBigQueryReadStub(GrpcBigQueryReadStub stub) { + EnhancedBigQueryReadStub( + GrpcBigQueryReadStub stub, BigQueryReadStubSettings stubSettings, ClientContext context) { this.stub = stub; + this.stubSettings = stubSettings; + this.context = context; } public UnaryCallable createReadSessionCallable() { @@ -82,7 +104,48 @@ public UnaryCallable createReadSessionCal } public ServerStreamingCallable readRowsCallable() { - return stub.readRowsCallable(); + ServerStreamingCallable innerCallable = + GrpcRawCallableFactory.createServerStreamingCallable( + GrpcCallSettings.newBuilder() + .setMethodDescriptor(BigQueryReadGrpc.getReadRowsMethod()) + .setParamsExtractor( + new RequestParamsExtractor() { + @Override + public Map extract(ReadRowsRequest request) { + return ImmutableMap.of( + "read_stream", String.valueOf(request.getReadStream())); + } + }) + .build(), + stubSettings.readRowsSettings().getRetryableCodes()); + ServerStreamingCallSettings callSettings = + stubSettings.readRowsSettings(); + + StreamingRetryAlgorithm retryAlgorithm = + new StreamingRetryAlgorithm<>( + new ApiResultRetryAlgorithm(), + new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock())); + + ScheduledRetryingExecutor retryingExecutor = + new ScheduledRetryingExecutor<>(retryAlgorithm, context.getExecutor()); + + if (context.getStreamWatchdog() != null) { + innerCallable = Callables.watched(innerCallable, callSettings, context); + } + + ReadRowsRetryingCallable outerCallable = + new ReadRowsRetryingCallable( + context.getDefaultCallContext(), + innerCallable, + retryingExecutor, + callSettings.getResumptionStrategy()); + + ServerStreamingCallable traced = + new TracedServerStreamingCallable<>( + outerCallable, + context.getTracerFactory(), + SpanName.of(TRACING_OUTER_CLIENT_NAME, "ReadRows")); + return traced.withDefaultCallContext(context.getDefaultCallContext()); } public UnaryCallable splitReadStreamCallable() { diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java new file mode 100644 index 0000000000..63ea4b0391 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java @@ -0,0 +1,65 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1beta2.stub.readrows; + +import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.retrying.TimedAttemptSettings; +import com.google.api.gax.rpc.ApiException; +import io.grpc.Status; +import org.threeten.bp.Duration; + +/** For internal use, public for technical reasons. */ +@InternalApi +public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm { + // Duration to sleep on if the error is DEADLINE_EXCEEDED. + public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + + @Override + public TimedAttemptSettings createNextAttempt( + Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { + if (prevThrowable != null) { + Status status = Status.fromThrowable(prevThrowable); + if (status.getCode() == Status.Code.INTERNAL + && status.getDescription() != null + && status.getDescription().equals("Received unexpected EOS on DATA frame from server")) { + return TimedAttemptSettings.newBuilder() + .setGlobalSettings(prevSettings.getGlobalSettings()) + .setRetryDelay(prevSettings.getRetryDelay()) + .setRpcTimeout(prevSettings.getRpcTimeout()) + .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION) + .setAttemptCount(prevSettings.getAttemptCount() + 1) + .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos()) + .build(); + } + } + return null; + } + + @Override + public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { + if (prevThrowable != null) { + Status status = Status.fromThrowable(prevThrowable); + if (status.getCode() == Status.Code.INTERNAL + && status.getDescription() != null + && status.getDescription().equals("Received unexpected EOS on DATA frame from server")) { + return true; + } + } + return (prevThrowable instanceof ApiException) && ((ApiException) prevThrowable).isRetryable(); + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ReadRowsAttemptCallable.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ReadRowsAttemptCallable.java new file mode 100644 index 0000000000..3dea54a119 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ReadRowsAttemptCallable.java @@ -0,0 +1,326 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1beta2.stub.readrows; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.retrying.ServerStreamingAttemptException; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StateCheckingResponseObserver; +import com.google.api.gax.rpc.StreamController; +import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1beta2.ReadRowsResponse; +import com.google.common.base.Preconditions; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import javax.annotation.concurrent.GuardedBy; +import org.threeten.bp.Duration; + +final class ReadRowsAttemptCallable implements Callable { + private final Object lock = new Object(); + + private final ServerStreamingCallable innerCallable; + private final StreamResumptionStrategy resumptionStrategy; + private final ReadRowsRequest initialRequest; + private ApiCallContext context; + private final ResponseObserver outerObserver; + + // Start state + private boolean autoFlowControl = true; + private boolean isStarted; + + // Outer state + @GuardedBy("lock") + private Throwable cancellationCause; + + @GuardedBy("lock") + private int pendingRequests; + + private RetryingFuture outerRetryingFuture; + + // Internal retry state + private int numAttempts; + + @GuardedBy("lock") + private StreamController innerController; + + private boolean seenSuccessSinceLastError; + private SettableApiFuture innerAttemptFuture; + + ReadRowsAttemptCallable( + ServerStreamingCallable innerCallable, + StreamResumptionStrategy resumptionStrategy, + ReadRowsRequest initialRequest, + ApiCallContext context, + ResponseObserver outerObserver) { + this.innerCallable = innerCallable; + this.resumptionStrategy = resumptionStrategy; + this.initialRequest = initialRequest; + this.context = context; + this.outerObserver = outerObserver; + } + + /** Sets controlling {@link RetryingFuture}. Must be called be before {@link #start()}. */ + void setExternalFuture(RetryingFuture retryingFuture) { + Preconditions.checkState(!isStarted, "Can't change the RetryingFuture once the call has start"); + Preconditions.checkNotNull(retryingFuture, "RetryingFuture can't be null"); + + this.outerRetryingFuture = retryingFuture; + } + + /** + * Starts the initial call. The call is attempted on the caller's thread. Further call attempts + * will be scheduled by the {@link RetryingFuture}. + */ + public void start() { + Preconditions.checkState(!isStarted, "Already started"); + + // Initialize the outer observer + outerObserver.onStart( + new StreamController() { + @Override + public void disableAutoInboundFlowControl() { + Preconditions.checkState( + !isStarted, "Can't disable auto flow control once the stream is started"); + autoFlowControl = false; + } + + @Override + public void request(int count) { + onRequest(count); + } + + @Override + public void cancel() { + onCancel(); + } + }); + + if (autoFlowControl) { + synchronized (lock) { + pendingRequests = Integer.MAX_VALUE; + } + } + isStarted = true; + + // Propagate the totalTimeout as the overall stream deadline. + Duration totalTimeout = + outerRetryingFuture.getAttemptSettings().getGlobalSettings().getTotalTimeout(); + + if (totalTimeout != null && context != null) { + context = context.withTimeout(totalTimeout); + } + + // Call the inner callable + call(); + } + + /** + * Sends the actual RPC. The request being sent will first be transformed by the {@link + * StreamResumptionStrategy}. + * + *

This method expects to be called by one thread at a time. Furthermore, it expects that the + * current RPC finished before the next time it's called. + */ + @Override + public Void call() { + Preconditions.checkState(isStarted, "Must be started first"); + + ReadRowsRequest request = + (++numAttempts == 1) ? initialRequest : resumptionStrategy.getResumeRequest(initialRequest); + + // Should never happen. onAttemptError will check if ResumptionStrategy can create a resume + // request, + // which the RetryingFuture/StreamResumptionStrategy should respect. + Preconditions.checkState(request != null, "ResumptionStrategy returned a null request."); + + innerAttemptFuture = SettableApiFuture.create(); + seenSuccessSinceLastError = false; + + ApiCallContext attemptContext = context; + + if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero()) { + attemptContext = + attemptContext.withStreamWaitTimeout( + outerRetryingFuture.getAttemptSettings().getRpcTimeout()); + } + + attemptContext + .getTracer() + .attemptStarted(outerRetryingFuture.getAttemptSettings().getOverallAttemptCount()); + + innerCallable.call( + request, + new StateCheckingResponseObserver() { + @Override + public void onStartImpl(StreamController controller) { + onAttemptStart(controller); + } + + @Override + public void onResponseImpl(ReadRowsResponse response) { + onAttemptResponse(response); + } + + @Override + public void onErrorImpl(Throwable t) { + onAttemptError(t); + } + + @Override + public void onCompleteImpl() { + onAttemptComplete(); + } + }, + attemptContext); + + outerRetryingFuture.setAttemptFuture(innerAttemptFuture); + + return null; + } + + /** + * Called by the inner {@link ServerStreamingCallable} when the call is about to start. This will + * transfer unfinished state from the previous attempt. + * + * @see ResponseObserver#onStart(StreamController) + */ + private void onAttemptStart(StreamController controller) { + if (!autoFlowControl) { + controller.disableAutoInboundFlowControl(); + } + + Throwable localCancellationCause; + int numToRequest = 0; + + synchronized (lock) { + innerController = controller; + + localCancellationCause = this.cancellationCause; + + if (!autoFlowControl) { + numToRequest = pendingRequests; + } + } + + if (localCancellationCause != null) { + controller.cancel(); + } else if (numToRequest > 0) { + controller.request(numToRequest); + } + } + + /** + * Called when the outer {@link ResponseObserver} wants to prematurely cancel the stream. + * + * @see StreamController#cancel() + */ + private void onCancel() { + StreamController localInnerController; + + synchronized (lock) { + if (cancellationCause != null) { + return; + } + // NOTE: BasicRetryingFuture will replace j.u.c.CancellationExceptions with it's own, + // which will not have the current stacktrace, so a special wrapper has be used here. + cancellationCause = + new ServerStreamingAttemptException( + new CancellationException("User cancelled stream"), + resumptionStrategy.canResume(), + seenSuccessSinceLastError); + localInnerController = innerController; + } + + if (localInnerController != null) { + localInnerController.cancel(); + } + } + + /** + * Called when the outer {@link ResponseObserver} is ready for more data. + * + * @see StreamController#request(int) + */ + private void onRequest(int count) { + Preconditions.checkState(!autoFlowControl, "Automatic flow control is enabled"); + Preconditions.checkArgument(count > 0, "Count must be > 0"); + + final StreamController localInnerController; + + synchronized (lock) { + int maxInc = Integer.MAX_VALUE - pendingRequests; + count = Math.min(maxInc, count); + + pendingRequests += count; + localInnerController = this.innerController; + } + + // Note: there is a race condition here where the count might go to the previous attempt's + // StreamController after it failed. But it doesn't matter, because the controller will just + // ignore it and the current controller will pick it up onStart. + if (localInnerController != null) { + localInnerController.request(count); + } + } + + /** Called when the inner callable has responses to deliver. */ + private void onAttemptResponse(ReadRowsResponse message) { + if (!autoFlowControl) { + synchronized (lock) { + pendingRequests--; + } + } + // Update local state to allow for future resume. + seenSuccessSinceLastError = true; + message = resumptionStrategy.processResponse(message); + // Notify the outer observer. + outerObserver.onResponse(message); + } + + /** + * Called when the current RPC fails. The error will be bubbled up to the outer {@link + * RetryingFuture} via the {@link #innerAttemptFuture}. + */ + private void onAttemptError(Throwable throwable) { + Throwable localCancellationCause; + synchronized (lock) { + localCancellationCause = cancellationCause; + } + + if (localCancellationCause != null) { + // Take special care to preserve the cancellation's stack trace. + innerAttemptFuture.setException(localCancellationCause); + } else { + // Wrap the original exception and provide more context for StreamingRetryAlgorithm. + innerAttemptFuture.setException( + new ServerStreamingAttemptException( + throwable, resumptionStrategy.canResume(), seenSuccessSinceLastError)); + } + } + + /** + * Called when the current RPC successfully completes. Notifies the outer {@link RetryingFuture} + * via {@link #innerAttemptFuture}. + */ + private void onAttemptComplete() { + innerAttemptFuture.set(null); + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ReadRowsRetryingCallable.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ReadRowsRetryingCallable.java new file mode 100644 index 0000000000..8d3384de81 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ReadRowsRetryingCallable.java @@ -0,0 +1,90 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1beta2.stub.readrows; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.retrying.ScheduledRetryingExecutor; +import com.google.api.gax.retrying.ServerStreamingAttemptException; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest; +import com.google.cloud.bigquery.storage.v1beta2.ReadRowsResponse; + +public final class ReadRowsRetryingCallable + extends ServerStreamingCallable { + + private final ApiCallContext context; + private final ServerStreamingCallable innerCallable; + private final ScheduledRetryingExecutor executor; + private final StreamResumptionStrategy + resumptionStrategyPrototype; + + public ReadRowsRetryingCallable( + ApiCallContext context, + ServerStreamingCallable innerCallable, + ScheduledRetryingExecutor executor, + StreamResumptionStrategy resumptionStrategyPrototype) { + this.context = context; + this.innerCallable = innerCallable; + this.executor = executor; + this.resumptionStrategyPrototype = resumptionStrategyPrototype; + } + + @Override + public void call( + ReadRowsRequest request, + final ResponseObserver responseObserver, + ApiCallContext context) { + ReadRowsAttemptCallable attemptCallable = + new ReadRowsAttemptCallable( + innerCallable, + resumptionStrategyPrototype.createNew(), + request, + this.context, + responseObserver); + + RetryingFuture retryingFuture = executor.createFuture(attemptCallable, this.context); + attemptCallable.setExternalFuture(retryingFuture); + attemptCallable.start(); + + // Bridge the future result back to the external responseObserver + ApiFutures.addCallback( + retryingFuture, + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + // Make sure to unwrap the underlying ApiException + if (throwable instanceof ServerStreamingAttemptException) { + throwable = throwable.getCause(); + } + responseObserver.onError(throwable); + } + + @Override + public void onSuccess(Void ignored) { + responseObserver.onComplete(); + } + }, + directExecutor()); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java index 88e9004092..f5f8b24086 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java @@ -162,4 +162,24 @@ public void readRowsExceptionTest() throws Exception { Assert.assertEquals(StatusCode.Code.INVALID_ARGUMENT, apiException.getStatusCode().getCode()); } } + + @Test + @SuppressWarnings("all") + public void readRowsRetryingExceptionTest() throws ExecutionException, InterruptedException { + StatusRuntimeException exception = + new StatusRuntimeException( + Status.INTERNAL.withDescription("Received unexpected EOS on DATA frame from server")); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + } }