Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: Implemented BigQueryRetryAlgorithm to retry on the basis of the…
… configured re-triable error messages (#1426)

* Updated BigQueryImpl

* Initial Commit

* Using BigQueryRetryAlgorithm as the retry algorithm

* Created BigQueryRetryAlgorithm as a subclass of RetryAlgorithm

* BigQueryErrorMessages property file

* Implemented Builder Logic for BigQueryRetryConfig

* Using BigQueryRetryConfig for getQueryResults

* Updated shouldRetry with the logic to retry based on error messages

* Implemented null checks on shouldRetryBasedOnBigQueryRetryConfig

* Removed `Status` from shouldRetryBasedOnBigQueryRetryConfig implementation

* Removed unused imports

* created DEFAULT_RATE_LIMIT_EXCEEDED_RETRY_CONFIG for getQueryResults

* Added testGetQueryResultsRetry test for testing getQueryResults Retry

* Overriding createNextAttempt method so that it generates an attempt based on the error message

* Linted BigQueryRetryHelper

* Linted testGetQueryResultsRetry

* Linted BigQueryRetryAlgorithm

* Linted BigQueryErrorMessages

* Linted BigQueryRetryConfig

* Fixed Linting

* Fixed Linting

* Fixed Linting

* Created translateAndThrow(BigQueryRetryHelper.BigQueryRetryHelperException ex) method to handle BigQueryRetryHelperException

* Handling BigQueryRetryHelper.BigQueryRetryHelperException for getQueryResults

* Implementing BigQueryRetryHelper.runWithRetries from TableResult.queryRPC method

* Implementing testFastQueryRateLimitIdempotency Method to test Idempotency of the BigQueryRetryHelper.runWithRetries for TableResult.query(...)

* Changed DEFAULT_RATE_LIMIT_EXCEEDED_RETRY_CONFIG to DEFAULT_RETRY_CONFIG

* Implemented `BigQueryRetryHelper.runWithRetries` on `QueryResponse waitForQueryResults` method, which is used by `TableResult getQueryResults` method

* Revert "Implemented `BigQueryRetryHelper.runWithRetries` on `QueryResponse waitForQueryResults` method, which is used by `TableResult getQueryResults` method"

This reverts commit 84a3418.

* Revert "Changed DEFAULT_RATE_LIMIT_EXCEEDED_RETRY_CONFIG to DEFAULT_RETRY_CONFIG"

This reverts commit 22b1706.

* Renamed DEFAULT_RATE_LIMIT_EXCEEDED_RETRY_CONFIG to DEFAULT_RETRY_CONFIG

* Revert "Renamed DEFAULT_RATE_LIMIT_EXCEEDED_RETRY_CONFIG to DEFAULT_RETRY_CONFIG"

This reverts commit 2d21e11.

* Renamed DEFAULT_RATE_LIMIT_EXCEEDED_RETRY_CONFIG to DEFAULT_RETRY_CONFIG

* Implemented BigQueryRetryHelper.runWithRetries on `QueryResponse waitForQueryResults` method, which is used by `TableResult getQueryResults` method
  • Loading branch information
prash-mi committed Jul 14, 2021
1 parent 5bee589 commit 44d9795
Show file tree
Hide file tree
Showing 8 changed files with 433 additions and 13 deletions.
@@ -0,0 +1,22 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigquery;

public class BigQueryErrorMessages {
public static final String RATE_LIMIT_EXCEEDED_MSG =
"Exceeded rate limits:"; // Error Message for RateLimitExceeded Error
}
Expand Up @@ -122,6 +122,14 @@ static BaseServiceException translateAndThrow(RetryHelperException ex) {
throw new BigQueryException(UNKNOWN_CODE, ex.getMessage(), ex.getCause());
}

static BaseServiceException translateAndThrow(
BigQueryRetryHelper.BigQueryRetryHelperException ex) {
if (ex.getCause() instanceof BaseServiceException) {
throw (BaseServiceException) ex.getCause();
}
throw new BigQueryException(UNKNOWN_CODE, ex.getMessage(), ex.getCause());
}

static BaseServiceException translateAndThrow(ExecutionException ex) {
BaseServiceException.translate(ex);
throw new BigQueryException(UNKNOWN_CODE, ex.getMessage(), ex.getCause());
Expand Down
Expand Up @@ -237,6 +237,10 @@ public Page<FieldValueList> getNextPage() {
}

private final BigQueryRpc bigQueryRpc;
private static final BigQueryRetryConfig DEFAULT_RETRY_CONFIG =
BigQueryRetryConfig.newBuilder()
.retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG)
.build(); // retry config with Error Message for RateLimitExceeded Error

BigQueryImpl(BigQueryOptions options) {
super(options);
Expand Down Expand Up @@ -1271,7 +1275,7 @@ private TableResult queryRpc(
com.google.api.services.bigquery.model.QueryResponse results;
try {
results =
runWithRetries(
BigQueryRetryHelper.runWithRetries(
new Callable<com.google.api.services.bigquery.model.QueryResponse>() {
@Override
public com.google.api.services.bigquery.model.QueryResponse call() {
Expand All @@ -1280,8 +1284,9 @@ public com.google.api.services.bigquery.model.QueryResponse call() {
},
getOptions().getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
getOptions().getClock(),
DEFAULT_RETRY_CONFIG);
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}

Expand Down Expand Up @@ -1362,7 +1367,7 @@ private static QueryResponse getQueryResults(
: jobId.getLocation());
try {
GetQueryResultsResponse results =
runWithRetries(
BigQueryRetryHelper.runWithRetries(
new Callable<GetQueryResultsResponse>() {
@Override
public GetQueryResultsResponse call() {
Expand All @@ -1376,8 +1381,10 @@ public GetQueryResultsResponse call() {
}
},
serviceOptions.getRetrySettings(),
EXCEPTION_HANDLER,
serviceOptions.getClock());
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
serviceOptions.getClock(),
DEFAULT_RETRY_CONFIG);

TableSchema schemaPb = results.getSchema();

ImmutableList.Builder<BigQueryError> errors = ImmutableList.builder();
Expand All @@ -1393,7 +1400,7 @@ public GetQueryResultsResponse call() {
.setTotalRows(results.getTotalRows() == null ? 0 : results.getTotalRows().longValue())
.setErrors(errors.build())
.build();
} catch (RetryHelper.RetryHelperException e) {
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
}
Expand Down
@@ -0,0 +1,152 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigquery;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.ResultRetryAlgorithmWithContext;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingContext;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.api.gax.retrying.TimedRetryAlgorithmWithContext;
import java.util.Iterator;
import java.util.concurrent.CancellationException;

public class BigQueryRetryAlgorithm<ResponseT> extends RetryAlgorithm<ResponseT> {
private final BigQueryRetryConfig bigQueryRetryConfig;
private final ResultRetryAlgorithm<ResponseT> resultAlgorithm;
private final TimedRetryAlgorithm timedAlgorithm;
private final ResultRetryAlgorithmWithContext<ResponseT> resultAlgorithmWithContext;
private final TimedRetryAlgorithmWithContext timedAlgorithmWithContext;

public BigQueryRetryAlgorithm(
ResultRetryAlgorithm<ResponseT> resultAlgorithm,
TimedRetryAlgorithm timedAlgorithm,
BigQueryRetryConfig bigQueryRetryConfig) {
super(resultAlgorithm, timedAlgorithm);
this.bigQueryRetryConfig = checkNotNull(bigQueryRetryConfig);
this.resultAlgorithm = checkNotNull(resultAlgorithm);
this.timedAlgorithm = checkNotNull(timedAlgorithm);
this.resultAlgorithmWithContext = null;
this.timedAlgorithmWithContext = null;
}

@Override
public boolean shouldRetry(
RetryingContext context,
Throwable previousThrowable,
ResponseT previousResponse,
TimedAttemptSettings nextAttemptSettings)
throws CancellationException {
// Implementing shouldRetryBasedOnBigQueryRetryConfig so that we can retry exceptions based on
// the exception messages
return (shouldRetryBasedOnResult(context, previousThrowable, previousResponse)
|| shouldRetryBasedOnBigQueryRetryConfig(previousThrowable, bigQueryRetryConfig))
&& shouldRetryBasedOnTiming(context, nextAttemptSettings);
}

private boolean shouldRetryBasedOnBigQueryRetryConfig(
Throwable previousThrowable, BigQueryRetryConfig bigQueryRetryConfig) {
/*
We are deciding if a given error should be retried on the basis of error message.
Cannot rely on Error/Status code as for example error code 400 (which is not retriable) could be thrown due to rateLimitExceed, which is retriable
*/
String errorDesc;
if (previousThrowable != null && (errorDesc = previousThrowable.getMessage()) != null) {
for (Iterator<String> retriableMessages =
bigQueryRetryConfig.getRetriableErrorMessages().iterator();
retriableMessages.hasNext(); ) {
if (errorDesc.contains(retriableMessages.next())) { // Error message should be retried
return true;
}
}
}
return false;
}

/*Duplicating this method as it can not be inherited from the RetryAlgorithm due to the default access modifier*/
boolean shouldRetryBasedOnResult(
RetryingContext context, Throwable previousThrowable, ResponseT previousResponse) {
if (resultAlgorithmWithContext != null && context != null) {
return resultAlgorithmWithContext.shouldRetry(context, previousThrowable, previousResponse);
}
return getResultAlgorithm().shouldRetry(previousThrowable, previousResponse);
}

/*Duplicating this method as it can not be inherited from the RetryAlgorithm due to the private access modifier*/
private boolean shouldRetryBasedOnTiming(
RetryingContext context, TimedAttemptSettings nextAttemptSettings) {
if (nextAttemptSettings == null) {
return false;
}
if (timedAlgorithmWithContext != null && context != null) {
return timedAlgorithmWithContext.shouldRetry(context, nextAttemptSettings);
}
return getTimedAlgorithm().shouldRetry(nextAttemptSettings);
}

@Override
public TimedAttemptSettings createNextAttempt(
RetryingContext context,
Throwable previousThrowable,
ResponseT previousResponse,
TimedAttemptSettings previousSettings) {
// a small optimization that avoids calling relatively heavy methods
// like timedAlgorithm.createNextAttempt(), when it is not necessary.

if (!((shouldRetryBasedOnResult(context, previousThrowable, previousResponse)
|| shouldRetryBasedOnBigQueryRetryConfig(
previousThrowable,
bigQueryRetryConfig)))) { // Calling shouldRetryBasedOnBigQueryRetryConfig to check if
// the error message could be retried
return null;
}

TimedAttemptSettings newSettings =
createNextAttemptBasedOnResult(
context, previousThrowable, previousResponse, previousSettings);
if (newSettings == null) {
newSettings = createNextAttemptBasedOnTiming(context, previousSettings);
}
return newSettings;
}

/*Duplicating this method as it can not be inherited from the RetryAlgorithm due to the private access modifier*/
private TimedAttemptSettings createNextAttemptBasedOnResult(
RetryingContext context,
Throwable previousThrowable,
ResponseT previousResponse,
TimedAttemptSettings previousSettings) {
if (resultAlgorithmWithContext != null && context != null) {
return resultAlgorithmWithContext.createNextAttempt(
context, previousThrowable, previousResponse, previousSettings);
}
return getResultAlgorithm()
.createNextAttempt(previousThrowable, previousResponse, previousSettings);
}

/*Duplicating this method as it can not be inherited from the RetryAlgorithm due to the private access modifier*/
private TimedAttemptSettings createNextAttemptBasedOnTiming(
RetryingContext context, TimedAttemptSettings previousSettings) {
if (timedAlgorithmWithContext != null && context != null) {
return timedAlgorithmWithContext.createNextAttempt(context, previousSettings);
}
return getTimedAlgorithm().createNextAttempt(previousSettings);
}
}
@@ -0,0 +1,54 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.ImmutableSet;

public class BigQueryRetryConfig {
private final ImmutableSet<String> retriableErrorMessages;

private BigQueryRetryConfig(Builder builder) {
retriableErrorMessages = builder.retriableErrorMessages.build();
}

public ImmutableSet<String> getRetriableErrorMessages() {
return retriableErrorMessages;
}

// BigQueryRetryConfig builder
public static class Builder {
private final ImmutableSet.Builder<String> retriableErrorMessages = ImmutableSet.builder();

private Builder() {}

public final Builder retryOnMessage(String... errorMessages) {
for (String errorMessage : errorMessages) {
retriableErrorMessages.add(checkNotNull(errorMessage));
}
return this;
}

public BigQueryRetryConfig build() {
return new BigQueryRetryConfig(this);
}
}

public static Builder newBuilder() {
return new Builder();
}
}
@@ -0,0 +1,76 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery;

import com.google.api.core.ApiClock;
import com.google.api.gax.retrying.*;
import com.google.cloud.RetryHelper;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

public class BigQueryRetryHelper extends RetryHelper {

public static <V> V runWithRetries(
Callable<V> callable,
RetrySettings retrySettings,
ResultRetryAlgorithm<?> resultRetryAlgorithm,
ApiClock clock,
BigQueryRetryConfig bigQueryRetryConfig)
throws RetryHelperException {
try {
// Suppressing should be ok as a workaraund. Current and only ResultRetryAlgorithm
// implementation does not use response at all, so ignoring its type is ok.
@SuppressWarnings("unchecked")
ResultRetryAlgorithm<V> algorithm = (ResultRetryAlgorithm<V>) resultRetryAlgorithm;
return run(
callable,
new ExponentialRetryAlgorithm(retrySettings, clock),
algorithm,
bigQueryRetryConfig);
} catch (Exception e) {
throw new BigQueryRetryHelperException(e.getCause());
}
}

private static <V> V run(
Callable<V> callable,
TimedRetryAlgorithm timedAlgorithm,
ResultRetryAlgorithm<V> resultAlgorithm,
BigQueryRetryConfig bigQueryRetryConfig)
throws ExecutionException, InterruptedException {
RetryAlgorithm<V> retryAlgorithm =
new BigQueryRetryAlgorithm<>(
resultAlgorithm,
timedAlgorithm,
bigQueryRetryConfig); // using BigQueryRetryAlgorithm in place of
// com.google.api.gax.retrying.RetryAlgorithm, as
// BigQueryRetryAlgorithm retries considering bigQueryRetryConfig
RetryingExecutor<V> executor = new DirectRetryingExecutor<>(retryAlgorithm);

RetryingFuture<V> retryingFuture = executor.createFuture(callable);
executor.submit(retryingFuture);
return retryingFuture.get();
}

public static class BigQueryRetryHelperException extends RuntimeException {

private static final long serialVersionUID = -8519852520090965314L;

BigQueryRetryHelperException(Throwable cause) {
super(cause);
}
}
}

0 comments on commit 44d9795

Please sign in to comment.