Skip to content

Commit

Permalink
fix: retry certain RESOURCE_EXHAUSTED errors
Browse files Browse the repository at this point in the history
Handle certain RESOURCE_EXHAUSTED errors and report the retry attempts.
  • Loading branch information
esert-g committed Aug 24, 2021
1 parent 7755bf7 commit 8b99da1
Show file tree
Hide file tree
Showing 17 changed files with 731 additions and 26 deletions.
Expand Up @@ -15,12 +15,53 @@
*/
package com.google.cloud.bigquery.storage.util;

import com.google.rpc.RetryInfo;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import org.threeten.bp.Duration;

/** Static utility methods for working with Errors returned from the service. */
public class Errors {
/** Not instantiable: this class only hosts static utility methods. */
private Errors() {}

/**
 * Outcome of {@code isRetryableStatus}: whether the failed call may be retried and, when the
 * error metadata carried one, the delay to observe before retrying.
 */
public static class IsRetryableStatusResult {
  /** Whether the inspected status is considered retryable; {@code false} until set otherwise. */
  public boolean isRetryable;

  /** Retry delay taken from the error's RetryInfo metadata, or {@code null} if none was found. */
  public Duration retryDelay;
}

// Metadata key under which isRetryableStatus looks up a protobuf RetryInfo message in the
// metadata that accompanies a failed call.
private static final Metadata.Key<RetryInfo> KEY_RETRY_INFO =
ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());

/**
 * Decides whether a failed call may be retried and, if known, how long to wait before retrying.
 *
 * <p>Generally, internal errors are not considered retryable; however, certain transient network
 * issues surface as internal errors and are in fact retryable. Those are recognized by {@code
 * isRetryableInternalStatus}.
 *
 * <p>A {@code RESOURCE_EXHAUSTED} status is considered retryable only when {@code metadata}
 * contains a serialized {@link RetryInfo} entry that has a retry delay; that delay is then
 * reported in the result.
 *
 * @param status the status of the failed call
 * @param metadata the metadata accompanying the failure; may be {@code null}
 * @return a result whose {@code isRetryable} flag tells whether to retry, and whose {@code
 *     retryDelay} is set only in the {@code RESOURCE_EXHAUSTED} case
 */
public static IsRetryableStatusResult isRetryableStatus(Status status, Metadata metadata) {
  IsRetryableStatusResult outcome = new IsRetryableStatusResult();

  // Retryable internal errors need no further inspection and carry no explicit delay.
  if (isRetryableInternalStatus(status)) {
    outcome.isRetryable = true;
    return outcome;
  }

  boolean carriesRetryInfo =
      status.getCode() == Status.Code.RESOURCE_EXHAUSTED
          && metadata != null
          && metadata.containsKey(KEY_RETRY_INFO);
  if (!carriesRetryInfo) {
    return outcome;
  }

  // Only a RetryInfo that actually specifies a delay makes the error retryable.
  RetryInfo info = metadata.get(KEY_RETRY_INFO);
  if (info.hasRetryDelay()) {
    outcome.isRetryable = true;
    outcome.retryDelay =
        Duration.ofSeconds(info.getRetryDelay().getSeconds(), info.getRetryDelay().getNanos());
  }
  return outcome;
}

/**
* Returns true iff the Status indicates an internal error that is retryable.
*
Expand Down
Expand Up @@ -126,7 +126,9 @@ public static final BigQueryReadClient create(EnhancedBigQueryReadStub stub) {
*/
protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException {
this.settings = settings;
// NOTE(review): this hunk comes from a rendered diff (7 lines before, 9 after). The
// single-argument create(...) assignment below looks like the removed line and the
// two-argument call after it the added replacement — confirm against the real file.
this.stub = EnhancedBigQueryReadStub.create(settings.getTypedStubSettings());
// Build the stub with the ReadRows retry listener configured on the settings.
this.stub =
EnhancedBigQueryReadStub.create(
settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener());
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
Expand Down
Expand Up @@ -27,6 +27,8 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStubSettings;
import io.grpc.Metadata;
import io.grpc.Status;
import java.io.IOException;
import java.util.List;

Expand Down Expand Up @@ -69,6 +71,26 @@ public ServerStreamingCallSettings<ReadRowsRequest, ReadRowsResponse> readRowsSe
return getTypedStubSettings().readRowsSettings();
}

/** Callback notified before a failed request is retried. */
public interface RetryAttemptListener {
  /**
   * Invoked with the outcome of the attempt that failed and is about to be retried.
   *
   * @param prevStatus status of the failed attempt
   * @param prevMetadata metadata of the failed attempt (presumably {@code null} when the failure
   *     carried none — verify against the caller)
   */
  void onRetryAttempt(Status prevStatus, Metadata prevMetadata);
}

