Skip to content

Commit

Permalink
Check if exception is InternalException instead of using fromThrowable
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Jan 20, 2021
1 parent 758a5f2 commit 246f389
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 17 deletions.
Expand Up @@ -346,8 +346,14 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
.build(),
readRowsSettings.getRetryableCodes());

// Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> convertException =
new ReadRowsConvertExceptionCallable<>(base);

ServerStreamingCallable<ReadRowsRequest, RowT> merging =
new RowMergingCallable<>(base, rowAdapter);
new RowMergingCallable<>(convertException, rowAdapter);

// Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the inner
// ReadRowsRequest -> ReadRowsResponse callable).
Expand All @@ -366,14 +372,10 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
new HeaderTracerStreamingCallable<>(
watched, settings.getHeaderTracer(), getSpanName("ReadRows").toString());

// Check for "received rst stream" exceptions and convert them to retryable ApiExceptions
ServerStreamingCallable<ReadRowsRequest, RowT> convertException =
new ReadRowsConvertExceptionCallable<>(withHeaderTracer);

// Retry logic is split into 2 parts to workaround a rare edge case described in
// ReadRowsRetryCompletedCallable
ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
new ReadRowsRetryCompletedCallable<>(convertException);
new ReadRowsRetryCompletedCallable<>(withHeaderTracer);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
Expand Down
Expand Up @@ -16,14 +16,12 @@
package com.google.cloud.bigtable.data.v2.stub.readrows;

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import io.grpc.Status;
import io.grpc.Status.Code;

/**
* This callable converts the "Received rst stream" exception into a retryable {@link ApiException}.
Expand Down Expand Up @@ -67,19 +65,23 @@ public void onResponse(RowT response) {

@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
if (status.getCode() == Code.INTERNAL
&& status.getDescription() != null
&& status.getDescription().contains("Received Rst stream")) {
outerObserver.onError(new ApiException(t, GrpcStatusCode.of(status.getCode()), true));
} else {
outerObserver.onError(t);
}
outerObserver.onError(convertException(t));
}

@Override
public void onComplete() {
outerObserver.onComplete();
}
}

private Throwable convertException(Throwable t) {
// Long lived connections sometimes are disconnected via an RST frame. This error is
// transient and should be retried.
if (t instanceof InternalException) {
if (t.getMessage() != null && t.getMessage().contains("Received Rst stream")) {
return new InternalException(t, ((InternalException) t).getStatusCode(), true);
}
}
return t;
}
}

0 comments on commit 246f389

Please sign in to comment.