Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implemented BigQueryRetryAlgorithm to retry on the basis of the configured re-triable error messages #1426

Merged
merged 38 commits into from Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c48231d
Updated BigQueryImpl
prash-mi Jun 29, 2021
e7c33e6
Initial Commit
prash-mi Jun 29, 2021
faaee99
Merge branch 'googleapis:master' into bigquery-bugfix-1250
prash-mi Jun 29, 2021
dbcfa7d
Using BigQueryRetryAlgorithm as the retry algorithm
prash-mi Jun 30, 2021
e902f50
Created BigQueryRetryAlgorithm as a subclass of RetryAlgorithm
prash-mi Jun 30, 2021
ab25cea
BigQueryErrorMessages property file
prash-mi Jun 30, 2021
77a874d
Implemented Builder Logic for BigQueryRetryConfig
prash-mi Jun 30, 2021
ca153b8
Using BigQueryRetryConfig for getQueryResults
prash-mi Jun 30, 2021
cb15bc6
Updated shouldRetry with the logic to retry based on error messages
prash-mi Jun 30, 2021
7774662
Implemented null checks on shouldRetryBasedOnBigQueryRetryConfig
prash-mi Jul 1, 2021
a271259
Removed `Status` from shouldRetryBasedOnBigQueryRetryConfig implement…
prash-mi Jul 2, 2021
6d7e4bf
Removed unused imports
prash-mi Jul 2, 2021
68299bf
created DEFAULT_RATE_LIMIT_EXCEEDED_RETRY_CONFIG for getQueryResults
prash-mi Jul 3, 2021
480b13b
Merge branch 'googleapis:master' into bigquery-bugfix-1250
prash-mi Jul 3, 2021
43752b3
Merge branch 'bigquery-bugfix-1250' of https://github.com/prash-mi/ja…
prash-mi Jul 3, 2021
0e99fa3
Added testGetQueryResultsRetry test for testing getQueryResults Retry
prash-mi Jul 3, 2021
38f3977
Overriding createNextAttempt method so that it generates an attempt b…
prash-mi Jul 3, 2021
40393b3
Linted BigQueryRetryHelper
prash-mi Jul 3, 2021
2e234b0
Linted testGetQueryResultsRetry
prash-mi Jul 3, 2021
28fa870
Linted BigQueryRetryAlgorithm
prash-mi Jul 3, 2021
67f4ce1
Linted BigQueryErrorMessages
prash-mi Jul 3, 2021
a986f37
Linted BigQueryRetryConfig
prash-mi Jul 3, 2021
3da3430
Fixed Linting
prash-mi Jul 3, 2021
66fd067
Fixed Linting
prash-mi Jul 3, 2021
a72a2f5
Fixed Linting
prash-mi Jul 3, 2021
cc5f730
Created translateAndThrow(BigQueryRetryHelper.BigQueryRetryHelperExce…
prash-mi Jul 3, 2021
0648071
Handling BigQueryRetryHelper.BigQueryRetryHelperException for getQuer…
prash-mi Jul 3, 2021
286e3f1
Implementing BigQueryRetryHelper.runWithRetries from TableResult.quer…
prash-mi Jul 5, 2021
68c06e2
Implementing testFastQueryRateLimitIdempotency Method to test Idempot…
prash-mi Jul 5, 2021
22b1706
Changed DEFAULT_RATE_LIMIT_EXCEEDED_RETRY_CONFIG to DEFAULT_RETRY_CONFIG
prash-mi Jul 12, 2021
84a3418
Implemented `BigQueryRetryHelper.runWithRetries` on `QueryResponse wa…
prash-mi Jul 12, 2021
13e8132
Merge branch 'googleapis:master' into bigquery-bugfix-1250
prash-mi Jul 12, 2021
8f5fc45
Revert "Implemented `BigQueryRetryHelper.runWithRetries` on `QueryRes…
prash-mi Jul 12, 2021
c5d6b70
Revert "Changed DEFAULT_RATE_LIMIT_EXCEEDED_RETRY_CONFIG to DEFAULT_R…
prash-mi Jul 12, 2021
2d21e11
Renamed DEFAULT_RATE_LIMIT_EXCEEDED_RETRY_CONFIG to DEFAULT_RETRY_CONFIG
prash-mi Jul 12, 2021
592eea8
Revert "Renamed DEFAULT_RATE_LIMIT_EXCEEDED_RETRY_CONFIG to DEFAULT_R…
prash-mi Jul 12, 2021
d448132
Renamed DEFAULT_RATE_LIMIT_EXCEEDED_RETRY_CONFIG to DEFAULT_RETRY_CONFIG
prash-mi Jul 13, 2021
9833823
Implemented BigQueryRetryHelper.runWithRetries on `QueryResponse wait…
prash-mi Jul 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,22 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigquery;

public class BigQueryErrorMessages {
public static final String RATE_LIMIT_EXCEEDED_MSG =
"Exceeded rate limits:"; // Error Message for RateLimitExceeded Error
stephaniewang526 marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Up @@ -122,6 +122,14 @@ static BaseServiceException translateAndThrow(RetryHelperException ex) {
throw new BigQueryException(UNKNOWN_CODE, ex.getMessage(), ex.getCause());
}

static BaseServiceException translateAndThrow(
BigQueryRetryHelper.BigQueryRetryHelperException ex) {
if (ex.getCause() instanceof BaseServiceException) {
throw (BaseServiceException) ex.getCause();
}
throw new BigQueryException(UNKNOWN_CODE, ex.getMessage(), ex.getCause());
}

static BaseServiceException translateAndThrow(ExecutionException ex) {
BaseServiceException.translate(ex);
throw new BigQueryException(UNKNOWN_CODE, ex.getMessage(), ex.getCause());
Expand Down
Expand Up @@ -237,6 +237,10 @@ public Page<FieldValueList> getNextPage() {
}

private final BigQueryRpc bigQueryRpc;
private static final BigQueryRetryConfig DEFAULT_RETRY_CONFIG =
BigQueryRetryConfig.newBuilder()
.retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this a little extraneous -- why do we need to specify every message we need to retry on? If we build a config, I imagine we just retry on all the specified error messages in the config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure Stephanie. As discussed, I have tried to create it on the lines of com.google.cloud.ExceptionHandler and this may give us additional flexibility to configure hooks like retryOnStatus or retryOnReason later as necessary. Happy to refactor it as needed.

.build(); // retry config with Error Message for RateLimitExceeded Error

BigQueryImpl(BigQueryOptions options) {
super(options);
Expand Down Expand Up @@ -1271,7 +1275,7 @@ private TableResult queryRpc(
com.google.api.services.bigquery.model.QueryResponse results;
try {
results =
runWithRetries(
BigQueryRetryHelper.runWithRetries(
new Callable<com.google.api.services.bigquery.model.QueryResponse>() {
@Override
public com.google.api.services.bigquery.model.QueryResponse call() {
Expand All @@ -1280,8 +1284,9 @@ public com.google.api.services.bigquery.model.QueryResponse call() {
},
getOptions().getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
getOptions().getClock(),
DEFAULT_RETRY_CONFIG);
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}

Expand Down Expand Up @@ -1362,7 +1367,7 @@ private static QueryResponse getQueryResults(
: jobId.getLocation());
try {
GetQueryResultsResponse results =
runWithRetries(
BigQueryRetryHelper.runWithRetries(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add this also to TableResult query(...) methods? That should also allow you to verify idempotency using requestId for jobs.query API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I have added BigQueryRetryHelper.runWithRetries for TableResult query(...) and have implemented testcase testFastQueryRateLimitIdempotency to test the idempotency

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay great but I think we need it for the jobs.insert endpoint too (line 1269). There are 2 query paths here. The one you've updated is the "fast" query path (jobs.query).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I have implemented BigQueryRetryHelper.runWithRetries on QueryResponse waitForQueryResults method, which is used by TableResult getQueryResults method

new Callable<GetQueryResultsResponse>() {
@Override
public GetQueryResultsResponse call() {
Expand All @@ -1376,8 +1381,10 @@ public GetQueryResultsResponse call() {
}
},
serviceOptions.getRetrySettings(),
EXCEPTION_HANDLER,
serviceOptions.getClock());
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
serviceOptions.getClock(),
DEFAULT_RETRY_CONFIG);

TableSchema schemaPb = results.getSchema();

ImmutableList.Builder<BigQueryError> errors = ImmutableList.builder();
Expand All @@ -1393,7 +1400,7 @@ public GetQueryResultsResponse call() {
.setTotalRows(results.getTotalRows() == null ? 0 : results.getTotalRows().longValue())
.setErrors(errors.build())
.build();
} catch (RetryHelper.RetryHelperException e) {
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
}
Expand Down
@@ -0,0 +1,152 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigquery;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.ResultRetryAlgorithmWithContext;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingContext;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.api.gax.retrying.TimedRetryAlgorithmWithContext;
import java.util.Iterator;
import java.util.concurrent.CancellationException;

public class BigQueryRetryAlgorithm<ResponseT> extends RetryAlgorithm<ResponseT> {
private final BigQueryRetryConfig bigQueryRetryConfig;
private final ResultRetryAlgorithm<ResponseT> resultAlgorithm;
private final TimedRetryAlgorithm timedAlgorithm;
private final ResultRetryAlgorithmWithContext<ResponseT> resultAlgorithmWithContext;
private final TimedRetryAlgorithmWithContext timedAlgorithmWithContext;

public BigQueryRetryAlgorithm(
ResultRetryAlgorithm<ResponseT> resultAlgorithm,
TimedRetryAlgorithm timedAlgorithm,
BigQueryRetryConfig bigQueryRetryConfig) {
super(resultAlgorithm, timedAlgorithm);
this.bigQueryRetryConfig = checkNotNull(bigQueryRetryConfig);
this.resultAlgorithm = checkNotNull(resultAlgorithm);
this.timedAlgorithm = checkNotNull(timedAlgorithm);
this.resultAlgorithmWithContext = null;
this.timedAlgorithmWithContext = null;
}

@Override
public boolean shouldRetry(
RetryingContext context,
Throwable previousThrowable,
ResponseT previousResponse,
TimedAttemptSettings nextAttemptSettings)
throws CancellationException {
// Implementing shouldRetryBasedOnBigQueryRetryConfig so that we can retry exceptions based on
// the exception messages
return (shouldRetryBasedOnResult(context, previousThrowable, previousResponse)
|| shouldRetryBasedOnBigQueryRetryConfig(previousThrowable, bigQueryRetryConfig))
&& shouldRetryBasedOnTiming(context, nextAttemptSettings);
}

private boolean shouldRetryBasedOnBigQueryRetryConfig(
Throwable previousThrowable, BigQueryRetryConfig bigQueryRetryConfig) {
/*
We are deciding if a given error should be retried on the basis of error message.
Cannot rely on Error/Status code as for example error code 400 (which is not retriable) could be thrown due to rateLimitExceed, which is retriable
*/
String errorDesc;
if (previousThrowable != null && (errorDesc = previousThrowable.getMessage()) != null) {
for (Iterator<String> retriableMessages =
bigQueryRetryConfig.getRetriableErrorMessages().iterator();
retriableMessages.hasNext(); ) {
if (errorDesc.contains(retriableMessages.next())) { // Error message should be retried
return true;
}
}
}
return false;
}

/*Duplicating this method as it can not be inherited from the RetryAlgorithm due to the default access modifier*/
boolean shouldRetryBasedOnResult(
RetryingContext context, Throwable previousThrowable, ResponseT previousResponse) {
if (resultAlgorithmWithContext != null && context != null) {
return resultAlgorithmWithContext.shouldRetry(context, previousThrowable, previousResponse);
}
return getResultAlgorithm().shouldRetry(previousThrowable, previousResponse);
}

/*Duplicating this method as it can not be inherited from the RetryAlgorithm due to the private access modifier*/
private boolean shouldRetryBasedOnTiming(
RetryingContext context, TimedAttemptSettings nextAttemptSettings) {
if (nextAttemptSettings == null) {
return false;
}
if (timedAlgorithmWithContext != null && context != null) {
return timedAlgorithmWithContext.shouldRetry(context, nextAttemptSettings);
}
return getTimedAlgorithm().shouldRetry(nextAttemptSettings);
}

@Override
public TimedAttemptSettings createNextAttempt(
RetryingContext context,
Throwable previousThrowable,
ResponseT previousResponse,
TimedAttemptSettings previousSettings) {
// a small optimization that avoids calling relatively heavy methods
// like timedAlgorithm.createNextAttempt(), when it is not necessary.

if (!((shouldRetryBasedOnResult(context, previousThrowable, previousResponse)
|| shouldRetryBasedOnBigQueryRetryConfig(
previousThrowable,
bigQueryRetryConfig)))) { // Calling shouldRetryBasedOnBigQueryRetryConfig to check if
// the error message could be retried
return null;
}

TimedAttemptSettings newSettings =
createNextAttemptBasedOnResult(
context, previousThrowable, previousResponse, previousSettings);
if (newSettings == null) {
newSettings = createNextAttemptBasedOnTiming(context, previousSettings);
}
return newSettings;
}

/*Duplicating this method as it can not be inherited from the RetryAlgorithm due to the private access modifier*/
private TimedAttemptSettings createNextAttemptBasedOnResult(
RetryingContext context,
Throwable previousThrowable,
ResponseT previousResponse,
TimedAttemptSettings previousSettings) {
if (resultAlgorithmWithContext != null && context != null) {
return resultAlgorithmWithContext.createNextAttempt(
context, previousThrowable, previousResponse, previousSettings);
}
return getResultAlgorithm()
.createNextAttempt(previousThrowable, previousResponse, previousSettings);
}

/*Duplicating this method as it can not be inherited from the RetryAlgorithm due to the private access modifier*/
private TimedAttemptSettings createNextAttemptBasedOnTiming(
RetryingContext context, TimedAttemptSettings previousSettings) {
if (timedAlgorithmWithContext != null && context != null) {
return timedAlgorithmWithContext.createNextAttempt(context, previousSettings);
}
return getTimedAlgorithm().createNextAttempt(previousSettings);
}
}
@@ -0,0 +1,54 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.ImmutableSet;

public class BigQueryRetryConfig {
private final ImmutableSet<String> retriableErrorMessages;

private BigQueryRetryConfig(Builder builder) {
retriableErrorMessages = builder.retriableErrorMessages.build();
}

public ImmutableSet<String> getRetriableErrorMessages() {
return retriableErrorMessages;
}

// BigQueryRetryConfig builder
public static class Builder {
private final ImmutableSet.Builder<String> retriableErrorMessages = ImmutableSet.builder();

private Builder() {}

public final Builder retryOnMessage(String... errorMessages) {
for (String errorMessage : errorMessages) {
retriableErrorMessages.add(checkNotNull(errorMessage));
}
return this;
}

public BigQueryRetryConfig build() {
return new BigQueryRetryConfig(this);
}
}

public static Builder newBuilder() {
return new Builder();
}
}
@@ -0,0 +1,76 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery;

import com.google.api.core.ApiClock;
import com.google.api.gax.retrying.*;
import com.google.cloud.RetryHelper;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

public class BigQueryRetryHelper extends RetryHelper {

public static <V> V runWithRetries(
Callable<V> callable,
RetrySettings retrySettings,
ResultRetryAlgorithm<?> resultRetryAlgorithm,
ApiClock clock,
BigQueryRetryConfig bigQueryRetryConfig)
throws RetryHelperException {
try {
// Suppressing should be ok as a workaraund. Current and only ResultRetryAlgorithm
// implementation does not use response at all, so ignoring its type is ok.
@SuppressWarnings("unchecked")
ResultRetryAlgorithm<V> algorithm = (ResultRetryAlgorithm<V>) resultRetryAlgorithm;
return run(
callable,
new ExponentialRetryAlgorithm(retrySettings, clock),
algorithm,
bigQueryRetryConfig);
} catch (Exception e) {
throw new BigQueryRetryHelperException(e.getCause());
}
}

private static <V> V run(
Callable<V> callable,
TimedRetryAlgorithm timedAlgorithm,
ResultRetryAlgorithm<V> resultAlgorithm,
BigQueryRetryConfig bigQueryRetryConfig)
throws ExecutionException, InterruptedException {
RetryAlgorithm<V> retryAlgorithm =
new BigQueryRetryAlgorithm<>(
resultAlgorithm,
timedAlgorithm,
bigQueryRetryConfig); // using BigQueryRetryAlgorithm in place of
// com.google.api.gax.retrying.RetryAlgorithm, as
// BigQueryRetryAlgorithm retries considering bigQueryRetryConfig
RetryingExecutor<V> executor = new DirectRetryingExecutor<>(retryAlgorithm);

RetryingFuture<V> retryingFuture = executor.createFuture(callable);
executor.submit(retryingFuture);
return retryingFuture.get();
}

public static class BigQueryRetryHelperException extends RuntimeException {

private static final long serialVersionUID = -8519852520090965314L;

BigQueryRetryHelperException(Throwable cause) {
super(cause);
}
}
}