private RetryAttemptListener readRowsRetryAttemptListener;

/**
 * Registers the listener for ReadRows retries.
 *
 * <p>When a non-null listener is provided, the client calls its {@code onRetryAttempt} method
 * before a failed ReadRows request is retried. This can serve as a negative feedback signal for
 * later decisions about splitting read streams: some retried failures are caused by resource
 * exhaustion, which increased parallelism only makes worse.
 *
 * @param readRowsRetryAttemptListener the listener to notify, or {@code null} for none
 */
public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) {
  this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
}

/** Returns the listener notified before ReadRows retries, or {@code null} if none is set. */
public RetryAttemptListener getReadRowsRetryAttemptListener() {
  return this.readRowsRetryAttemptListener;
}

/** Returns the object with the settings used for calls to splitReadStream. */
public UnaryCallSettings<SplitReadStreamRequest, SplitReadStreamResponse>
splitReadStreamSettings() {
Expand Down Expand Up @@ -176,6 +198,14 @@ public Builder applyToAllUnaryMethods(
return this;
}

private RetryAttemptListener readRowsRetryAttemptListener;

/**
 * Sets the listener to be notified before a failed ReadRows request is retried.
 *
 * @param readRowsRetryAttemptListener the listener to notify, or {@code null} for none
 * @return this builder, for chaining
 */
public Builder setReadRowsRetryAttemptListener(
    RetryAttemptListener readRowsRetryAttemptListener) {
  this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
  return this;
}

/** Returns the builder for the settings used for calls to createReadSession. */
public UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession>
createReadSessionSettings() {
Expand All @@ -196,7 +226,9 @@ public Builder applyToAllUnaryMethods(

@Override
public BigQueryReadSettings build() throws IOException {
// NOTE(review): this hunk comes from a rendered diff (7 lines before, 9 after). The bare
// `return new BigQueryReadSettings(this);` below looks like the removed line and the three
// statements after it the added replacement — confirm against the real file.
return new BigQueryReadSettings(this);
BigQueryReadSettings settings = new BigQueryReadSettings(this);
// Hand the listener collected by this builder over to the newly created settings.
settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener);
return settings;
}
}
}
Expand Up @@ -31,6 +31,7 @@
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.cloud.bigquery.storage.v1.BigQueryReadGrpc;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
Expand All @@ -54,10 +55,18 @@ public class EnhancedBigQueryReadStub implements BackgroundResource {
private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage";
private final GrpcBigQueryReadStub stub;
private final BigQueryReadStubSettings stubSettings;
// Optional listener handed to the ReadRows ApiResultRetryAlgorithm; null when no listener was
// supplied at creation time.
private final BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener;
private final ClientContext context;

/**
 * Creates a stub that does not report ReadRows retry attempts; equivalent to calling the
 * two-argument overload with a {@code null} listener.
 */
public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings)
    throws IOException {
  final BigQueryReadSettings.RetryAttemptListener noListener = null;
  return create(settings, noListener);
}

public static EnhancedBigQueryReadStub create(
EnhancedBigQueryReadStubSettings settings,
BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener)
throws IOException {
// Configure the base settings.
BigQueryReadStubSettings.Builder baseSettingsBuilder =
BigQueryReadStubSettings.newBuilder()
Expand Down Expand Up @@ -88,14 +97,19 @@ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings s
BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build();
ClientContext clientContext = ClientContext.create(baseSettings);
GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext);
return new EnhancedBigQueryReadStub(stub, baseSettings, clientContext);
return new EnhancedBigQueryReadStub(
stub, baseSettings, readRowsRetryAttemptListener, clientContext);
}

@InternalApi("Visible for testing")
EnhancedBigQueryReadStub(
// NOTE(review): this hunk comes from a rendered diff. The one-line parameter list directly
// below looks like the removed signature and the four parameter lines after it the added
// replacement — confirm against the real file.
GrpcBigQueryReadStub stub, BigQueryReadStubSettings stubSettings, ClientContext context) {
GrpcBigQueryReadStub stub,
BigQueryReadStubSettings stubSettings,
BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener,
ClientContext context) {
this.stub = stub;
this.stubSettings = stubSettings;
// Stored as given; may be null when no listener is configured.
this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
this.context = context;
}

