From d56e1caf91297d7c2e1e4a9ce1463c04e44619c0 Mon Sep 17 00:00:00 2001 From: esert-g <48071655+esert-g@users.noreply.github.com> Date: Tue, 24 Aug 2021 10:54:31 -0700 Subject: [PATCH] feat: retry certain RESOURCE_EXHAUSTED errors observed during ReadRows and report retry attempts (#1257) Bq Storage Read service will start returning a retryable RESOURCE_EXHAUSTED error in the next few weeks when a read session's parallelism is considered to be excessive, so this PR expands retry handling logic for ReadRows with 2 changes: 1. If a ReadRows request fails with a RESOURCE_EXHAUSTED error and the error has an associated RetryInfo, it is now considered to be retryable and retry delay is set according to the RetryInfo. 1. If the client decides to retry, it now notifies the user with the provided RetryAttemptListener object. This will be useful as a negative feedback mechanism for future SplitReadStream requests which in return will reduce the likelihood of receiving the new retryable RESOURCE_EXHAUSTED error. --- .../cloud/bigquery/storage/util/Errors.java | 41 ++++++ .../storage/v1/BigQueryReadClient.java | 4 +- .../storage/v1/BigQueryReadSettings.java | 34 ++++- .../v1/stub/EnhancedBigQueryReadStub.java | 20 ++- .../readrows/ApiResultRetryAlgorithm.java | 35 ++++- .../v1beta1/BigQueryStorageClient.java | 4 +- .../v1beta1/BigQueryStorageSettings.java | 34 ++++- .../stub/EnhancedBigQueryStorageStub.java | 16 ++- .../readrows/ApiResultRetryAlgorithm.java | 36 +++++- .../storage/v1beta2/BigQueryReadClient.java | 4 +- .../storage/v1beta2/BigQueryReadSettings.java | 34 ++++- .../stub/EnhancedBigQueryReadStub.java | 20 ++- .../readrows/ApiResultRetryAlgorithm.java | 35 ++++- .../bigquery/storage/util/ErrorsTest.java | 80 ++++++++++++ .../storage/v1/BigQueryReadClientTest.java | 120 ++++++++++++++++++ .../v1beta1/BigQueryStorageClientTest.java | 120 ++++++++++++++++++ .../v1beta2/BigQueryReadClientTest.java | 120 ++++++++++++++++++ 17 files changed, 731 insertions(+), 26 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java index 067f8d242d..5d73e0cc83 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java @@ -15,12 +15,53 @@ */ package com.google.cloud.bigquery.storage.util; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.protobuf.ProtoUtils; +import org.threeten.bp.Duration; /** Static utility methods for working with Errors returned from the service. */ public class Errors { private Errors() {}; + public static class IsRetryableStatusResult { + public boolean isRetryable = false; + public Duration retryDelay = null; + } + + private static final Metadata.Key KEY_RETRY_INFO = + ProtoUtils.keyForProto(RetryInfo.getDefaultInstance()); + + /** + * Returns true iff the Status indicates an error that is retryable. + * + *

Generally, internal errors are not considered retryable, however there are certain transient + * network issues that appear as internal but are in fact retryable. + * + *

Resource exhausted errors are only considered retryable if metadata contains a serialized + * RetryInfo object. + */ + public static IsRetryableStatusResult isRetryableStatus(Status status, Metadata metadata) { + IsRetryableStatusResult result = new IsRetryableStatusResult(); + + result.isRetryable = isRetryableInternalStatus(status); + if (!result.isRetryable + && status.getCode() == Status.Code.RESOURCE_EXHAUSTED + && metadata != null + && metadata.containsKey(KEY_RETRY_INFO)) { + RetryInfo retryInfo = metadata.get(KEY_RETRY_INFO); + if (retryInfo.hasRetryDelay()) { + result.isRetryable = true; + result.retryDelay = + Duration.ofSeconds( + retryInfo.getRetryDelay().getSeconds(), retryInfo.getRetryDelay().getNanos()); + } + } + + return result; + } + /** * Returns true iff the Status indicates and internal error that is retryable. * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java index e3fbf56165..172f85d73d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java @@ -126,7 +126,9 @@ public static final BigQueryReadClient create(EnhancedBigQueryReadStub stub) { */ protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException { this.settings = settings; - this.stub = EnhancedBigQueryReadStub.create(settings.getTypedStubSettings()); + this.stub = + EnhancedBigQueryReadStub.create( + settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener()); } @BetaApi("A restructuring of stub classes is planned, so this may break in the future") diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java index fcf02a2331..d4f26c432b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java @@ -27,6 +27,8 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStubSettings; +import io.grpc.Metadata; +import io.grpc.Status; import java.io.IOException; import java.util.List; @@ -69,6 +71,26 @@ public ServerStreamingCallSettings readRowsSe return getTypedStubSettings().readRowsSettings(); } + public static interface RetryAttemptListener { + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata); + } + + private RetryAttemptListener readRowsRetryAttemptListener = null; + + /** + * If a non null readRowsRetryAttemptListener is provided, client will call onRetryAttempt + * function before a failed ReadRows request is retried. This can be used as negative feedback + * mechanism for future decision to split read streams because some retried failures are due to + * resource exhaustion that increased parallelism only makes it worse. + */ + public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + } + + public RetryAttemptListener getReadRowsRetryAttemptListener() { + return readRowsRetryAttemptListener; + } + /** Returns the object with the settings used for calls to splitReadStream. */ public UnaryCallSettings splitReadStreamSettings() { @@ -176,6 +198,14 @@ public Builder applyToAllUnaryMethods( return this; } + private RetryAttemptListener readRowsRetryAttemptListener = null; + + public Builder setReadRowsRetryAttemptListener( + RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + return this; + } + /** Returns the builder for the settings used for calls to createReadSession. */ public UnaryCallSettings.Builder createReadSessionSettings() { @@ -196,7 +226,9 @@ public Builder applyToAllUnaryMethods( @Override public BigQueryReadSettings build() throws IOException { - return new BigQueryReadSettings(this); + BigQueryReadSettings settings = new BigQueryReadSettings(this); + settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener); + return settings; } } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java index 3d8e3ea0ff..1407c254c0 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java @@ -31,6 +31,7 @@ import com.google.api.gax.tracing.SpanName; import com.google.api.gax.tracing.TracedServerStreamingCallable; import com.google.cloud.bigquery.storage.v1.BigQueryReadGrpc; +import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; @@ -54,10 +55,18 @@ public class EnhancedBigQueryReadStub implements BackgroundResource { private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage"; private final GrpcBigQueryReadStub stub; private final BigQueryReadStubSettings stubSettings; + private final BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener; private final ClientContext context; public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings) throws IOException { + return create(settings, null); + } + + public static EnhancedBigQueryReadStub create( + EnhancedBigQueryReadStubSettings settings, + BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener) + throws IOException { // Configure the base settings. BigQueryReadStubSettings.Builder baseSettingsBuilder = BigQueryReadStubSettings.newBuilder() @@ -88,14 +97,19 @@ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings s BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build(); ClientContext clientContext = ClientContext.create(baseSettings); GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext); - return new EnhancedBigQueryReadStub(stub, baseSettings, clientContext); + return new EnhancedBigQueryReadStub( + stub, baseSettings, readRowsRetryAttemptListener, clientContext); } @InternalApi("Visible for testing") EnhancedBigQueryReadStub( - GrpcBigQueryReadStub stub, BigQueryReadStubSettings stubSettings, ClientContext context) { + GrpcBigQueryReadStub stub, + BigQueryReadStubSettings stubSettings, + BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener, + ClientContext context) { this.stub = stub; this.stubSettings = stubSettings; + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; this.context = context; } @@ -123,7 +137,7 @@ public Map extract(ReadRowsRequest request) { StreamingRetryAlgorithm retryAlgorithm = new StreamingRetryAlgorithm<>( - new ApiResultRetryAlgorithm(), + new ApiResultRetryAlgorithm(readRowsRetryAttemptListener), new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock())); ScheduledRetryingExecutor retryingExecutor = diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java index 6e1269ae07..10bb440e5a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java @@ -21,6 +21,8 @@ import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.ApiException; import com.google.cloud.bigquery.storage.util.Errors; +import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; +import io.grpc.Metadata; import io.grpc.Status; import org.threeten.bp.Duration; @@ -30,17 +32,41 @@ public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm< // Duration to sleep on if the error is DEADLINE_EXCEEDED. public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + private final BigQueryReadSettings.RetryAttemptListener retryAttemptListener; + + public ApiResultRetryAlgorithm() { + this(null); + } + + public ApiResultRetryAlgorithm(BigQueryReadSettings.RetryAttemptListener retryAttemptListener) { + super(); + this.retryAttemptListener = retryAttemptListener; + } + @Override public TimedAttemptSettings createNextAttempt( Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + Errors.IsRetryableStatusResult result = Errors.isRetryableStatus(status, metadata); + if (result.isRetryable) { + // If result.retryDelay isn't null, we know exactly how long we must wait, so both regular + // and randomized delays are the same. + Duration retryDelay = result.retryDelay; + Duration randomizedRetryDelay = result.retryDelay; + if (retryDelay == null) { + retryDelay = prevSettings.getRetryDelay(); + randomizedRetryDelay = DEADLINE_SLEEP_DURATION; + } + if (retryAttemptListener != null) { + retryAttemptListener.onRetryAttempt(status, metadata); + } return TimedAttemptSettings.newBuilder() .setGlobalSettings(prevSettings.getGlobalSettings()) - .setRetryDelay(prevSettings.getRetryDelay()) + .setRetryDelay(retryDelay) .setRpcTimeout(prevSettings.getRpcTimeout()) - .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION) + .setRandomizedRetryDelay(randomizedRetryDelay) .setAttemptCount(prevSettings.getAttemptCount() + 1) .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos()) .build(); @@ -53,7 +79,8 @@ public TimedAttemptSettings createNextAttempt( public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + if (Errors.isRetryableStatus(status, metadata).isRetryable) { return true; } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClient.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClient.java index ec5b048992..38d4504ce7 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClient.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClient.java @@ -141,7 +141,9 @@ public static final BigQueryStorageClient create(EnhancedBigQueryStorageStub stu */ protected BigQueryStorageClient(BigQueryStorageSettings settings) throws IOException { this.settings = settings; - this.stub = EnhancedBigQueryStorageStub.create(settings.getTypedStubSettings()); + this.stub = + EnhancedBigQueryStorageStub.create( + settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener()); } @BetaApi("A restructuring of stub classes is planned, so this may break in the future") diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageSettings.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageSettings.java index 172ee7fa5a..d600a4e49d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageSettings.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageSettings.java @@ -37,6 +37,8 @@ import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamResponse; import com.google.cloud.bigquery.storage.v1beta1.stub.EnhancedBigQueryStorageStubSettings; import com.google.protobuf.Empty; +import io.grpc.Metadata; +import io.grpc.Status; import java.io.IOException; import java.util.List; @@ -78,6 +80,26 @@ public ServerStreamingCallSettings readRowsSe return getTypedStubSettings().readRowsSettings(); } + public static interface RetryAttemptListener { + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata); + } + + private RetryAttemptListener readRowsRetryAttemptListener = null; + + /** + * If a non null readRowsRetryAttemptListener is provided, client will call onRetryAttempt + * function before a failed ReadRows request is retried. This can be used as negative feedback + * mechanism for future decision to split read streams because some retried failures are due to + * resource exhaustion that increased parallelism only makes it worse. + */ + public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + } + + public RetryAttemptListener getReadRowsRetryAttemptListener() { + return readRowsRetryAttemptListener; + } + /** Returns the object with the settings used for calls to batchCreateReadSessionStreams. */ public UnaryCallSettings< BatchCreateReadSessionStreamsRequest, BatchCreateReadSessionStreamsResponse> @@ -197,6 +219,14 @@ public Builder applyToAllUnaryMethods( return this; } + private RetryAttemptListener readRowsRetryAttemptListener = null; + + public Builder setReadRowsRetryAttemptListener( + RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + return this; + } + /** Returns the builder for the settings used for calls to createReadSession. */ public UnaryCallSettings.Builder createReadSessionSettings() { @@ -229,7 +259,9 @@ public UnaryCallSettings.Builder finalizeStreamSet @Override public BigQueryStorageSettings build() throws IOException { - return new BigQueryStorageSettings(this); + BigQueryStorageSettings settings = new BigQueryStorageSettings(this); + settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener); + return settings; } } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java index 5f449adf8f..5a1940dff7 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java @@ -31,6 +31,7 @@ import com.google.api.gax.tracing.SpanName; import com.google.api.gax.tracing.TracedServerStreamingCallable; import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageGrpc; +import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings; import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsRequest; import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsResponse; import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest; @@ -58,10 +59,18 @@ public class EnhancedBigQueryStorageStub implements BackgroundResource { private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage"; private final GrpcBigQueryStorageStub stub; private final BigQueryStorageStubSettings stubSettings; + private final BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener; private final ClientContext context; public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSettings settings) throws IOException { + return create(settings, null); + } + + public static EnhancedBigQueryStorageStub create( + EnhancedBigQueryStorageStubSettings settings, + BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener) + throws IOException { // Configure the base settings. BigQueryStorageStubSettings.Builder baseSettingsBuilder = BigQueryStorageStubSettings.newBuilder() @@ -107,16 +116,19 @@ public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSett BigQueryStorageStubSettings baseSettings = baseSettingsBuilder.build(); ClientContext clientContext = ClientContext.create(baseSettings); GrpcBigQueryStorageStub stub = new GrpcBigQueryStorageStub(baseSettings, clientContext); - return new EnhancedBigQueryStorageStub(stub, baseSettings, clientContext); + return new EnhancedBigQueryStorageStub( + stub, baseSettings, readRowsRetryAttemptListener, clientContext); } @InternalApi("Visible for testing") EnhancedBigQueryStorageStub( GrpcBigQueryStorageStub stub, BigQueryStorageStubSettings stubSettings, + BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener, ClientContext context) { this.stub = stub; this.stubSettings = stubSettings; + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; this.context = context; } @@ -145,7 +157,7 @@ public Map extract(ReadRowsRequest request) { StreamingRetryAlgorithm retryAlgorithm = new StreamingRetryAlgorithm<>( - new ApiResultRetryAlgorithm(), + new ApiResultRetryAlgorithm(readRowsRetryAttemptListener), new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock())); ScheduledRetryingExecutor retryingExecutor = diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java index d9cf557a76..da8712584f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java @@ -21,6 +21,8 @@ import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.ApiException; import com.google.cloud.bigquery.storage.util.Errors; +import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings; +import io.grpc.Metadata; import io.grpc.Status; import org.threeten.bp.Duration; @@ -30,17 +32,42 @@ public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm< // Duration to sleep on if the error is DEADLINE_EXCEEDED. public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + private final BigQueryStorageSettings.RetryAttemptListener retryAttemptListener; + + public ApiResultRetryAlgorithm() { + this(null); + } + + public ApiResultRetryAlgorithm( + BigQueryStorageSettings.RetryAttemptListener retryAttemptListener) { + super(); + this.retryAttemptListener = retryAttemptListener; + } + @Override public TimedAttemptSettings createNextAttempt( Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + Errors.IsRetryableStatusResult result = Errors.isRetryableStatus(status, metadata); + if (result.isRetryable) { + // If result.retryDelay isn't null, we know exactly how long we must wait, so both regular + // and randomized delays are the same. + Duration retryDelay = result.retryDelay; + Duration randomizedRetryDelay = result.retryDelay; + if (retryDelay == null) { + retryDelay = prevSettings.getRetryDelay(); + randomizedRetryDelay = DEADLINE_SLEEP_DURATION; + } + if (retryAttemptListener != null) { + retryAttemptListener.onRetryAttempt(status, metadata); + } return TimedAttemptSettings.newBuilder() .setGlobalSettings(prevSettings.getGlobalSettings()) - .setRetryDelay(prevSettings.getRetryDelay()) + .setRetryDelay(retryDelay) .setRpcTimeout(prevSettings.getRpcTimeout()) - .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION) + .setRandomizedRetryDelay(randomizedRetryDelay) .setAttemptCount(prevSettings.getAttemptCount() + 1) .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos()) .build(); @@ -53,7 +80,8 @@ public TimedAttemptSettings createNextAttempt( public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + if (Errors.isRetryableStatus(status, metadata).isRetryable) { return true; } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClient.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClient.java index 23d61233df..ba549d9b97 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClient.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClient.java @@ -126,7 +126,9 @@ public static final BigQueryReadClient create(EnhancedBigQueryReadStub stub) { */ protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException { this.settings = settings; - this.stub = EnhancedBigQueryReadStub.create(settings.getTypedStubSettings()); + this.stub = + EnhancedBigQueryReadStub.create( + settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener()); } @BetaApi("A restructuring of stub classes is planned, so this may break in the future") diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadSettings.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadSettings.java index f18c9e19c6..4230a5fe8f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadSettings.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadSettings.java @@ -27,6 +27,8 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.cloud.bigquery.storage.v1beta2.stub.EnhancedBigQueryReadStubSettings; +import io.grpc.Metadata; +import io.grpc.Status; import java.io.IOException; import java.util.List; @@ -69,6 +71,26 @@ public ServerStreamingCallSettings readRowsSe return getTypedStubSettings().readRowsSettings(); } + public static interface RetryAttemptListener { + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata); + } + + private RetryAttemptListener readRowsRetryAttemptListener = null; + + /** + * If a non null readRowsRetryAttemptListener is provided, client will call onRetryAttempt + * function before a failed ReadRows request is retried. This can be used as negative feedback + * mechanism for future decision to split read streams because some retried failures are due to + * resource exhaustion that increased parallelism only makes it worse. + */ + public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + } + + public RetryAttemptListener getReadRowsRetryAttemptListener() { + return readRowsRetryAttemptListener; + } + /** Returns the object with the settings used for calls to splitReadStream. */ public UnaryCallSettings splitReadStreamSettings() { @@ -176,6 +198,14 @@ public Builder applyToAllUnaryMethods( return this; } + private RetryAttemptListener readRowsRetryAttemptListener = null; + + public Builder setReadRowsRetryAttemptListener( + RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + return this; + } + /** Returns the builder for the settings used for calls to createReadSession. */ public UnaryCallSettings.Builder createReadSessionSettings() { @@ -196,7 +226,9 @@ public Builder applyToAllUnaryMethods( @Override public BigQueryReadSettings build() throws IOException { - return new BigQueryReadSettings(this); + BigQueryReadSettings settings = new BigQueryReadSettings(this); + settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener); + return settings; } } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java index 351fd21c4f..28870b7a47 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java @@ -31,6 +31,7 @@ import com.google.api.gax.tracing.SpanName; import com.google.api.gax.tracing.TracedServerStreamingCallable; import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadSettings; import com.google.cloud.bigquery.storage.v1beta2.CreateReadSessionRequest; import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1beta2.ReadRowsResponse; @@ -54,10 +55,18 @@ public class EnhancedBigQueryReadStub implements BackgroundResource { private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage"; private final GrpcBigQueryReadStub stub; private final BigQueryReadStubSettings stubSettings; + private final BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener; private final ClientContext context; public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings) throws IOException { + return create(settings, null); + } + + public static EnhancedBigQueryReadStub create( + EnhancedBigQueryReadStubSettings settings, + BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener) + throws IOException { // Configure the base settings. BigQueryReadStubSettings.Builder baseSettingsBuilder = BigQueryReadStubSettings.newBuilder() @@ -88,14 +97,19 @@ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings s BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build(); ClientContext clientContext = ClientContext.create(baseSettings); GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext); - return new EnhancedBigQueryReadStub(stub, baseSettings, clientContext); + return new EnhancedBigQueryReadStub( + stub, baseSettings, readRowsRetryAttemptListener, clientContext); } @InternalApi("Visible for testing") EnhancedBigQueryReadStub( - GrpcBigQueryReadStub stub, BigQueryReadStubSettings stubSettings, ClientContext context) { + GrpcBigQueryReadStub stub, + BigQueryReadStubSettings stubSettings, + BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener, + ClientContext context) { this.stub = stub; this.stubSettings = stubSettings; + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; this.context = context; } @@ -123,7 +137,7 @@ public Map extract(ReadRowsRequest request) { StreamingRetryAlgorithm retryAlgorithm = new StreamingRetryAlgorithm<>( - new ApiResultRetryAlgorithm(), + new ApiResultRetryAlgorithm(readRowsRetryAttemptListener), new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock())); ScheduledRetryingExecutor retryingExecutor = diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java index 2c887e1424..1d9cad46bd 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java @@ -21,6 +21,8 @@ import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.ApiException; import com.google.cloud.bigquery.storage.util.Errors; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadSettings; +import io.grpc.Metadata; import io.grpc.Status; import org.threeten.bp.Duration; @@ -30,17 +32,41 @@ public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm< // Duration to sleep on if the error is DEADLINE_EXCEEDED. public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + private final BigQueryReadSettings.RetryAttemptListener retryAttemptListener; + + public ApiResultRetryAlgorithm() { + this(null); + } + + public ApiResultRetryAlgorithm(BigQueryReadSettings.RetryAttemptListener retryAttemptListener) { + super(); + this.retryAttemptListener = retryAttemptListener; + } + @Override public TimedAttemptSettings createNextAttempt( Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + Errors.IsRetryableStatusResult result = Errors.isRetryableStatus(status, metadata); + if (result.isRetryable) { + // If result.retryDelay isn't null, we know exactly how long we must wait, so both regular + // and randomized delays are the same. + Duration retryDelay = result.retryDelay; + Duration randomizedRetryDelay = result.retryDelay; + if (retryDelay == null) { + retryDelay = prevSettings.getRetryDelay(); + randomizedRetryDelay = DEADLINE_SLEEP_DURATION; + } + if (retryAttemptListener != null) { + retryAttemptListener.onRetryAttempt(status, metadata); + } return TimedAttemptSettings.newBuilder() .setGlobalSettings(prevSettings.getGlobalSettings()) - .setRetryDelay(prevSettings.getRetryDelay()) + .setRetryDelay(retryDelay) .setRpcTimeout(prevSettings.getRpcTimeout()) - .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION) + .setRandomizedRetryDelay(randomizedRetryDelay) .setAttemptCount(prevSettings.getAttemptCount() + 1) .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos()) .build(); @@ -53,7 +79,8 @@ public TimedAttemptSettings createNextAttempt( public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + if (Errors.isRetryableStatus(status, metadata).isRetryable) { return true; } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/util/ErrorsTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/util/ErrorsTest.java index fa885b424a..bed97c7b7e 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/util/ErrorsTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/util/ErrorsTest.java @@ -15,9 +15,15 @@ */ package com.google.cloud.bigquery.storage.util; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import com.google.protobuf.Duration; +import com.google.protobuf.Parser; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; import org.junit.Test; import org.junit.runner.RunWith; @@ -51,4 +57,78 @@ public void testNonRetryableOtherError() { Status.DATA_LOSS.withDescription( "RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR"))); } + + @Test + public void testIsRetryableStatus() { + Errors.IsRetryableStatusResult result = + Errors.isRetryableStatus( + Status.INTERNAL.withDescription( + "HTTP/2 error code: INTERNAL_ERROR\nReceived Rst stream"), + null); + assertTrue(result.isRetryable); + assertNull(result.retryDelay); + + result = + Errors.isRetryableStatus( + Status.INTERNAL.withDescription( + "RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR"), + null); + assertTrue(result.isRetryable); + assertNull(result.retryDelay); + + Metadata metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "some-key-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(Integer value) { + return new byte[] {}; + } + + @Override + public Integer parseBytes(byte[] serialized) { + return new Integer(1); + } + }), + new Integer(2)); + result = + Errors.isRetryableStatus( + Status.RESOURCE_EXHAUSTED.withDescription("You have run out of X quota"), metadata); + assertFalse(result.isRetryable); + assertNull(result.retryDelay); + + RetryInfo retryInfo = + RetryInfo.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build()) + .build(); + + metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "google.rpc.retryinfo-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(RetryInfo value) { + return value.toByteArray(); + } + + @Override + public RetryInfo parseBytes(byte[] serialized) { + try { + Parser parser = (RetryInfo.newBuilder().build()).getParserForType(); + return parser.parseFrom(serialized); + } catch (Exception e) { + return null; + } + } + }), + retryInfo); + + result = + Errors.isRetryableStatus( + Status.RESOURCE_EXHAUSTED.withDescription("Stop for a while"), metadata); + assertTrue(result.isRetryable); + assertEquals(result.retryDelay, org.threeten.bp.Duration.ofSeconds(123, 456)); + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java index df83a5a01a..d0859bdda5 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java @@ -26,9 +26,14 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.InvalidArgumentException; +import com.google.api.gax.rpc.ResourceExhaustedException; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode; import com.google.protobuf.AbstractMessage; +import com.google.protobuf.Duration; +import com.google.protobuf.Parser; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -49,6 +54,8 @@ public class BigQueryReadClientTest { private static MockServiceHelper serviceHelper; private BigQueryReadClient client; private LocalChannelProvider channelProvider; + private int retryCount; + private Code lastRetryStatusCode; @BeforeClass public static void startStaticServer() { @@ -68,10 +75,22 @@ public static void stopServer() { public void setUp() throws IOException { serviceHelper.reset(); channelProvider = serviceHelper.createChannelProvider(); + retryCount = 0; + lastRetryStatusCode = Code.OK; BigQueryReadSettings settings = BigQueryReadSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) + .setReadRowsRetryAttemptListener( + new BigQueryReadSettings.RetryAttemptListener() { + @Override + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata) { + synchronized (this) { + retryCount += 1; + lastRetryStatusCode = prevStatus.getCode(); + } + } + }) .build(); client = BigQueryReadClient.create(settings); } @@ -143,6 +162,9 @@ public void readRowsTest() throws Exception { List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); Assert.assertEquals(expectedResponse, actualResponses.get(0)); + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -165,6 +187,9 @@ public void readRowsExceptionTest() throws Exception { InvalidArgumentException apiException = (InvalidArgumentException) e.getCause(); Assert.assertEquals(StatusCode.Code.INVALID_ARGUMENT, apiException.getStatusCode().getCode()); } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -189,6 +214,9 @@ public void readRowsRetryingEOSExceptionTest() throws ExecutionException, Interr callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); } @Test @@ -213,5 +241,97 @@ public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, Inte callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithoutRetryInfo() + throws ExecutionException, InterruptedException { + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription("You are out of quota X")), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + + try { + List actualResponses = responseObserver.future().get(); + Assert.fail("No exception thrown"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof ResourceExhaustedException); + ResourceExhaustedException apiException = (ResourceExhaustedException) e.getCause(); + Assert.assertEquals( + StatusCode.Code.RESOURCE_EXHAUSTED, apiException.getStatusCode().getCode()); + } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithRetryInfo() + throws ExecutionException, InterruptedException { + RetryInfo retryInfo = + RetryInfo.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build()) + .build(); + + Metadata metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "google.rpc.retryinfo-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(RetryInfo value) { + return value.toByteArray(); + } + + @Override + public RetryInfo parseBytes(byte[] serialized) { + try { + Parser parser = (RetryInfo.newBuilder().build()).getParserForType(); + return parser.parseFrom(serialized); + } catch (Exception e) { + return null; + } + } + }), + retryInfo); + + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription("Try again in a bit"), metadata), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.RESOURCE_EXHAUSTED); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java index 9dc725c9a1..3ec383cc31 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java @@ -26,6 +26,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.InvalidArgumentException; +import com.google.api.gax.rpc.ResourceExhaustedException; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode; import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsRequest; @@ -41,7 +42,11 @@ import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamPosition; import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto.TableReference; import com.google.protobuf.AbstractMessage; +import com.google.protobuf.Duration; import com.google.protobuf.Empty; +import com.google.protobuf.Parser; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -61,6 +66,8 @@ public class BigQueryStorageClientTest { private static MockServiceHelper serviceHelper; private BigQueryStorageClient client; private LocalChannelProvider channelProvider; + private int retryCount; + private Code lastRetryStatusCode; @BeforeClass public static void startStaticServer() { @@ -79,10 +86,22 @@ public static void stopServer() { public void setUp() throws IOException { serviceHelper.reset(); channelProvider = serviceHelper.createChannelProvider(); + retryCount = 0; + lastRetryStatusCode = Code.OK; BigQueryStorageSettings settings = BigQueryStorageSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) + .setReadRowsRetryAttemptListener( + new BigQueryStorageSettings.RetryAttemptListener() { + @Override + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata) { + synchronized (this) { + retryCount += 1; + lastRetryStatusCode = prevStatus.getCode(); + } + } + }) .build(); client = BigQueryStorageClient.create(settings); } @@ -153,6 +172,9 @@ public void readRowsTest() throws Exception { List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); Assert.assertEquals(expectedResponse, actualResponses.get(0)); + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -176,6 +198,9 @@ public void readRowsExceptionTest() throws Exception { InvalidArgumentException apiException = (InvalidArgumentException) e.getCause(); Assert.assertEquals(StatusCode.Code.INVALID_ARGUMENT, apiException.getStatusCode().getCode()); } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -319,6 +344,9 @@ public void readRowsRetryingEOSExceptionTest() throws ExecutionException, Interr callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); } @Test @@ -343,5 +371,97 @@ public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, Inte callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithoutRetryInfo() + throws ExecutionException, InterruptedException { + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription("You are out of quota X")), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryStorage.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryStorage.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + + try { + List actualResponses = responseObserver.future().get(); + Assert.fail("No exception thrown"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof ResourceExhaustedException); + ResourceExhaustedException apiException = (ResourceExhaustedException) e.getCause(); + Assert.assertEquals( + StatusCode.Code.RESOURCE_EXHAUSTED, apiException.getStatusCode().getCode()); + } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithRetryInfo() + throws ExecutionException, InterruptedException { + RetryInfo retryInfo = + RetryInfo.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build()) + .build(); + + Metadata metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "google.rpc.retryinfo-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(RetryInfo value) { + return value.toByteArray(); + } + + @Override + public RetryInfo parseBytes(byte[] serialized) { + try { + Parser parser = (RetryInfo.newBuilder().build()).getParserForType(); + return parser.parseFrom(serialized); + } catch (Exception e) { + return null; + } + } + }), + retryInfo); + + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription("Try again in a bit"), metadata), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryStorage.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryStorage.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.RESOURCE_EXHAUSTED); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java index 90bea22573..a551146bbc 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java @@ -26,9 +26,14 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.InvalidArgumentException; +import com.google.api.gax.rpc.ResourceExhaustedException; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode; import com.google.protobuf.AbstractMessage; +import com.google.protobuf.Duration; +import com.google.protobuf.Parser; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -49,6 +54,8 @@ public class BigQueryReadClientTest { private static MockServiceHelper serviceHelper; private BigQueryReadClient client; private LocalChannelProvider channelProvider; + private int retryCount; + private Code lastRetryStatusCode; @BeforeClass public static void startStaticServer() { @@ -68,10 +75,22 @@ public static void stopServer() { public void setUp() throws IOException { serviceHelper.reset(); channelProvider = serviceHelper.createChannelProvider(); + retryCount = 0; + lastRetryStatusCode = Code.OK; BigQueryReadSettings settings = BigQueryReadSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) + .setReadRowsRetryAttemptListener( + new BigQueryReadSettings.RetryAttemptListener() { + @Override + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata) { + synchronized (this) { + retryCount += 1; + lastRetryStatusCode = prevStatus.getCode(); + } + } + }) .build(); client = BigQueryReadClient.create(settings); } @@ -143,6 +162,9 @@ public void readRowsTest() throws Exception { List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); Assert.assertEquals(expectedResponse, actualResponses.get(0)); + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -165,6 +187,9 @@ public void readRowsExceptionTest() throws Exception { InvalidArgumentException apiException = (InvalidArgumentException) e.getCause(); Assert.assertEquals(StatusCode.Code.INVALID_ARGUMENT, apiException.getStatusCode().getCode()); } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -189,6 +214,9 @@ public void readRowsRetryingEOSExceptionTest() throws ExecutionException, Interr callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); } @Test @@ -213,5 +241,97 @@ public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, Inte callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithoutRetryInfo() + throws ExecutionException, InterruptedException { + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription("You are out of quota X")), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + + try { + List actualResponses = responseObserver.future().get(); + Assert.fail("No exception thrown"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof ResourceExhaustedException); + ResourceExhaustedException apiException = (ResourceExhaustedException) e.getCause(); + Assert.assertEquals( + StatusCode.Code.RESOURCE_EXHAUSTED, apiException.getStatusCode().getCode()); + } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithRetryInfo() + throws ExecutionException, InterruptedException { + RetryInfo retryInfo = + RetryInfo.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build()) + .build(); + + Metadata metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "google.rpc.retryinfo-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(RetryInfo value) { + return value.toByteArray(); + } + + @Override + public RetryInfo parseBytes(byte[] serialized) { + try { + Parser parser = (RetryInfo.newBuilder().build()).getParserForType(); + return parser.parseFrom(serialized); + } catch (Exception e) { + return null; + } + } + }), + retryInfo); + + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription("Try again in a bit"), metadata), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.RESOURCE_EXHAUSTED); } }