diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java index 067f8d242d..5d73e0cc83 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/util/Errors.java @@ -15,12 +15,53 @@ */ package com.google.cloud.bigquery.storage.util; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.protobuf.ProtoUtils; +import org.threeten.bp.Duration; /** Static utility methods for working with Errors returned from the service. */ public class Errors { private Errors() {}; + public static class IsRetryableStatusResult { + public boolean isRetryable = false; + public Duration retryDelay = null; + } + + private static final Metadata.Key KEY_RETRY_INFO = + ProtoUtils.keyForProto(RetryInfo.getDefaultInstance()); + + /** + * Returns true iff the Status indicates an error that is retryable. + * + *

Generally, internal errors are not considered retryable, however there are certain transient + * network issues that appear as internal but are in fact retryable. + * + *

Resource exhausted errors are only considered retryable if metadata contains a serialized + * RetryInfo object. + */ + public static IsRetryableStatusResult isRetryableStatus(Status status, Metadata metadata) { + IsRetryableStatusResult result = new IsRetryableStatusResult(); + + result.isRetryable = isRetryableInternalStatus(status); + if (!result.isRetryable + && status.getCode() == Status.Code.RESOURCE_EXHAUSTED + && metadata != null + && metadata.containsKey(KEY_RETRY_INFO)) { + RetryInfo retryInfo = metadata.get(KEY_RETRY_INFO); + if (retryInfo.hasRetryDelay()) { + result.isRetryable = true; + result.retryDelay = + Duration.ofSeconds( + retryInfo.getRetryDelay().getSeconds(), retryInfo.getRetryDelay().getNanos()); + } + } + + return result; + } + /** * Returns true iff the Status indicates and internal error that is retryable. * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java index e3fbf56165..172f85d73d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClient.java @@ -126,7 +126,9 @@ public static final BigQueryReadClient create(EnhancedBigQueryReadStub stub) { */ protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException { this.settings = settings; - this.stub = EnhancedBigQueryReadStub.create(settings.getTypedStubSettings()); + this.stub = + EnhancedBigQueryReadStub.create( + settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener()); } @BetaApi("A restructuring of stub classes is planned, so this may break in the future") diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java index fcf02a2331..d4f26c432b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigQueryReadSettings.java @@ -27,6 +27,8 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStubSettings; +import io.grpc.Metadata; +import io.grpc.Status; import java.io.IOException; import java.util.List; @@ -69,6 +71,26 @@ public ServerStreamingCallSettings readRowsSe return getTypedStubSettings().readRowsSettings(); } + public static interface RetryAttemptListener { + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata); + } + + private RetryAttemptListener readRowsRetryAttemptListener = null; + + /** + * If a non null readRowsRetryAttemptListener is provided, client will call onRetryAttempt + * function before a failed ReadRows request is retried. This can be used as negative feedback + * mechanism for future decision to split read streams because some retried failures are due to + * resource exhaustion that increased parallelism only makes it worse. + */ + public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + } + + public RetryAttemptListener getReadRowsRetryAttemptListener() { + return readRowsRetryAttemptListener; + } + /** Returns the object with the settings used for calls to splitReadStream. */ public UnaryCallSettings splitReadStreamSettings() { @@ -176,6 +198,14 @@ public Builder applyToAllUnaryMethods( return this; } + private RetryAttemptListener readRowsRetryAttemptListener = null; + + public Builder setReadRowsRetryAttemptListener( + RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + return this; + } + /** Returns the builder for the settings used for calls to createReadSession. */ public UnaryCallSettings.Builder createReadSessionSettings() { @@ -196,7 +226,9 @@ public Builder applyToAllUnaryMethods( @Override public BigQueryReadSettings build() throws IOException { - return new BigQueryReadSettings(this); + BigQueryReadSettings settings = new BigQueryReadSettings(this); + settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener); + return settings; } } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java index 3d8e3ea0ff..1407c254c0 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/EnhancedBigQueryReadStub.java @@ -31,6 +31,7 @@ import com.google.api.gax.tracing.SpanName; import com.google.api.gax.tracing.TracedServerStreamingCallable; import com.google.cloud.bigquery.storage.v1.BigQueryReadGrpc; +import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; @@ -54,10 +55,18 @@ public class EnhancedBigQueryReadStub implements BackgroundResource { private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage"; private final GrpcBigQueryReadStub stub; private final BigQueryReadStubSettings stubSettings; + private final BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener; private final ClientContext context; public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings) throws IOException { + return create(settings, null); + } + + public static EnhancedBigQueryReadStub create( + EnhancedBigQueryReadStubSettings settings, + BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener) + throws IOException { // Configure the base settings. BigQueryReadStubSettings.Builder baseSettingsBuilder = BigQueryReadStubSettings.newBuilder() @@ -88,14 +97,19 @@ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings s BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build(); ClientContext clientContext = ClientContext.create(baseSettings); GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext); - return new EnhancedBigQueryReadStub(stub, baseSettings, clientContext); + return new EnhancedBigQueryReadStub( + stub, baseSettings, readRowsRetryAttemptListener, clientContext); } @InternalApi("Visible for testing") EnhancedBigQueryReadStub( - GrpcBigQueryReadStub stub, BigQueryReadStubSettings stubSettings, ClientContext context) { + GrpcBigQueryReadStub stub, + BigQueryReadStubSettings stubSettings, + BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener, + ClientContext context) { this.stub = stub; this.stubSettings = stubSettings; + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; this.context = context; } @@ -123,7 +137,7 @@ public Map extract(ReadRowsRequest request) { StreamingRetryAlgorithm retryAlgorithm = new StreamingRetryAlgorithm<>( - new ApiResultRetryAlgorithm(), + new ApiResultRetryAlgorithm(readRowsRetryAttemptListener), new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock())); ScheduledRetryingExecutor retryingExecutor = diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java index 6e1269ae07..10bb440e5a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/stub/readrows/ApiResultRetryAlgorithm.java @@ -21,6 +21,8 @@ import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.ApiException; import com.google.cloud.bigquery.storage.util.Errors; +import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; +import io.grpc.Metadata; import io.grpc.Status; import org.threeten.bp.Duration; @@ -30,17 +32,41 @@ public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm< // Duration to sleep on if the error is DEADLINE_EXCEEDED. public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + private final BigQueryReadSettings.RetryAttemptListener retryAttemptListener; + + public ApiResultRetryAlgorithm() { + this(null); + } + + public ApiResultRetryAlgorithm(BigQueryReadSettings.RetryAttemptListener retryAttemptListener) { + super(); + this.retryAttemptListener = retryAttemptListener; + } + @Override public TimedAttemptSettings createNextAttempt( Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + Errors.IsRetryableStatusResult result = Errors.isRetryableStatus(status, metadata); + if (result.isRetryable) { + // If result.retryDelay isn't null, we know exactly how long we must wait, so both regular + // and randomized delays are the same. + Duration retryDelay = result.retryDelay; + Duration randomizedRetryDelay = result.retryDelay; + if (retryDelay == null) { + retryDelay = prevSettings.getRetryDelay(); + randomizedRetryDelay = DEADLINE_SLEEP_DURATION; + } + if (retryAttemptListener != null) { + retryAttemptListener.onRetryAttempt(status, metadata); + } return TimedAttemptSettings.newBuilder() .setGlobalSettings(prevSettings.getGlobalSettings()) - .setRetryDelay(prevSettings.getRetryDelay()) + .setRetryDelay(retryDelay) .setRpcTimeout(prevSettings.getRpcTimeout()) - .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION) + .setRandomizedRetryDelay(randomizedRetryDelay) .setAttemptCount(prevSettings.getAttemptCount() + 1) .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos()) .build(); @@ -53,7 +79,8 @@ public TimedAttemptSettings createNextAttempt( public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + if (Errors.isRetryableStatus(status, metadata).isRetryable) { return true; } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClient.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClient.java index ec5b048992..38d4504ce7 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClient.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClient.java @@ -141,7 +141,9 @@ public static final BigQueryStorageClient create(EnhancedBigQueryStorageStub stu */ protected BigQueryStorageClient(BigQueryStorageSettings settings) throws IOException { this.settings = settings; - this.stub = EnhancedBigQueryStorageStub.create(settings.getTypedStubSettings()); + this.stub = + EnhancedBigQueryStorageStub.create( + settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener()); } @BetaApi("A restructuring of stub classes is planned, so this may break in the future") diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageSettings.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageSettings.java index 172ee7fa5a..d600a4e49d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageSettings.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageSettings.java @@ -37,6 +37,8 @@ import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamResponse; import com.google.cloud.bigquery.storage.v1beta1.stub.EnhancedBigQueryStorageStubSettings; import com.google.protobuf.Empty; +import io.grpc.Metadata; +import io.grpc.Status; import java.io.IOException; import java.util.List; @@ -78,6 +80,26 @@ public ServerStreamingCallSettings readRowsSe return getTypedStubSettings().readRowsSettings(); } + public static interface RetryAttemptListener { + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata); + } + + private RetryAttemptListener readRowsRetryAttemptListener = null; + + /** + * If a non null readRowsRetryAttemptListener is provided, client will call onRetryAttempt + * function before a failed ReadRows request is retried. This can be used as negative feedback + * mechanism for future decision to split read streams because some retried failures are due to + * resource exhaustion that increased parallelism only makes it worse. + */ + public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + } + + public RetryAttemptListener getReadRowsRetryAttemptListener() { + return readRowsRetryAttemptListener; + } + /** Returns the object with the settings used for calls to batchCreateReadSessionStreams. */ public UnaryCallSettings< BatchCreateReadSessionStreamsRequest, BatchCreateReadSessionStreamsResponse> @@ -197,6 +219,14 @@ public Builder applyToAllUnaryMethods( return this; } + private RetryAttemptListener readRowsRetryAttemptListener = null; + + public Builder setReadRowsRetryAttemptListener( + RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + return this; + } + /** Returns the builder for the settings used for calls to createReadSession. */ public UnaryCallSettings.Builder createReadSessionSettings() { @@ -229,7 +259,9 @@ public UnaryCallSettings.Builder finalizeStreamSet @Override public BigQueryStorageSettings build() throws IOException { - return new BigQueryStorageSettings(this); + BigQueryStorageSettings settings = new BigQueryStorageSettings(this); + settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener); + return settings; } } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java index 5f449adf8f..5a1940dff7 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/EnhancedBigQueryStorageStub.java @@ -31,6 +31,7 @@ import com.google.api.gax.tracing.SpanName; import com.google.api.gax.tracing.TracedServerStreamingCallable; import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageGrpc; +import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings; import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsRequest; import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsResponse; import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest; @@ -58,10 +59,18 @@ public class EnhancedBigQueryStorageStub implements BackgroundResource { private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage"; private final GrpcBigQueryStorageStub stub; private final BigQueryStorageStubSettings stubSettings; + private final BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener; private final ClientContext context; public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSettings settings) throws IOException { + return create(settings, null); + } + + public static EnhancedBigQueryStorageStub create( + EnhancedBigQueryStorageStubSettings settings, + BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener) + throws IOException { // Configure the base settings. BigQueryStorageStubSettings.Builder baseSettingsBuilder = BigQueryStorageStubSettings.newBuilder() @@ -107,16 +116,19 @@ public static EnhancedBigQueryStorageStub create(EnhancedBigQueryStorageStubSett BigQueryStorageStubSettings baseSettings = baseSettingsBuilder.build(); ClientContext clientContext = ClientContext.create(baseSettings); GrpcBigQueryStorageStub stub = new GrpcBigQueryStorageStub(baseSettings, clientContext); - return new EnhancedBigQueryStorageStub(stub, baseSettings, clientContext); + return new EnhancedBigQueryStorageStub( + stub, baseSettings, readRowsRetryAttemptListener, clientContext); } @InternalApi("Visible for testing") EnhancedBigQueryStorageStub( GrpcBigQueryStorageStub stub, BigQueryStorageStubSettings stubSettings, + BigQueryStorageSettings.RetryAttemptListener readRowsRetryAttemptListener, ClientContext context) { this.stub = stub; this.stubSettings = stubSettings; + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; this.context = context; } @@ -145,7 +157,7 @@ public Map extract(ReadRowsRequest request) { StreamingRetryAlgorithm retryAlgorithm = new StreamingRetryAlgorithm<>( - new ApiResultRetryAlgorithm(), + new ApiResultRetryAlgorithm(readRowsRetryAttemptListener), new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock())); ScheduledRetryingExecutor retryingExecutor = diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java index d9cf557a76..da8712584f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ApiResultRetryAlgorithm.java @@ -21,6 +21,8 @@ import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.ApiException; import com.google.cloud.bigquery.storage.util.Errors; +import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings; +import io.grpc.Metadata; import io.grpc.Status; import org.threeten.bp.Duration; @@ -30,17 +32,42 @@ public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm< // Duration to sleep on if the error is DEADLINE_EXCEEDED. public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + private final BigQueryStorageSettings.RetryAttemptListener retryAttemptListener; + + public ApiResultRetryAlgorithm() { + this(null); + } + + public ApiResultRetryAlgorithm( + BigQueryStorageSettings.RetryAttemptListener retryAttemptListener) { + super(); + this.retryAttemptListener = retryAttemptListener; + } + @Override public TimedAttemptSettings createNextAttempt( Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + Errors.IsRetryableStatusResult result = Errors.isRetryableStatus(status, metadata); + if (result.isRetryable) { + // If result.retryDelay isn't null, we know exactly how long we must wait, so both regular + // and randomized delays are the same. + Duration retryDelay = result.retryDelay; + Duration randomizedRetryDelay = result.retryDelay; + if (retryDelay == null) { + retryDelay = prevSettings.getRetryDelay(); + randomizedRetryDelay = DEADLINE_SLEEP_DURATION; + } + if (retryAttemptListener != null) { + retryAttemptListener.onRetryAttempt(status, metadata); + } return TimedAttemptSettings.newBuilder() .setGlobalSettings(prevSettings.getGlobalSettings()) - .setRetryDelay(prevSettings.getRetryDelay()) + .setRetryDelay(retryDelay) .setRpcTimeout(prevSettings.getRpcTimeout()) - .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION) + .setRandomizedRetryDelay(randomizedRetryDelay) .setAttemptCount(prevSettings.getAttemptCount() + 1) .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos()) .build(); @@ -53,7 +80,8 @@ public TimedAttemptSettings createNextAttempt( public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + if (Errors.isRetryableStatus(status, metadata).isRetryable) { return true; } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClient.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClient.java index 23d61233df..ba549d9b97 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClient.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClient.java @@ -126,7 +126,9 @@ public static final BigQueryReadClient create(EnhancedBigQueryReadStub stub) { */ protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException { this.settings = settings; - this.stub = EnhancedBigQueryReadStub.create(settings.getTypedStubSettings()); + this.stub = + EnhancedBigQueryReadStub.create( + settings.getTypedStubSettings(), settings.getReadRowsRetryAttemptListener()); } @BetaApi("A restructuring of stub classes is planned, so this may break in the future") diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadSettings.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadSettings.java index f18c9e19c6..4230a5fe8f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadSettings.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadSettings.java @@ -27,6 +27,8 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.cloud.bigquery.storage.v1beta2.stub.EnhancedBigQueryReadStubSettings; +import io.grpc.Metadata; +import io.grpc.Status; import java.io.IOException; import java.util.List; @@ -69,6 +71,26 @@ public ServerStreamingCallSettings readRowsSe return getTypedStubSettings().readRowsSettings(); } + public static interface RetryAttemptListener { + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata); + } + + private RetryAttemptListener readRowsRetryAttemptListener = null; + + /** + * If a non null readRowsRetryAttemptListener is provided, client will call onRetryAttempt + * function before a failed ReadRows request is retried. This can be used as negative feedback + * mechanism for future decision to split read streams because some retried failures are due to + * resource exhaustion that increased parallelism only makes it worse. + */ + public void setReadRowsRetryAttemptListener(RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + } + + public RetryAttemptListener getReadRowsRetryAttemptListener() { + return readRowsRetryAttemptListener; + } + /** Returns the object with the settings used for calls to splitReadStream. */ public UnaryCallSettings splitReadStreamSettings() { @@ -176,6 +198,14 @@ public Builder applyToAllUnaryMethods( return this; } + private RetryAttemptListener readRowsRetryAttemptListener = null; + + public Builder setReadRowsRetryAttemptListener( + RetryAttemptListener readRowsRetryAttemptListener) { + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; + return this; + } + /** Returns the builder for the settings used for calls to createReadSession. */ public UnaryCallSettings.Builder createReadSessionSettings() { @@ -196,7 +226,9 @@ public Builder applyToAllUnaryMethods( @Override public BigQueryReadSettings build() throws IOException { - return new BigQueryReadSettings(this); + BigQueryReadSettings settings = new BigQueryReadSettings(this); + settings.setReadRowsRetryAttemptListener(readRowsRetryAttemptListener); + return settings; } } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java index 351fd21c4f..28870b7a47 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/EnhancedBigQueryReadStub.java @@ -31,6 +31,7 @@ import com.google.api.gax.tracing.SpanName; import com.google.api.gax.tracing.TracedServerStreamingCallable; import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadSettings; import com.google.cloud.bigquery.storage.v1beta2.CreateReadSessionRequest; import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1beta2.ReadRowsResponse; @@ -54,10 +55,18 @@ public class EnhancedBigQueryReadStub implements BackgroundResource { private static final String TRACING_OUTER_CLIENT_NAME = "BigQueryStorage"; private final GrpcBigQueryReadStub stub; private final BigQueryReadStubSettings stubSettings; + private final BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener; private final ClientContext context; public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings settings) throws IOException { + return create(settings, null); + } + + public static EnhancedBigQueryReadStub create( + EnhancedBigQueryReadStubSettings settings, + BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener) + throws IOException { // Configure the base settings. BigQueryReadStubSettings.Builder baseSettingsBuilder = BigQueryReadStubSettings.newBuilder() @@ -88,14 +97,19 @@ public static EnhancedBigQueryReadStub create(EnhancedBigQueryReadStubSettings s BigQueryReadStubSettings baseSettings = baseSettingsBuilder.build(); ClientContext clientContext = ClientContext.create(baseSettings); GrpcBigQueryReadStub stub = new GrpcBigQueryReadStub(baseSettings, clientContext); - return new EnhancedBigQueryReadStub(stub, baseSettings, clientContext); + return new EnhancedBigQueryReadStub( + stub, baseSettings, readRowsRetryAttemptListener, clientContext); } @InternalApi("Visible for testing") EnhancedBigQueryReadStub( - GrpcBigQueryReadStub stub, BigQueryReadStubSettings stubSettings, ClientContext context) { + GrpcBigQueryReadStub stub, + BigQueryReadStubSettings stubSettings, + BigQueryReadSettings.RetryAttemptListener readRowsRetryAttemptListener, + ClientContext context) { this.stub = stub; this.stubSettings = stubSettings; + this.readRowsRetryAttemptListener = readRowsRetryAttemptListener; this.context = context; } @@ -123,7 +137,7 @@ public Map extract(ReadRowsRequest request) { StreamingRetryAlgorithm retryAlgorithm = new StreamingRetryAlgorithm<>( - new ApiResultRetryAlgorithm(), + new ApiResultRetryAlgorithm(readRowsRetryAttemptListener), new ExponentialRetryAlgorithm(callSettings.getRetrySettings(), context.getClock())); ScheduledRetryingExecutor retryingExecutor = diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java index 2c887e1424..1d9cad46bd 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/stub/readrows/ApiResultRetryAlgorithm.java @@ -21,6 +21,8 @@ import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.ApiException; import com.google.cloud.bigquery.storage.util.Errors; +import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadSettings; +import io.grpc.Metadata; import io.grpc.Status; import org.threeten.bp.Duration; @@ -30,17 +32,41 @@ public class ApiResultRetryAlgorithm implements ResultRetryAlgorithm< // Duration to sleep on if the error is DEADLINE_EXCEEDED. public static final Duration DEADLINE_SLEEP_DURATION = Duration.ofMillis(1); + private final BigQueryReadSettings.RetryAttemptListener retryAttemptListener; + + public ApiResultRetryAlgorithm() { + this(null); + } + + public ApiResultRetryAlgorithm(BigQueryReadSettings.RetryAttemptListener retryAttemptListener) { + super(); + this.retryAttemptListener = retryAttemptListener; + } + @Override public TimedAttemptSettings createNextAttempt( Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + Errors.IsRetryableStatusResult result = Errors.isRetryableStatus(status, metadata); + if (result.isRetryable) { + // If result.retryDelay isn't null, we know exactly how long we must wait, so both regular + // and randomized delays are the same. + Duration retryDelay = result.retryDelay; + Duration randomizedRetryDelay = result.retryDelay; + if (retryDelay == null) { + retryDelay = prevSettings.getRetryDelay(); + randomizedRetryDelay = DEADLINE_SLEEP_DURATION; + } + if (retryAttemptListener != null) { + retryAttemptListener.onRetryAttempt(status, metadata); + } return TimedAttemptSettings.newBuilder() .setGlobalSettings(prevSettings.getGlobalSettings()) - .setRetryDelay(prevSettings.getRetryDelay()) + .setRetryDelay(retryDelay) .setRpcTimeout(prevSettings.getRpcTimeout()) - .setRandomizedRetryDelay(DEADLINE_SLEEP_DURATION) + .setRandomizedRetryDelay(randomizedRetryDelay) .setAttemptCount(prevSettings.getAttemptCount() + 1) .setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos()) .build(); @@ -53,7 +79,8 @@ public TimedAttemptSettings createNextAttempt( public boolean shouldRetry(Throwable prevThrowable, ResponseT prevResponse) { if (prevThrowable != null) { Status status = Status.fromThrowable(prevThrowable); - if (Errors.isRetryableInternalStatus(status)) { + Metadata metadata = Status.trailersFromThrowable(prevThrowable); + if (Errors.isRetryableStatus(status, metadata).isRetryable) { return true; } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/util/ErrorsTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/util/ErrorsTest.java index fa885b424a..bed97c7b7e 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/util/ErrorsTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/util/ErrorsTest.java @@ -15,9 +15,15 @@ */ package com.google.cloud.bigquery.storage.util; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import com.google.protobuf.Duration; +import com.google.protobuf.Parser; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; import org.junit.Test; import org.junit.runner.RunWith; @@ -51,4 +57,78 @@ public void testNonRetryableOtherError() { Status.DATA_LOSS.withDescription( "RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR"))); } + + @Test + public void testIsRetryableStatus() { + Errors.IsRetryableStatusResult result = + Errors.isRetryableStatus( + Status.INTERNAL.withDescription( + "HTTP/2 error code: INTERNAL_ERROR\nReceived Rst stream"), + null); + assertTrue(result.isRetryable); + assertNull(result.retryDelay); + + result = + Errors.isRetryableStatus( + Status.INTERNAL.withDescription( + "RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR"), + null); + assertTrue(result.isRetryable); + assertNull(result.retryDelay); + + Metadata metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "some-key-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(Integer value) { + return new byte[] {}; + } + + @Override + public Integer parseBytes(byte[] serialized) { + return new Integer(1); + } + }), + new Integer(2)); + result = + Errors.isRetryableStatus( + Status.RESOURCE_EXHAUSTED.withDescription("You have run out of X quota"), metadata); + assertFalse(result.isRetryable); + assertNull(result.retryDelay); + + RetryInfo retryInfo = + RetryInfo.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build()) + .build(); + + metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "google.rpc.retryinfo-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(RetryInfo value) { + return value.toByteArray(); + } + + @Override + public RetryInfo parseBytes(byte[] serialized) { + try { + Parser parser = (RetryInfo.newBuilder().build()).getParserForType(); + return parser.parseFrom(serialized); + } catch (Exception e) { + return null; + } + } + }), + retryInfo); + + result = + Errors.isRetryableStatus( + Status.RESOURCE_EXHAUSTED.withDescription("Stop for a while"), metadata); + assertTrue(result.isRetryable); + assertEquals(result.retryDelay, org.threeten.bp.Duration.ofSeconds(123, 456)); + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java index df83a5a01a..d0859bdda5 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/BigQueryReadClientTest.java @@ -26,9 +26,14 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.InvalidArgumentException; +import com.google.api.gax.rpc.ResourceExhaustedException; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode; import com.google.protobuf.AbstractMessage; +import com.google.protobuf.Duration; +import com.google.protobuf.Parser; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -49,6 +54,8 @@ public class BigQueryReadClientTest { private static MockServiceHelper serviceHelper; private BigQueryReadClient client; private LocalChannelProvider channelProvider; + private int retryCount; + private Code lastRetryStatusCode; @BeforeClass public static void startStaticServer() { @@ -68,10 +75,22 @@ public static void stopServer() { public void setUp() throws IOException { serviceHelper.reset(); channelProvider = serviceHelper.createChannelProvider(); + retryCount = 0; + lastRetryStatusCode = Code.OK; BigQueryReadSettings settings = BigQueryReadSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) + .setReadRowsRetryAttemptListener( + new BigQueryReadSettings.RetryAttemptListener() { + @Override + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata) { + synchronized (this) { + retryCount += 1; + lastRetryStatusCode = prevStatus.getCode(); + } + } + }) .build(); client = BigQueryReadClient.create(settings); } @@ -143,6 +162,9 @@ public void readRowsTest() throws Exception { List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); Assert.assertEquals(expectedResponse, actualResponses.get(0)); + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -165,6 +187,9 @@ public void readRowsExceptionTest() throws Exception { InvalidArgumentException apiException = (InvalidArgumentException) e.getCause(); Assert.assertEquals(StatusCode.Code.INVALID_ARGUMENT, apiException.getStatusCode().getCode()); } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -189,6 +214,9 @@ public void readRowsRetryingEOSExceptionTest() throws ExecutionException, Interr callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); } @Test @@ -213,5 +241,97 @@ public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, Inte callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithoutRetryInfo() + throws ExecutionException, InterruptedException { + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription("You are out of quota X")), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + + try { + List actualResponses = responseObserver.future().get(); + Assert.fail("No exception thrown"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof ResourceExhaustedException); + ResourceExhaustedException apiException = (ResourceExhaustedException) e.getCause(); + Assert.assertEquals( + StatusCode.Code.RESOURCE_EXHAUSTED, apiException.getStatusCode().getCode()); + } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithRetryInfo() + throws ExecutionException, InterruptedException { + RetryInfo retryInfo = + RetryInfo.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build()) + .build(); + + Metadata metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "google.rpc.retryinfo-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(RetryInfo value) { + return value.toByteArray(); + } + + @Override + public RetryInfo parseBytes(byte[] serialized) { + try { + Parser parser = (RetryInfo.newBuilder().build()).getParserForType(); + return parser.parseFrom(serialized); + } catch (Exception e) { + return null; + } + } + }), + retryInfo); + + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription("Try again in a bit"), metadata), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.RESOURCE_EXHAUSTED); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java index 9dc725c9a1..3ec383cc31 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/BigQueryStorageClientTest.java @@ -26,6 +26,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.InvalidArgumentException; +import com.google.api.gax.rpc.ResourceExhaustedException; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode; import com.google.cloud.bigquery.storage.v1beta1.Storage.BatchCreateReadSessionStreamsRequest; @@ -41,7 +42,11 @@ import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamPosition; import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto.TableReference; import com.google.protobuf.AbstractMessage; +import com.google.protobuf.Duration; import com.google.protobuf.Empty; +import com.google.protobuf.Parser; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -61,6 +66,8 @@ public class BigQueryStorageClientTest { private static MockServiceHelper serviceHelper; private BigQueryStorageClient client; private LocalChannelProvider channelProvider; + private int retryCount; + private Code lastRetryStatusCode; @BeforeClass public static void startStaticServer() { @@ -79,10 +86,22 @@ public static void stopServer() { public void setUp() throws IOException { serviceHelper.reset(); channelProvider = serviceHelper.createChannelProvider(); + retryCount = 0; + lastRetryStatusCode = Code.OK; BigQueryStorageSettings settings = BigQueryStorageSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) + .setReadRowsRetryAttemptListener( + new BigQueryStorageSettings.RetryAttemptListener() { + @Override + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata) { + synchronized (this) { + retryCount += 1; + lastRetryStatusCode = prevStatus.getCode(); + } + } + }) .build(); client = BigQueryStorageClient.create(settings); } @@ -153,6 +172,9 @@ public void readRowsTest() throws Exception { List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); Assert.assertEquals(expectedResponse, actualResponses.get(0)); + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -176,6 +198,9 @@ public void readRowsExceptionTest() throws Exception { InvalidArgumentException apiException = (InvalidArgumentException) e.getCause(); Assert.assertEquals(StatusCode.Code.INVALID_ARGUMENT, apiException.getStatusCode().getCode()); } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -319,6 +344,9 @@ public void readRowsRetryingEOSExceptionTest() throws ExecutionException, Interr callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); } @Test @@ -343,5 +371,97 @@ public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, Inte callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithoutRetryInfo() + throws ExecutionException, InterruptedException { + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription("You are out of quota X")), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryStorage.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryStorage.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + + try { + List actualResponses = responseObserver.future().get(); + Assert.fail("No exception thrown"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof ResourceExhaustedException); + ResourceExhaustedException apiException = (ResourceExhaustedException) e.getCause(); + Assert.assertEquals( + StatusCode.Code.RESOURCE_EXHAUSTED, apiException.getStatusCode().getCode()); + } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithRetryInfo() + throws ExecutionException, InterruptedException { + RetryInfo retryInfo = + RetryInfo.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build()) + .build(); + + Metadata metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "google.rpc.retryinfo-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(RetryInfo value) { + return value.toByteArray(); + } + + @Override + public RetryInfo parseBytes(byte[] serialized) { + try { + Parser parser = (RetryInfo.newBuilder().build()).getParserForType(); + return parser.parseFrom(serialized); + } catch (Exception e) { + return null; + } + } + }), + retryInfo); + + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription("Try again in a bit"), metadata), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryStorage.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryStorage.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.RESOURCE_EXHAUSTED); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java index 90bea22573..a551146bbc 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java @@ -26,9 +26,14 @@ import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.InvalidArgumentException; +import com.google.api.gax.rpc.ResourceExhaustedException; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode; import com.google.protobuf.AbstractMessage; +import com.google.protobuf.Duration; +import com.google.protobuf.Parser; +import com.google.rpc.RetryInfo; +import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -49,6 +54,8 @@ public class BigQueryReadClientTest { private static MockServiceHelper serviceHelper; private BigQueryReadClient client; private LocalChannelProvider channelProvider; + private int retryCount; + private Code lastRetryStatusCode; @BeforeClass public static void startStaticServer() { @@ -68,10 +75,22 @@ public static void stopServer() { public void setUp() throws IOException { serviceHelper.reset(); channelProvider = serviceHelper.createChannelProvider(); + retryCount = 0; + lastRetryStatusCode = Code.OK; BigQueryReadSettings settings = BigQueryReadSettings.newBuilder() .setTransportChannelProvider(channelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) + .setReadRowsRetryAttemptListener( + new BigQueryReadSettings.RetryAttemptListener() { + @Override + public void onRetryAttempt(Status prevStatus, Metadata prevMetadata) { + synchronized (this) { + retryCount += 1; + lastRetryStatusCode = prevStatus.getCode(); + } + } + }) .build(); client = BigQueryReadClient.create(settings); } @@ -143,6 +162,9 @@ public void readRowsTest() throws Exception { List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); Assert.assertEquals(expectedResponse, actualResponses.get(0)); + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -165,6 +187,9 @@ public void readRowsExceptionTest() throws Exception { InvalidArgumentException apiException = (InvalidArgumentException) e.getCause(); Assert.assertEquals(StatusCode.Code.INVALID_ARGUMENT, apiException.getStatusCode().getCode()); } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); } @Test @@ -189,6 +214,9 @@ public void readRowsRetryingEOSExceptionTest() throws ExecutionException, Interr callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); } @Test @@ -213,5 +241,97 @@ public void readRowsRetryingHttp2StreamRstTest() throws ExecutionException, Inte callable.serverStreamingCall(request, responseObserver); List actualResponses = responseObserver.future().get(); Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.INTERNAL); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithoutRetryInfo() + throws ExecutionException, InterruptedException { + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription("You are out of quota X")), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + + try { + List actualResponses = responseObserver.future().get(); + Assert.fail("No exception thrown"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof ResourceExhaustedException); + ResourceExhaustedException apiException = (ResourceExhaustedException) e.getCause(); + Assert.assertEquals( + StatusCode.Code.RESOURCE_EXHAUSTED, apiException.getStatusCode().getCode()); + } + + Assert.assertEquals(retryCount, 0); + Assert.assertEquals(lastRetryStatusCode, Code.OK); + } + + @Test + @SuppressWarnings("all") + public void readRowsNoRetryForResourceExhaustedWithRetryInfo() + throws ExecutionException, InterruptedException { + RetryInfo retryInfo = + RetryInfo.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build()) + .build(); + + Metadata metadata = new Metadata(); + metadata.put( + Metadata.Key.of( + "google.rpc.retryinfo-bin", + new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(RetryInfo value) { + return value.toByteArray(); + } + + @Override + public RetryInfo parseBytes(byte[] serialized) { + try { + Parser parser = (RetryInfo.newBuilder().build()).getParserForType(); + return parser.parseFrom(serialized); + } catch (Exception e) { + return null; + } + } + }), + retryInfo); + + ApiException exception = + new ResourceExhaustedException( + new StatusRuntimeException( + Status.RESOURCE_EXHAUSTED.withDescription("Try again in a bit"), metadata), + GrpcStatusCode.of(Code.RESOURCE_EXHAUSTED), + /* retryable = */ false); + mockBigQueryRead.addException(exception); + long rowCount = 1340416618L; + ReadRowsResponse expectedResponse = ReadRowsResponse.newBuilder().setRowCount(rowCount).build(); + mockBigQueryRead.addResponse(expectedResponse); + ReadRowsRequest request = ReadRowsRequest.newBuilder().build(); + + MockStreamObserver responseObserver = new MockStreamObserver<>(); + + ServerStreamingCallable callable = client.readRowsCallable(); + callable.serverStreamingCall(request, responseObserver); + List actualResponses = responseObserver.future().get(); + Assert.assertEquals(1, actualResponses.size()); + + Assert.assertEquals(retryCount, 1); + Assert.assertEquals(lastRetryStatusCode, Code.RESOURCE_EXHAUSTED); } }