com.google.api.grpc
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java
index 9452fe62a6..351fd21c4f 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java
@@ -17,16 +17,31 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.core.BackgroundResource;
+import com.google.api.gax.grpc.GrpcCallSettings;
+import com.google.api.gax.grpc.GrpcRawCallableFactory;
+import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
+import com.google.api.gax.retrying.ScheduledRetryingExecutor;
+import com.google.api.gax.retrying.StreamingRetryAlgorithm;
+import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
+import com.google.api.gax.rpc.RequestParamsExtractor;
+import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
+import com.google.api.gax.tracing.SpanName;
+import com.google.api.gax.tracing.TracedServerStreamingCallable;
+import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc;
import com.google.cloud.bigquery.storage.v1beta2.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.ReadSession;
import com.google.cloud.bigquery.storage.v1beta2.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1beta2.SplitReadStreamResponse;
+import com.google.cloud.bigquery.storage.v1beta2.stub.readrows.ApiResultRetryAlgorithm;
+import com.google.cloud.bigquery.storage.v1beta2.stub.readrows.ReadRowsRetryingCallable;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
@@ -35,7 +50,11 @@
* This class is for advanced usage and reflects the underlying API directly.
*/
public class EnhancedBigQueryReadStub implements BackgroundResource {
+
+ private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage";
private final GrpcBigQueryReadStub stub;
+ private final BigQueryReadStubSettings stubSettings;
+ private final ClientContext context;
public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings)
throws IOException {
@@ -69,12 +88,15 @@ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings s
BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build();
ClientContext clientContext = ClientContext.create(baseSettings);
GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext);
- return new EnhancedBigQueryReadStub(stub);
+ return new EnhancedBigQueryReadStub(stub, baseSettings, clientContext);
}
@InternalApi("Visible for testing")
- EnhancedBigQueryReadStub(GrpcBigQueryReadStub stub) {
+ EnhancedBigQueryReadStub(
+ GrpcBigQueryReadStub stub, BigQueryReadStubSettings stubSettings, ClientContext context) {
this.stub = stub;
+ this.stubSettings = stubSettings;
+ this.context = context;
}
public UnaryCallable createReadSessionCallable() {
@@ -82,7 +104,48 @@ public UnaryCallable createReadSessionCal
}
public ServerStreamingCallable readRowsCallable() {
- return stub.readRowsCallable();
+ ServerStreamingCallable innerCallable =
+ GrpcRawCallableFactory.createServerStreamingCallable(
+ GrpcCallSettings.newBuilder()
+ .setMethodDescriptor(BigQueryReadGrpc.getReadRowsMethod())
+ .setParamsExtractor(
+ new RequestParamsExtractor() {
+ @Override
+ public Map extract(ReadRowsRequest request) {
+ return ImmutableMap.of(
+ "read_stream", String.valueOf(request.getReadStream()));
+ }
+ })
+ .build(),
+ stubSettings.readRowsSettings().getRetryableCodes());
+ ServerStreamingCallSettings callSettings =
+ stubSettings.readRowsSettings();
+
+ StreamingRetryAlgorithm retryAlgorithm =
+ new StreamingRetryAlgorithm<>(
+ new ApiResultRetryAlgorithm(),
+ new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock()));
+
+ ScheduledRetryingExecutor retryingExecutor =
+ new ScheduledRetryingExecutor<>(retryAlgorithm, context.getExecutor());
+
+ if (context.getStreamWatchdog() != null) {
+ innerCallable = Callables.watched(innerCallable, callSettings, context);
+ }
+
+ ReadRowsRetryingCallable outerCallable =
+ new ReadRowsRetryingCallable(
+ context.getDefaultCallContext(),
+ innerCallable,
+ retryingExecutor,
+ callSettings.getResumptionStrategy());
+
+ ServerStreamingCallable traced =
+ new TracedServerStreamingCallable<>(
+ outerCallable,
+ context.getTracerFactory(),
+ SpanName.of(TRACING_OUTER_CLIENT_NAME, "ReadRows"));
+ return traced.withDefaultCallContext(context.getDefaultCallContext());
}
public UnaryCallable splitReadStreamCallable() {
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java
new file mode 100644
index 0000000000..63ea4b0391
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.storage.v1beta2.stub.readrows;
+
+import com.google.api.core.InternalApi;
+import com.google.api.gax.retrying.ResultRetryAlgorithm;
+import com.google.api.gax.retrying.TimedAttemptSettings;
+import com.google.api.gax.rpc.ApiException;
+import io.grpc.Status;
+import org.threeten.bp.Duration;
+
+/** For internal use, public for technical reasons. */
+@InternalApi
+public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm {
+ // Duration to sleep on if the error is DEADLINE_EXCEEDED.
+ public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);
+
+ @Override
+ public TimedAttemptSettings createNextAttempt(
+ Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
+ if (prevThrowable != null) {
+ Status status = Status.fromThrowable(prevThrowable);
+ if (status.getCode() == Status.Code.INTERNAL
+ && status.getDescription() != null
+ && status.getDescription().equals("Received unexpected EOS on DATA frame from server")) {
+ return TimedAttemptSettings.newBuilder()
+ .setGlobalSettings(prevSettings.getGlobalSettings())
+ .setRetryDelay(prevSettings.getRetryDelay())
+ .setRpcTimeout(prevSettings.getRpcTimeout())
+ .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION)
+ .setAttemptCount(prevSettings.getAttemptCount() + 1)
+ .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos())
+ .build();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
+ if (prevThrowable != null) {
+ Status status = Status.fromThrowable(prevThrowable);
+ if (status.getCode() == Status.Code.INTERNAL
+ && status.getDescription() != null
+ && status.getDescription().equals("Received unexpected EOS on DATA frame from server")) {
+ return true;
+ }
+ }
+ return (prevThrowable instanceof ApiException) && ((ApiException) prevThrowable).isRetryable();
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ReadRowsAttemptCallable.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ReadRowsAttemptCallable.java
new file mode 100644
index 0000000000..3dea54a119
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ReadRowsAttemptCallable.java
@@ -0,0 +1,326 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.storage.v1beta2.stub.readrows;
+
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.retrying.RetryingFuture;
+import com.google.api.gax.retrying.ServerStreamingAttemptException;
+import com.google.api.gax.retrying.StreamResumptionStrategy;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.StateCheckingResponseObserver;
+import com.google.api.gax.rpc.StreamController;
+import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1beta2.ReadRowsResponse;
+import com.google.common.base.Preconditions;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import javax.annotation.concurrent.GuardedBy;
+import org.threeten.bp.Duration;
+
+final class ReadRowsAttemptCallable implements Callable {
+ private final Object lock = new Object();
+
+ private final ServerStreamingCallable innerCallable;
+ private final StreamResumptionStrategy resumptionStrategy;
+ private final ReadRowsRequest initialRequest;
+ private ApiCallContext context;
+ private final ResponseObserver outerObserver;
+
+ // Start state
+ private boolean autoFlowControl = true;
+ private boolean isStarted;
+
+ // Outer state
+ @GuardedBy("lock")
+ private Throwable cancellationCause;
+
+ @GuardedBy("lock")
+ private int pendingRequests;
+
+ private RetryingFuture outerRetryingFuture;
+
+ // Internal retry state
+ private int numAttempts;
+
+ @GuardedBy("lock")
+ private StreamController innerController;
+
+ private boolean seenSuccessSinceLastError;
+ private SettableApiFuture innerAttemptFuture;
+
+ ReadRowsAttemptCallable(
+ ServerStreamingCallable innerCallable,
+ StreamResumptionStrategy resumptionStrategy,
+ ReadRowsRequest initialRequest,
+ ApiCallContext context,
+ ResponseObserver outerObserver) {
+ this.innerCallable = innerCallable;
+ this.resumptionStrategy = resumptionStrategy;
+ this.initialRequest = initialRequest;
+ this.context = context;
+ this.outerObserver = outerObserver;
+ }
+
+ /** Sets controlling {@link RetryingFuture}. Must be called be before {@link #start()}. */
+ void setExternalFuture(RetryingFuture retryingFuture) {
+ Preconditions.checkState(!isStarted, "Can't change the RetryingFuture once the call has start");
+ Preconditions.checkNotNull(retryingFuture, "RetryingFuture can't be null");
+
+ this.outerRetryingFuture = retryingFuture;
+ }
+
+ /**
+ * Starts the initial call. The call is attempted on the caller's thread. Further call attempts
+ * will be scheduled by the {@link RetryingFuture}.
+ */
+ public void start() {
+ Preconditions.checkState(!isStarted, "Already started");
+
+ // Initialize the outer observer
+ outerObserver.onStart(
+ new StreamController() {
+ @Override
+ public void disableAutoInboundFlowControl() {
+ Preconditions.checkState(
+ !isStarted, "Can't disable auto flow control once the stream is started");
+ autoFlowControl = false;
+ }
+
+ @Override
+ public void request(int count) {
+ onRequest(count);
+ }
+
+ @Override
+ public void cancel() {
+ onCancel();
+ }
+ });
+
+ if (autoFlowControl) {
+ synchronized (lock) {
+ pendingRequests = Integer.MAX_VALUE;
+ }
+ }
+ isStarted = true;
+
+ // Propagate the totalTimeout as the overall stream deadline.
+ Duration totalTimeout =
+ outerRetryingFuture.getAttemptSettings().getGlobalSettings().getTotalTimeout();
+
+ if (totalTimeout != null && context != null) {
+ context = context.withTimeout(totalTimeout);
+ }
+
+ // Call the inner callable
+ call();
+ }
+
+ /**
+ * Sends the actual RPC. The request being sent will first be transformed by the {@link
+ * StreamResumptionStrategy}.
+ *
+ * This method expects to be called by one thread at a time. Furthermore, it expects that the
+ * current RPC finished before the next time it's called.
+ */
+ @Override
+ public Void call() {
+ Preconditions.checkState(isStarted, "Must be started first");
+
+ ReadRowsRequest request =
+ (++numAttempts == 1) ? initialRequest : resumptionStrategy.getResumeRequest(initialRequest);
+
+ // Should never happen. onAttemptError will check if ResumptionStrategy can create a resume
+ // request,
+ // which the RetryingFuture/StreamResumptionStrategy should respect.
+ Preconditions.checkState(request != null, "ResumptionStrategy returned a null request.");
+
+ innerAttemptFuture = SettableApiFuture.create();
+ seenSuccessSinceLastError = false;
+
+ ApiCallContext attemptContext = context;
+
+ if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero()) {
+ attemptContext =
+ attemptContext.withStreamWaitTimeout(
+ outerRetryingFuture.getAttemptSettings().getRpcTimeout());
+ }
+
+ attemptContext
+ .getTracer()
+ .attemptStarted(outerRetryingFuture.getAttemptSettings().getOverallAttemptCount());
+
+ innerCallable.call(
+ request,
+ new StateCheckingResponseObserver() {
+ @Override
+ public void onStartImpl(StreamController controller) {
+ onAttemptStart(controller);
+ }
+
+ @Override
+ public void onResponseImpl(ReadRowsResponse response) {
+ onAttemptResponse(response);
+ }
+
+ @Override
+ public void onErrorImpl(Throwable t) {
+ onAttemptError(t);
+ }
+
+ @Override
+ public void onCompleteImpl() {
+ onAttemptComplete();
+ }
+ },
+ attemptContext);
+
+ outerRetryingFuture.setAttemptFuture(innerAttemptFuture);
+
+ return null;
+ }
+
+ /**
+ * Called by the inner {@link ServerStreamingCallable} when the call is about to start. This will
+ * transfer unfinished state from the previous attempt.
+ *
+ * @see ResponseObserver#onStart(StreamController)
+ */
+ private void onAttemptStart(StreamController controller) {
+ if (!autoFlowControl) {
+ controller.disableAutoInboundFlowControl();
+ }
+
+ Throwable localCancellationCause;
+ int numToRequest = 0;
+
+ synchronized (lock) {
+ innerController = controller;
+
+ localCancellationCause = this.cancellationCause;
+
+ if (!autoFlowControl) {
+ numToRequest = pendingRequests;
+ }
+ }
+
+ if (localCancellationCause != null) {
+ controller.cancel();
+ } else if (numToRequest > 0) {
+ controller.request(numToRequest);
+ }
+ }
+
+ /**
+ * Called when the outer {@link ResponseObserver} wants to prematurely cancel the stream.
+ *
+ * @see StreamController#cancel()
+ */
+ private void onCancel() {
+ StreamController localInnerController;
+
+ synchronized (lock) {
+ if (cancellationCause != null) {
+ return;
+ }
+ // NOTE: BasicRetryingFuture will replace j.u.c.CancellationExceptions with it's own,
+ // which will not have the current stacktrace, so a special wrapper has be used here.
+ cancellationCause =
+ new ServerStreamingAttemptException(
+ new CancellationException("User cancelled stream"),
+ resumptionStrategy.canResume(),
+ seenSuccessSinceLastError);
+ localInnerController = innerController;
+ }
+
+ if (localInnerController != null) {
+ localInnerController.cancel();
+ }
+ }
+
+ /**
+ * Called when the outer {@link ResponseObserver} is ready for more data.
+ *
+ * @see StreamController#request(int)
+ */
+ private void onRequest(int count) {
+ Preconditions.checkState(!autoFlowControl, "Automatic flow control is enabled");
+ Preconditions.checkArgument(count > 0, "Count must be > 0");
+
+ final StreamController localInnerController;
+
+ synchronized (lock) {
+ int maxInc = Integer.MAX_VALUE - pendingRequests;
+ count = Math.min(maxInc, count);
+
+ pendingRequests += count;
+ localInnerController = this.innerController;
+ }
+
+ // Note: there is a race condition here where the count might go to the previous attempt's
+ // StreamController after it failed. But it doesn't matter, because the controller will just
+ // ignore it and the current controller will pick it up onStart.
+ if (localInnerController != null) {
+ localInnerController.request(count);
+ }
+ }
+
+ /** Called when the inner callable has responses to deliver. */
+ private void onAttemptResponse(ReadRowsResponse message) {
+ if (!autoFlowControl) {
+ synchronized (lock) {
+ pendingRequests--;
+ }
+ }
+ // Update local state to allow for future resume.
+ seenSuccessSinceLastError = true;
+ message = resumptionStrategy.processResponse(message);
+ // Notify the outer observer.
+ outerObserver.onResponse(message);
+ }
+
+ /**
+ * Called when the current RPC fails. The error will be bubbled up to the outer {@link
+ * RetryingFuture} via the {@link #innerAttemptFuture}.
+ */
+ private void onAttemptError(Throwable throwable) {
+ Throwable localCancellationCause;
+ synchronized (lock) {
+ localCancellationCause = cancellationCause;
+ }
+
+ if (localCancellationCause != null) {
+ // Take special care to preserve the cancellation's stack trace.
+ innerAttemptFuture.setException(localCancellationCause);
+ } else {
+ // Wrap the original exception and provide more context for StreamingRetryAlgorithm.
+ innerAttemptFuture.setException(
+ new ServerStreamingAttemptException(
+ throwable, resumptionStrategy.canResume(), seenSuccessSinceLastError));
+ }
+ }
+
+ /**
+ * Called when the current RPC successfully completes. Notifies the outer {@link RetryingFuture}
+ * via {@link #innerAttemptFuture}.
+ */
+ private void onAttemptComplete() {
+ innerAttemptFuture.set(null);
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ReadRowsRetryingCallable.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ReadRowsRetryingCallable.java
new file mode 100644
index 0000000000..8d3384de81
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ReadRowsRetryingCallable.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.storage.v1beta2.stub.readrows;
+
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.retrying.RetryingFuture;
+import com.google.api.gax.retrying.ScheduledRetryingExecutor;
+import com.google.api.gax.retrying.ServerStreamingAttemptException;
+import com.google.api.gax.retrying.StreamResumptionStrategy;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1beta2.ReadRowsResponse;
+
+public final class ReadRowsRetryingCallable
+ extends ServerStreamingCallable {
+
+ private final ApiCallContext context;
+ private final ServerStreamingCallable innerCallable;
+ private final ScheduledRetryingExecutor executor;
+ private final StreamResumptionStrategy
+ resumptionStrategyPrototype;
+
+ public ReadRowsRetryingCallable(
+ ApiCallContext context,
+ ServerStreamingCallable innerCallable,
+ ScheduledRetryingExecutor executor,
+ StreamResumptionStrategy resumptionStrategyPrototype) {
+ this.context = context;
+ this.innerCallable = innerCallable;
+ this.executor = executor;
+ this.resumptionStrategyPrototype = resumptionStrategyPrototype;
+ }
+
+ @Override
+ public void call(
+ ReadRowsRequest request,
+ final ResponseObserver responseObserver,
+ ApiCallContext context) {
+ ReadRowsAttemptCallable attemptCallable =
+ new ReadRowsAttemptCallable(
+ innerCallable,
+ resumptionStrategyPrototype.createNew(),
+ request,
+ this.context,
+ responseObserver);
+
+ RetryingFuture retryingFuture = executor.createFuture(attemptCallable, this.context);
+ attemptCallable.setExternalFuture(retryingFuture);
+ attemptCallable.start();
+
+ // Bridge the future result back to the external responseObserver
+ ApiFutures.addCallback(
+ retryingFuture,
+ new ApiFutureCallback() {
+ @Override
+ public void onFailure(Throwable throwable) {
+ // Make sure to unwrap the underlying ApiException
+ if (throwable instanceof ServerStreamingAttemptException) {
+ throwable = throwable.getCause();
+ }
+ responseObserver.onError(throwable);
+ }
+
+ @Override
+ public void onSuccess(Void ignored) {
+ responseObserver.onComplete();
+ }
+ },
+ directExecutor());
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java
index 88e9004092..f5f8b24086 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java
@@ -162,4 +162,24 @@ public void readRowsExceptionTest() throws Exception {
Assert.assertEquals(StatusCode.Code.INVALID_ARGUMENT, apiException.getStatusCode().getCode());
}
}
+
+ @Test
+ @SuppressWarnings("all")
+ public void readRowsRetryingExceptionTest() throws ExecutionException, InterruptedException {
+ StatusRuntimeException exception =
+ new StatusRuntimeException(
+ Status.INTERNAL.withDescription("Received unexpected EOS on DATA frame from server"));
+ mockBigQueryRead.addException(exception);
+ long rowCount = 1340416618L;
+ ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build();
+ mockBigQueryRead.addResponse(expectedResponse);
+ ReadRowsRequest request = ReadRowsRequest.newBuilder().build();
+
+ MockStreamObserver responseObserver = new MockStreamObserver<>();
+
+ ServerStreamingCallable callable = client.readRowsCallable();
+ callable.serverStreamingCall(request, responseObserver);
+ List actualResponses = responseObserver.future().get();
+ Assert.assertEquals(1, actualResponses.size());
+ }
}