Expand Down Expand Up @@ -123,7 +137,7 @@ public Map<String, String> extract(ReadRowsRequest request) {

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new ApiResultRetryAlgorithm<Void>(readRowsRetryAttemptListener),
new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =
Expand Down
Expand Up @@ -21,6 +21,8 @@
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.storage.util.Errors;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import io.grpc.Metadata;
import io.grpc.Status;
import org.threeten.bp.Duration;

Expand All @@ -30,17 +32,41 @@ public class ApiResultRetryAlgorithm<ResponseT> implements ResultRetryAlgorithm<
// Duration to sleep if the error is DEADLINE_EXCEEDED. createNextAttempt uses it as the
// randomized retry delay whenever the error did not come with an explicit retry delay.
public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1);

private final BigQueryReadSettings.RetryAttemptListener retryAttemptListener;

/** Creates an algorithm that does not report retry attempts to any listener. */
public ApiResultRetryAlgorithm() {
  this(null);
}

/**
 * Creates an algorithm that notifies the given listener before each retry.
 *
 * @param retryAttemptListener the listener to notify, or {@code null} to disable notifications
 */
public ApiResultRetryAlgorithm(BigQueryReadSettings.RetryAttemptListener retryAttemptListener) {
  this.retryAttemptListener = retryAttemptListener;
}

@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (Errors.isRetryableInternalStatus(status)) {
Metadata metadata = Status.trailersFromThrowable(prevThrowable);
Errors.IsRetryableStatusResult result = Errors.isRetryableStatus(status, metadata);
if (result.isRetryable) {
// If result.retryDelay isn't null, we know exactly how long we must wait, so both regular
// and randomized delays are the same.
Duration retryDelay = result.retryDelay;
Duration randomizedRetryDelay = result.retryDelay;
if (retryDelay == null) {
retryDelay = prevSettings.getRetryDelay();
randomizedRetryDelay = DEADLINE_SLEEP_DURATION;
}
if (retryAttemptListener != null) {
retryAttemptListener.onRetryAttempt(status, metadata);
}
return TimedAttemptSettings.newBuilder()
.setGlobalSettings(prevSettings.getGlobalSettings())
.setRetryDelay(prevSettings.getRetryDelay())
.setRetryDelay(retryDelay)
.setRpcTimeout(prevSettings.getRpcTimeout())
.setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION)
.setRandomizedRetryDelay(randomizedRetryDelay)
.setAttemptCount(prevSettings.getAttemptCount() + 1)
.setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos())
.build();
Expand All @@ -53,7 +79,8 @@ public TimedAttemptSettings createNextAttempt(
public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) {
if (prevThrowable != null) {
Status status = Status.fromThrowable(prevThrowable);
if (Errors.isRetryableInternalStatus(status)) {
Metadata metadata = Status.trailersFromThrowable(prevThrowable);
if (Errors.isRetryableStatus(status, metadata).isRetryable) {
return true;
}
}
Expand Down
Expand Up @@ -141,7 +141,9 @@ public static final BigQueryStorageClient create(EnhancedBigQueryStorageStub stu
*/
protected BigQueryStorageClient(BigQueryStorageSettings settings) throws IOException {
this.settings = settings;
// NOTE(review): this hunk comes from a rendered diff (7 lines before, 9 after). The
// single-argument create(...) assignment below looks like the removed line and the
// two-argument call after it the added replacement — confirm against the real file.
this.stub = EnhancedBigQueryStorageStub.create(settings.getTypedStubSettings());
// Build the stub with the ReadRows retry listener configured on the settings.
this.stub =
EnhancedBigQueryStorageStub.create(
settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener());
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
Expand Down
Expand Up @@ -37,6 +37,8 @@
import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamResponse;
import com.google.cloud.bigquery.storage.v1beta1.stub.EnhancedBigQueryStorageStubSettings;
import com.google.protobuf.Empty;
import io.grpc.Metadata;
import io.grpc.Status;
import java.io.IOException;
import java.util.List;

Expand Down Expand Up @@ -78,6 +80,26 @@ public ServerStreamingCallSettings<ReadRowsRequest, ReadRowsResponse> readRowsSe
return getTypedStubSettings().readRowsSettings();
}

/** Callback notified before a failed request is retried. */
public interface RetryAttemptListener {
  /**
   * Invoked with the outcome of the attempt that failed and is about to be retried.
   *
   * @param prevStatus status of the failed attempt
   * @param prevMetadata metadata of the failed attempt (presumably {@code null} when the failure
   *     carried none — verify against the caller)
   */
  void onRetryAttempt(Status prevStatus, Metadata prevMetadata);
}

private RetryAttemptListener readRowsRetryAttemptListener;

/**
 * Registers the listener for ReadRows retries.
 *
 * <p>When a non-null listener is provided, the client calls its {@code onRetryAttempt} method
 * before a failed ReadRows request is retried. This can serve as a negative feedback signal for
 * later decisions about splitting read streams: some retried failures are caused by resource
 * exhaustion, which increased parallelism only makes worse.
 *
 * @param readRowsRetryAttemptListener the listener to notify, or {@code null} for none
 */
public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) {
  this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
}

/** Returns the listener notified before ReadRows retries, or {@code null} if none is set. */
public RetryAttemptListener getReadRowsRetryAttemptListener() {
  return this.readRowsRetryAttemptListener;
}

/** Returns the object with the settings used for calls to batchCreateReadSessionStreams. */
public UnaryCallSettings<
BatchCreateReadSessionStreamsRequest, BatchCreateReadSessionStreamsResponse>
Expand Down Expand Up @@ -197,6 +219,14 @@ public Builder applyToAllUnaryMethods(
return this;
}

private RetryAttemptListener readRowsRetryAttemptListener;

/**
 * Sets the listener to be notified before a failed ReadRows request is retried.
 *
 * @param readRowsRetryAttemptListener the listener to notify, or {@code null} for none
 * @return this builder, for chaining
 */
public Builder setReadRowsRetryAttemptListener(
    RetryAttemptListener readRowsRetryAttemptListener) {
  this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
  return this;
}

/** Returns the builder for the settings used for calls to createReadSession. */
public UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession>
createReadSessionSettings() {
Expand Down Expand Up @@ -229,7 +259,9 @@ public UnaryCallSettings.Builder<FinalizeStreamRequest, Empty> finalizeStreamSet

@Override
public BigQueryStorageSettings build() throws IOException {
// NOTE(review): this hunk comes from a rendered diff (7 lines before, 9 after). The bare
// `return new BigQueryStorageSettings(this);` below looks like the removed line and the three
// statements after it the added replacement — confirm against the real file.
return new BigQueryStorageSettings(this);
BigQueryStorageSettings settings = new BigQueryStorageSettings(this);
// Hand the listener collected by this builder over to the newly created settings.
settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener);
return settings;
}
}
}
Expand Up @@ -31,6 +31,7 @@
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageGrpc;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings;
import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsResponse;
import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
Expand Down Expand Up @@ -58,10 +59,18 @@ public class EnhancedBigQueryStorageStub implements BackgroundResource {
private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage";
private final GrpcBigQueryStorageStub stub;
private final BigQueryStorageStubSettings stubSettings;
// Optional listener handed to the ReadRows ApiResultRetryAlgorithm; null when no listener was
// supplied at creation time.
private final BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener;
private final ClientContext context;

/**
 * Creates a stub that does not report ReadRows retry attempts; equivalent to calling the
 * two-argument overload with a {@code null} listener.
 */
public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSettings settings)
    throws IOException {
  final BigQueryStorageSettings.RetryAttemptListener noListener = null;
  return create(settings, noListener);
}

