diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryErrorMessages.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryErrorMessages.java new file mode 100644 index 000000000..04cabfc67 --- /dev/null +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryErrorMessages.java @@ -0,0 +1,22 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery; + +public class BigQueryErrorMessages { + public static final String RATE_LIMIT_EXCEEDED_MSG = + "Exceeded rate limits:"; // Error Message for RateLimitExceeded Error +} diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryException.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryException.java index 8d6da0b4e..06cbf344c 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryException.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryException.java @@ -122,6 +122,14 @@ static BaseServiceException translateAndThrow(RetryHelperException ex) { throw new BigQueryException(UNKNOWN_CODE, ex.getMessage(), ex.getCause()); } + static BaseServiceException translateAndThrow( + BigQueryRetryHelper.BigQueryRetryHelperException ex) { + if (ex.getCause() instanceof BaseServiceException) { + throw (BaseServiceException) ex.getCause(); + } + throw new BigQueryException(UNKNOWN_CODE, ex.getMessage(), ex.getCause()); + } + static BaseServiceException translateAndThrow(ExecutionException ex) { BaseServiceException.translate(ex); throw new BigQueryException(UNKNOWN_CODE, ex.getMessage(), ex.getCause()); diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java index 4086c6a73..c871eb551 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java @@ -237,6 +237,10 @@ public Page getNextPage() { } private final BigQueryRpc bigQueryRpc; + private static final BigQueryRetryConfig DEFAULT_RETRY_CONFIG = + BigQueryRetryConfig.newBuilder() + .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG) + .build(); // retry config with Error Message for RateLimitExceeded Error BigQueryImpl(BigQueryOptions options) { super(options); @@ -1271,7 +1275,7 @@ private TableResult queryRpc( com.google.api.services.bigquery.model.QueryResponse results; try { results = - runWithRetries( + BigQueryRetryHelper.runWithRetries( new Callable() { @Override public com.google.api.services.bigquery.model.QueryResponse call() { @@ -1280,8 +1284,9 @@ public com.google.api.services.bigquery.model.QueryResponse call() { }, getOptions().getRetrySettings(), BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER, - getOptions().getClock()); - } catch (RetryHelperException e) { + getOptions().getClock(), + DEFAULT_RETRY_CONFIG); + } catch (BigQueryRetryHelper.BigQueryRetryHelperException e) { throw BigQueryException.translateAndThrow(e); } @@ -1362,7 +1367,7 @@ private static QueryResponse getQueryResults( : jobId.getLocation()); try { GetQueryResultsResponse results = - runWithRetries( + BigQueryRetryHelper.runWithRetries( new Callable() { @Override public GetQueryResultsResponse call() { @@ -1376,8 +1381,10 @@ public GetQueryResultsResponse call() { } }, serviceOptions.getRetrySettings(), - EXCEPTION_HANDLER, - serviceOptions.getClock()); + BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER, + serviceOptions.getClock(), + DEFAULT_RETRY_CONFIG); + TableSchema schemaPb = results.getSchema(); ImmutableList.Builder errors = ImmutableList.builder(); @@ -1393,7 +1400,7 @@ public GetQueryResultsResponse call() { .setTotalRows(results.getTotalRows() == null ? 0 : results.getTotalRows().longValue()) .setErrors(errors.build()) .build(); - } catch (RetryHelper.RetryHelperException e) { + } catch (BigQueryRetryHelper.BigQueryRetryHelperException e) { throw BigQueryException.translateAndThrow(e); } } diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryAlgorithm.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryAlgorithm.java new file mode 100644 index 000000000..1a75a9c08 --- /dev/null +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryAlgorithm.java @@ -0,0 +1,152 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.retrying.ResultRetryAlgorithmWithContext; +import com.google.api.gax.retrying.RetryAlgorithm; +import com.google.api.gax.retrying.RetryingContext; +import com.google.api.gax.retrying.TimedAttemptSettings; +import com.google.api.gax.retrying.TimedRetryAlgorithm; +import com.google.api.gax.retrying.TimedRetryAlgorithmWithContext; +import java.util.Iterator; +import java.util.concurrent.CancellationException; + +public class BigQueryRetryAlgorithm extends RetryAlgorithm { + private final BigQueryRetryConfig bigQueryRetryConfig; + private final ResultRetryAlgorithm resultAlgorithm; + private final TimedRetryAlgorithm timedAlgorithm; + private final ResultRetryAlgorithmWithContext resultAlgorithmWithContext; + private final TimedRetryAlgorithmWithContext timedAlgorithmWithContext; + + public BigQueryRetryAlgorithm( + ResultRetryAlgorithm resultAlgorithm, + TimedRetryAlgorithm timedAlgorithm, + BigQueryRetryConfig bigQueryRetryConfig) { + super(resultAlgorithm, timedAlgorithm); + this.bigQueryRetryConfig = checkNotNull(bigQueryRetryConfig); + this.resultAlgorithm = checkNotNull(resultAlgorithm); + this.timedAlgorithm = checkNotNull(timedAlgorithm); + this.resultAlgorithmWithContext = null; + this.timedAlgorithmWithContext = null; + } + + @Override + public boolean shouldRetry( + RetryingContext context, + Throwable previousThrowable, + ResponseT previousResponse, + TimedAttemptSettings nextAttemptSettings) + throws CancellationException { + // Implementing shouldRetryBasedOnBigQueryRetryConfig so that we can retry exceptions based on + // the exception messages + return (shouldRetryBasedOnResult(context, previousThrowable, previousResponse) + || shouldRetryBasedOnBigQueryRetryConfig(previousThrowable, bigQueryRetryConfig)) + && shouldRetryBasedOnTiming(context, nextAttemptSettings); + } + + private boolean shouldRetryBasedOnBigQueryRetryConfig( + Throwable previousThrowable, BigQueryRetryConfig bigQueryRetryConfig) { + /* + We are deciding if a given error should be retried on the basis of error message. + Cannot rely on Error/Status code as for example error code 400 (which is not retriable) could be thrown due to rateLimitExceed, which is retriable + */ + String errorDesc; + if (previousThrowable != null && (errorDesc = previousThrowable.getMessage()) != null) { + for (Iterator retriableMessages = + bigQueryRetryConfig.getRetriableErrorMessages().iterator(); + retriableMessages.hasNext(); ) { + if (errorDesc.contains(retriableMessages.next())) { // Error message should be retried + return true; + } + } + } + return false; + } + + /*Duplicating this method as it can not be inherited from the RetryAlgorithm due to the default access modifier*/ + boolean shouldRetryBasedOnResult( + RetryingContext context, Throwable previousThrowable, ResponseT previousResponse) { + if (resultAlgorithmWithContext != null && context != null) { + return resultAlgorithmWithContext.shouldRetry(context, previousThrowable, previousResponse); + } + return getResultAlgorithm().shouldRetry(previousThrowable, previousResponse); + } + + /*Duplicating this method as it can not be inherited from the RetryAlgorithm due to the private access modifier*/ + private boolean shouldRetryBasedOnTiming( + RetryingContext context, TimedAttemptSettings nextAttemptSettings) { + if (nextAttemptSettings == null) { + return false; + } + if (timedAlgorithmWithContext != null && context != null) { + return timedAlgorithmWithContext.shouldRetry(context, nextAttemptSettings); + } + return getTimedAlgorithm().shouldRetry(nextAttemptSettings); + } + + @Override + public TimedAttemptSettings createNextAttempt( + RetryingContext context, + Throwable previousThrowable, + ResponseT previousResponse, + TimedAttemptSettings previousSettings) { + // a small optimization that avoids calling relatively heavy methods + // like timedAlgorithm.createNextAttempt(), when it is not necessary. + + if (!((shouldRetryBasedOnResult(context, previousThrowable, previousResponse) + || shouldRetryBasedOnBigQueryRetryConfig( + previousThrowable, + bigQueryRetryConfig)))) { // Calling shouldRetryBasedOnBigQueryRetryConfig to check if + // the error message could be retried + return null; + } + + TimedAttemptSettings newSettings = + createNextAttemptBasedOnResult( + context, previousThrowable, previousResponse, previousSettings); + if (newSettings == null) { + newSettings = createNextAttemptBasedOnTiming(context, previousSettings); + } + return newSettings; + } + + /*Duplicating this method as it can not be inherited from the RetryAlgorithm due to the private access modifier*/ + private TimedAttemptSettings createNextAttemptBasedOnResult( + RetryingContext context, + Throwable previousThrowable, + ResponseT previousResponse, + TimedAttemptSettings previousSettings) { + if (resultAlgorithmWithContext != null && context != null) { + return resultAlgorithmWithContext.createNextAttempt( + context, previousThrowable, previousResponse, previousSettings); + } + return getResultAlgorithm() + .createNextAttempt(previousThrowable, previousResponse, previousSettings); + } + + /*Duplicating this method as it can not be inherited from the RetryAlgorithm due to the private access modifier*/ + private TimedAttemptSettings createNextAttemptBasedOnTiming( + RetryingContext context, TimedAttemptSettings previousSettings) { + if (timedAlgorithmWithContext != null && context != null) { + return timedAlgorithmWithContext.createNextAttempt(context, previousSettings); + } + return getTimedAlgorithm().createNextAttempt(previousSettings); + } +} diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryConfig.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryConfig.java new file mode 100644 index 000000000..7e28e5707 --- /dev/null +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryConfig.java @@ -0,0 +1,54 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableSet; + +public class BigQueryRetryConfig { + private final ImmutableSet retriableErrorMessages; + + private BigQueryRetryConfig(Builder builder) { + retriableErrorMessages = builder.retriableErrorMessages.build(); + } + + public ImmutableSet getRetriableErrorMessages() { + return retriableErrorMessages; + } + + // BigQueryRetryConfig builder + public static class Builder { + private final ImmutableSet.Builder retriableErrorMessages = ImmutableSet.builder(); + + private Builder() {} + + public final Builder retryOnMessage(String... errorMessages) { + for (String errorMessage : errorMessages) { + retriableErrorMessages.add(checkNotNull(errorMessage)); + } + return this; + } + + public BigQueryRetryConfig build() { + return new BigQueryRetryConfig(this); + } + } + + public static Builder newBuilder() { + return new Builder(); + } +} diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryHelper.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryHelper.java new file mode 100644 index 000000000..7c1e0e592 --- /dev/null +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryHelper.java @@ -0,0 +1,76 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigquery; + +import com.google.api.core.ApiClock; +import com.google.api.gax.retrying.*; +import com.google.cloud.RetryHelper; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +public class BigQueryRetryHelper extends RetryHelper { + + public static V runWithRetries( + Callable callable, + RetrySettings retrySettings, + ResultRetryAlgorithm resultRetryAlgorithm, + ApiClock clock, + BigQueryRetryConfig bigQueryRetryConfig) + throws RetryHelperException { + try { + // Suppressing should be ok as a workaraund. Current and only ResultRetryAlgorithm + // implementation does not use response at all, so ignoring its type is ok. + @SuppressWarnings("unchecked") + ResultRetryAlgorithm algorithm = (ResultRetryAlgorithm) resultRetryAlgorithm; + return run( + callable, + new ExponentialRetryAlgorithm(retrySettings, clock), + algorithm, + bigQueryRetryConfig); + } catch (Exception e) { + throw new BigQueryRetryHelperException(e.getCause()); + } + } + + private static V run( + Callable callable, + TimedRetryAlgorithm timedAlgorithm, + ResultRetryAlgorithm resultAlgorithm, + BigQueryRetryConfig bigQueryRetryConfig) + throws ExecutionException, InterruptedException { + RetryAlgorithm retryAlgorithm = + new BigQueryRetryAlgorithm<>( + resultAlgorithm, + timedAlgorithm, + bigQueryRetryConfig); // using BigQueryRetryAlgorithm in place of + // com.google.api.gax.retrying.RetryAlgorithm, as + // BigQueryRetryAlgorithm retries considering bigQueryRetryConfig + RetryingExecutor executor = new DirectRetryingExecutor<>(retryAlgorithm); + + RetryingFuture retryingFuture = executor.createFuture(callable); + executor.submit(retryingFuture); + return retryingFuture.get(); + } + + public static class BigQueryRetryHelperException extends RuntimeException { + + private static final long serialVersionUID = -8519852520090965314L; + + BigQueryRetryHelperException(Throwable cause) { + super(cause); + } + } +} diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java index 6ef7eb906..7ed27676c 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java @@ -73,6 +73,10 @@ public class Job extends JobInfo { private final BigQueryOptions options; private transient BigQuery bigquery; + private static final BigQueryRetryConfig DEFAULT_RETRY_CONFIG = + BigQueryRetryConfig.newBuilder() + .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG) + .build(); // retry config with Error Message for RateLimitExceeded Error /** A builder for {@code Job} objects. */ public static final class Builder extends JobInfo.Builder { @@ -319,7 +323,7 @@ public TableResult getQueryResults(QueryResultsOption... options) } private QueryResponse waitForQueryResults( - RetrySettings waitSettings, final QueryResultsOption... resultsOptions) + RetrySettings retrySettings, final QueryResultsOption... resultsOptions) throws InterruptedException { if (getConfiguration().getType() != Type.QUERY) { throw new UnsupportedOperationException( @@ -327,22 +331,26 @@ private QueryResponse waitForQueryResults( } try { - return RetryHelper.poll( + return BigQueryRetryHelper.runWithRetries( new Callable() { @Override public QueryResponse call() { return bigquery.getQueryResults(getJobId(), resultsOptions); } }, - waitSettings, + retrySettings, new BasicResultRetryAlgorithm() { @Override - public boolean shouldRetry(Throwable prevThrowable, QueryResponse prevResponse) { + public boolean shouldRetry( + Throwable prevThrowable, + QueryResponse + prevResponse) { // Used by BigQueryRetryAlgorithm.shouldRetryBasedOnResult return prevResponse != null && !prevResponse.getCompleted(); } }, - options.getClock()); - } catch (ExecutionException e) { + options.getClock(), + DEFAULT_RETRY_CONFIG); + } catch (BigQueryRetryHelper.BigQueryRetryHelperException e) { throw BigQueryException.translateAndThrow(e); } } diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java index 7c970313f..654cc0266 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java @@ -2173,6 +2173,49 @@ public void testGetQueryResults() { verify(bigqueryRpcMock).getQueryResults(PROJECT, JOB, null, EMPTY_RPC_OPTIONS); } + @Test + public void testGetQueryResultsRetry() { + JobId queryJob = JobId.of(JOB); + GetQueryResultsResponse responsePb = + new GetQueryResultsResponse() + .setEtag("etag") + .setJobReference(queryJob.toPb()) + .setRows(ImmutableList.of(TABLE_ROW)) + .setJobComplete(true) + .setCacheHit(false) + .setPageToken(CURSOR) + .setTotalBytesProcessed(42L) + .setTotalRows(BigInteger.valueOf(1L)); + + when(bigqueryRpcMock.getQueryResults(PROJECT, JOB, null, EMPTY_RPC_OPTIONS)) + .thenThrow(new BigQueryException(500, "InternalError")) + .thenThrow(new BigQueryException(502, "Bad Gateway")) + .thenThrow(new BigQueryException(503, "Service Unavailable")) + .thenThrow(new BigQueryException(504, "Gateway Timeout")) + .thenThrow( + new BigQueryException( + 400, + BigQueryErrorMessages + .RATE_LIMIT_EXCEEDED_MSG)) // retrial on based on RATE_LIMIT_EXCEEDED_MSG + .thenReturn(responsePb); + + bigquery = + options + .toBuilder() + .setRetrySettings(ServiceOptions.getDefaultRetrySettings()) + .build() + .getService(); + + QueryResponse response = bigquery.getQueryResults(queryJob); + assertEquals(true, response.getCompleted()); + assertEquals(null, response.getSchema()); + // IMP: Unable to test for idempotency of the requests using getQueryResults(PROJECT, JOB, null, + // EMPTY_RPC_OPTIONS) as there is no + // identifier in this method which will can potentially differ and which can be used to + // establish idempotency + verify(bigqueryRpcMock, times(6)).getQueryResults(PROJECT, JOB, null, EMPTY_RPC_OPTIONS); + } + @Test public void testGetQueryResultsWithProject() { JobId queryJob = JobId.of(OTHER_PROJECT, JOB); @@ -2377,6 +2420,56 @@ public void testFastQueryDMLShouldRetry() throws Exception { verify(bigqueryRpcMock, times(5)).queryRpc(eq(PROJECT), requestPbCapture.capture()); } + @Test + public void testFastQueryRateLimitIdempotency() throws Exception { + com.google.api.services.bigquery.model.QueryResponse responsePb = + new com.google.api.services.bigquery.model.QueryResponse() + .setCacheHit(false) + .setJobComplete(true) + .setRows(ImmutableList.of(TABLE_ROW)) + .setPageToken(null) + .setTotalBytesProcessed(42L) + .setNumDmlAffectedRows(1L) + .setSchema(TABLE_SCHEMA.toPb()); + + when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture())) + .thenThrow(new BigQueryException(500, "InternalError")) + .thenThrow(new BigQueryException(502, "Bad Gateway")) + .thenThrow(new BigQueryException(503, "Service Unavailable")) + .thenThrow(new BigQueryException(504, "Gateway Timeout")) + .thenThrow( + new BigQueryException( + 400, + BigQueryErrorMessages + .RATE_LIMIT_EXCEEDED_MSG)) // retrial on based on RATE_LIMIT_EXCEEDED_MSG + .thenReturn(responsePb); + + bigquery = + options + .toBuilder() + .setRetrySettings(ServiceOptions.getDefaultRetrySettings()) + .build() + .getService(); + + TableResult response = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY); + assertEquals(TABLE_SCHEMA, response.getSchema()); + assertEquals(1, response.getTotalRows()); + + List allRequests = requestPbCapture.getAllValues(); + boolean idempotent = true; + String firstRequestId = allRequests.get(0).getRequestId(); + for (QueryRequest request : allRequests) { + idempotent = + idempotent + && request + .getRequestId() + .equals(firstRequestId); // all the requestIds should be the same + } + + assertTrue(idempotent); + verify(bigqueryRpcMock, times(6)).queryRpc(eq(PROJECT), requestPbCapture.capture()); + } + @Test public void testFastQueryDDLShouldRetry() throws Exception { com.google.api.services.bigquery.model.QueryResponse responsePb =