retryingFuture) {
+ Preconditions.checkState(!isStarted, "Can't change the RetryingFuture once the call has start");
+ Preconditions.checkNotNull(retryingFuture, "RetryingFuture can't be null");
+
+ this.outerRetryingFuture = retryingFuture;
+ }
+
+ /**
+ * Starts the initial call. The call is attempted on the caller's thread. Further call attempts
+ * will be scheduled by the {@link RetryingFuture}.
+ */
+ public void start() {
+ Preconditions.checkState(!isStarted, "Already started");
+
+ // Initialize the outer observer
+ outerObserver.onStart(
+ new StreamController() {
+ @Override
+ public void disableAutoInboundFlowControl() {
+ Preconditions.checkState(
+ !isStarted, "Can't disable auto flow control once the stream is started");
+ autoFlowControl = false;
+ }
+
+ @Override
+ public void request(int count) {
+ onRequest(count);
+ }
+
+ @Override
+ public void cancel() {
+ onCancel();
+ }
+ });
+
+ if (autoFlowControl) {
+ synchronized (lock) {
+ pendingRequests = Integer.MAX_VALUE;
+ }
+ }
+ isStarted = true;
+
+ // Propagate the totalTimeout as the overall stream deadline.
+ Duration totalTimeout =
+ outerRetryingFuture.getAttemptSettings().getGlobalSettings().getTotalTimeout();
+
+ if (totalTimeout != null && context != null) {
+ context = context.withTimeout(totalTimeout);
+ }
+
+ // Call the inner callable
+ call();
+ }
+
+ /**
+ * Sends the actual RPC. The request being sent will first be transformed by the {@link
+ * StreamResumptionStrategy}.
+ *
+ * This method expects to be called by one thread at a time. Furthermore, it expects that the
+ * current RPC finished before the next time it's called.
+ */
+ @Override
+ public Void call() {
+ Preconditions.checkState(isStarted, "Must be started first");
+
+ ReadRowsRequest request =
+ (++numAttempts == 1) ? initialRequest : resumptionStrategy.getResumeRequest(initialRequest);
+
+ // Should never happen. onAttemptError will check if ResumptionStrategy can create a resume
+ // request,
+ // which the RetryingFuture/StreamResumptionStrategy should respect.
+ Preconditions.checkState(request != null, "ResumptionStrategy returned a null request.");
+
+ innerAttemptFuture = SettableApiFuture.create();
+ seenSuccessSinceLastError = false;
+
+ ApiCallContext attemptContext = context;
+
+ if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero()) {
+ attemptContext =
+ attemptContext.withStreamWaitTimeout(
+ outerRetryingFuture.getAttemptSettings().getRpcTimeout());
+ }
+
+ attemptContext
+ .getTracer()
+ .attemptStarted(outerRetryingFuture.getAttemptSettings().getOverallAttemptCount());
+
+ innerCallable.call(
+ request,
+ new StateCheckingResponseObserver() {
+ @Override
+ public void onStartImpl(StreamController controller) {
+ onAttemptStart(controller);
+ }
+
+ @Override
+ public void onResponseImpl(ReadRowsResponse response) {
+ onAttemptResponse(response);
+ }
+
+ @Override
+ public void onErrorImpl(Throwable t) {
+ onAttemptError(t);
+ }
+
+ @Override
+ public void onCompleteImpl() {
+ onAttemptComplete();
+ }
+ },
+ attemptContext);
+
+ outerRetryingFuture.setAttemptFuture(innerAttemptFuture);
+
+ return null;
+ }
+
+ /**
+ * Called by the inner {@link ServerStreamingCallable} when the call is about to start. This will
+ * transfer unfinished state from the previous attempt.
+ *
+ * @see ResponseObserver#onStart(StreamController)
+ */
+ private void onAttemptStart(StreamController controller) {
+ if (!autoFlowControl) {
+ controller.disableAutoInboundFlowControl();
+ }
+
+ Throwable localCancellationCause;
+ int numToRequest = 0;
+
+ synchronized (lock) {
+ innerController = controller;
+
+ localCancellationCause = this.cancellationCause;
+
+ if (!autoFlowControl) {
+ numToRequest = pendingRequests;
+ }
+ }
+
+ if (localCancellationCause != null) {
+ controller.cancel();
+ } else if (numToRequest > 0) {
+ controller.request(numToRequest);
+ }
+ }
+
+ /**
+ * Called when the outer {@link ResponseObserver} wants to prematurely cancel the stream.
+ *
+ * @see StreamController#cancel()
+ */
+ private void onCancel() {
+ StreamController localInnerController;
+
+ synchronized (lock) {
+ if (cancellationCause != null) {
+ return;
+ }
+ // NOTE: BasicRetryingFuture will replace j.u.c.CancellationExceptions with it's own,
+ // which will not have the current stacktrace, so a special wrapper has be used here.
+ cancellationCause =
+ new ServerStreamingAttemptException(
+ new CancellationException("User cancelled stream"),
+ resumptionStrategy.canResume(),
+ seenSuccessSinceLastError);
+ localInnerController = innerController;
+ }
+
+ if (localInnerController != null) {
+ localInnerController.cancel();
+ }
+ }
+
+ /**
+ * Called when the outer {@link ResponseObserver} is ready for more data.
+ *
+ * @see StreamController#request(int)
+ */
+ private void onRequest(int count) {
+ Preconditions.checkState(!autoFlowControl, "Automatic flow control is enabled");
+ Preconditions.checkArgument(count > 0, "Count must be > 0");
+
+ final StreamController localInnerController;
+
+ synchronized (lock) {
+ int maxInc = Integer.MAX_VALUE - pendingRequests;
+ count = Math.min(maxInc, count);
+
+ pendingRequests += count;
+ localInnerController = this.innerController;
+ }
+
+ // Note: there is a race condition here where the count might go to the previous attempt's
+ // StreamController after it failed. But it doesn't matter, because the controller will just
+ // ignore it and the current controller will pick it up onStart.
+ if (localInnerController != null) {
+ localInnerController.request(count);
+ }
+ }
+
+ /** Called when the inner callable has responses to deliver. */
+ private void onAttemptResponse(ReadRowsResponse message) {
+ if (!autoFlowControl) {
+ synchronized (lock) {
+ pendingRequests--;
+ }
+ }
+ // Update local state to allow for future resume.
+ seenSuccessSinceLastError = true;
+ message = resumptionStrategy.processResponse(message);
+ // Notify the outer observer.
+ outerObserver.onResponse(message);
+ }
+
+ /**
+ * Called when the current RPC fails. The error will be bubbled up to the outer {@link
+ * RetryingFuture} via the {@link #innerAttemptFuture}.
+ */
+ private void onAttemptError(Throwable throwable) {
+ Throwable localCancellationCause;
+ synchronized (lock) {
+ localCancellationCause = cancellationCause;
+ }
+
+ if (localCancellationCause != null) {
+ // Take special care to preserve the cancellation's stack trace.
+ innerAttemptFuture.setException(localCancellationCause);
+ } else {
+ // Wrap the original exception and provide more context for StreamingRetryAlgorithm.
+ innerAttemptFuture.setException(
+ new ServerStreamingAttemptException(
+ throwable, resumptionStrategy.canResume(), seenSuccessSinceLastError));
+ }
+ }
+
+ /**
+ * Called when the current RPC successfully completes. Notifies the outer {@link RetryingFuture}
+ * via {@link #innerAttemptFuture}.
+ */
+ private void onAttemptComplete() {
+ innerAttemptFuture.set(null);
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryingCallable.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryingCallable.java
new file mode 100644
index 0000000000..2b4308ae70
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryingCallable.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.bigquery.storage.v1beta1.stub.readrows;
+
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.retrying.RetryingFuture;
+import com.google.api.gax.retrying.ScheduledRetryingExecutor;
+import com.google.api.gax.retrying.ServerStreamingAttemptException;
+import com.google.api.gax.retrying.StreamResumptionStrategy;
+import com.google.api.gax.rpc.ApiCallContext;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
+
+public final class ReadRowsRetryingCallable
+ extends ServerStreamingCallable {
+
+ private final ApiCallContext context;
+ private final ServerStreamingCallable innerCallable;
+ private final ScheduledRetryingExecutor executor;
+ private final StreamResumptionStrategy
+ resumptionStrategyPrototype;
+
+ public ReadRowsRetryingCallable(
+ ApiCallContext context,
+ ServerStreamingCallable innerCallable,
+ ScheduledRetryingExecutor executor,
+ StreamResumptionStrategy resumptionStrategyPrototype) {
+ this.context = context;
+ this.innerCallable = innerCallable;
+ this.executor = executor;
+ this.resumptionStrategyPrototype = resumptionStrategyPrototype;
+ }
+
+ @Override
+ public void call(
+ ReadRowsRequest request,
+ final ResponseObserver responseObserver,
+ ApiCallContext context) {
+ ReadRowsAttemptCallable attemptCallable =
+ new ReadRowsAttemptCallable(
+ innerCallable,
+ resumptionStrategyPrototype.createNew(),
+ request,
+ this.context,
+ responseObserver);
+
+ RetryingFuture retryingFuture = executor.createFuture(attemptCallable, this.context);
+ attemptCallable.setExternalFuture(retryingFuture);
+ attemptCallable.start();
+
+ // Bridge the future result back to the external responseObserver
+ ApiFutures.addCallback(
+ retryingFuture,
+ new ApiFutureCallback() {
+ @Override
+ public void onFailure(Throwable throwable) {
+ // Make sure to unwrap the underlying ApiException
+ if (throwable instanceof ServerStreamingAttemptException) {
+ throwable = throwable.getCause();
+ }
+ responseObserver.onError(throwable);
+ }
+
+ @Override
+ public void onSuccess(Void ignored) {
+ responseObserver.onComplete();
+ }
+ },
+ directExecutor());
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java
index 17d2a646ab..4f7ab8f249 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java
@@ -292,4 +292,24 @@ public void splitReadStreamExceptionTest() throws Exception {
// Expected exception
}
}
+
+ @Test
+ @SuppressWarnings("all")
+ public void readRowsRetryingExceptionTest() throws ExecutionException, InterruptedException {
+ StatusRuntimeException exception =
+ new StatusRuntimeException(
+ Status.INTERNAL.withDescription("Received unexpected EOS on DATA frame from server"));
+ mockBigQueryStorage.addException(exception);
+ long rowCount = 1340416618L;
+ ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build();
+ mockBigQueryStorage.addResponse(expectedResponse);
+ ReadRowsRequest request = ReadRowsRequest.newBuilder().build();
+
+ MockStreamObserver responseObserver = new MockStreamObserver<>();
+
+ ServerStreamingCallable callable = client.readRowsCallable();
+ callable.serverStreamingCall(request, responseObserver);
+ List actualResponses = responseObserver.future().get();
+ Assert.assertEquals(1, actualResponses.size());
+ }
}