public static EnhancedBigQueryStorageStub create(
EnhancedBigQueryStorageStubSettings settings,
BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener)
throws IOException {
// Configure the base settings.
BigQueryStorageStubSettings.Builder baseSettingsBuilder =
BigQueryStorageStubSettings.newBuilder()
Expand Down Expand Up @@ -107,16 +116,19 @@ public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSett
BigQueryStorageStubSettings baseSettings = baseSettingsBuilder.build();
ClientContext clientContext = ClientContext.create(baseSettings);
GrpcBigQueryStorageStub stub = new GrpcBigQueryStorageStub(baseSettings, clientContext);
return new EnhancedBigQueryStorageStub(stub, baseSettings, clientContext);
return new EnhancedBigQueryStorageStub(
stub, baseSettings, readRowsRetryAttemptListener, clientContext);
}

/**
 * Assembles a stub from already-built parts.
 *
 * @param stub the underlying gRPC stub
 * @param stubSettings the settings the underlying stub was built from
 * @param readRowsRetryAttemptListener listener passed on to the ReadRows retry algorithm; may be
 *     {@code null}
 * @param context the client context
 */
@InternalApi("Visible for testing")
EnhancedBigQueryStorageStub(
    GrpcBigQueryStorageStub stub,
    BigQueryStorageStubSettings stubSettings,
    BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener,
    ClientContext context) {
  this.context = context;
  this.readRowsRetryAttemptListener = readRowsRetryAttemptListener;
  this.stubSettings = stubSettings;
  this.stub = stub;
}

Expand Down Expand Up @@ -145,7 +157,7 @@ public Map<String, String> extract(ReadRowsRequest request) {

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new ApiResultRetryAlgorithm<Void>(readRowsRetryAttemptListener),
new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =
Expand Down

0 comments on commit 8b99da1

Please sign in to comment.