Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: create Job retry for rate limit exceeded with status code 200 (#…
…1744)

* retrying job create with new job id each time

* retrying for rate limit exceeded on create Job regardless of the status code 200

* removing flattened pom

* refactoring, removing unnecesary variables, adding comments

* linting code

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* refactoring, removing comments

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
franklinWhaite and gcf-owl-bot[bot] committed Dec 14, 2021
1 parent faf5897 commit 97a61dc
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 19 deletions.
Expand Up @@ -347,37 +347,46 @@ public JobId get() {

@InternalApi("visible for testing")
Job create(JobInfo jobInfo, Supplier<JobId> idProvider, JobOption... options) {
boolean idRandom = false;
if (jobInfo.getJobId() == null) {
jobInfo = jobInfo.toBuilder().setJobId(idProvider.get()).build();
idRandom = true;
}
final com.google.api.services.bigquery.model.Job jobPb =
jobInfo.setProjectId(getOptions().getProjectId()).toPb();
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
final boolean idRandom = (jobInfo.getJobId() == null);

final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
BigQueryException createException;
// NOTE(pongad): This double-try structure is admittedly odd.
// translateAndThrow itself throws, and pretends to return an exception only
// so users can pretend to throw.
// This makes it difficult to translate without throwing.
// Fixing this entails some work on BaseServiceException.translate.
// Since that affects a bunch of APIs, we should fix this as a separate change.
final JobId[] finalJobId = new JobId[1];
try {
try {
return Job.fromPb(
this,
runWithRetries(
BigQueryRetryHelper.runWithRetries(
new Callable<com.google.api.services.bigquery.model.Job>() {
@Override
public com.google.api.services.bigquery.model.Job call() {
return bigQueryRpc.create(jobPb, optionsMap);
if (idRandom) {
// re-generate a new random job with the same jobInfo when jobId is not
// provided by the user
JobInfo recreatedJobInfo =
jobInfo.toBuilder().setJobId(idProvider.get()).build();
com.google.api.services.bigquery.model.Job newJobPb =
recreatedJobInfo.setProjectId(getOptions().getProjectId()).toPb();
finalJobId[0] = recreatedJobInfo.getJobId();
return bigQueryRpc.create(newJobPb, optionsMap);
} else {
com.google.api.services.bigquery.model.Job jobPb =
jobInfo.setProjectId(getOptions().getProjectId()).toPb();
return bigQueryRpc.create(jobPb, optionsMap);
}
}
},
getOptions().getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
getOptions().getClock()));
} catch (RetryHelper.RetryHelperException e) {
EXCEPTION_HANDLER,
getOptions().getClock(),
DEFAULT_RETRY_CONFIG));
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
} catch (BigQueryException e) {
Expand All @@ -396,7 +405,7 @@ public com.google.api.services.bigquery.model.Job call() {
// fetch a job created by someone else.
Job job;
try {
job = getJob(jobInfo.getJobId());
job = getJob(finalJobId[0]);
} catch (BigQueryException e) {
throw createException;
}
Expand Down
Expand Up @@ -72,7 +72,8 @@ public boolean shouldRetry(
// the exception messages
boolean shouldRetry =
(shouldRetryBasedOnResult(context, previousThrowable, previousResponse)
|| shouldRetryBasedOnBigQueryRetryConfig(previousThrowable, bigQueryRetryConfig))
|| shouldRetryBasedOnBigQueryRetryConfig(
previousThrowable, bigQueryRetryConfig, previousResponse))
&& shouldRetryBasedOnTiming(context, nextAttemptSettings);

if (LOG.isLoggable(Level.FINEST)) {
Expand All @@ -92,13 +93,26 @@ public boolean shouldRetry(
}

private boolean shouldRetryBasedOnBigQueryRetryConfig(
Throwable previousThrowable, BigQueryRetryConfig bigQueryRetryConfig) {
Throwable previousThrowable,
BigQueryRetryConfig bigQueryRetryConfig,
ResponseT previousResponse) {
/*
We are deciding if a given error should be retried on the basis of error message.
Cannot rely on Error/Status code as for example error code 400 (which is not retriable) could be thrown due to rateLimitExceed, which is retriable
*/
String errorDesc;
if (previousThrowable != null && (errorDesc = previousThrowable.getMessage()) != null) {
String errorDesc = null;
if (previousThrowable != null) {
errorDesc = previousThrowable.getMessage();
} else if (previousResponse != null) {
/*
In some cases error messages may come without an exception
e.g. status code 200 with a rate limit exceeded for job create
in these cases there is now previousThrowable so we need to check previousResponse
*/
errorDesc = previousResponse.toString();
}

if (errorDesc != null) {
errorDesc = errorDesc.toLowerCase(); // for case insensitive comparison
for (Iterator<String> retriableMessages =
bigQueryRetryConfig.getRetriableErrorMessages().iterator();
Expand Down Expand Up @@ -161,7 +175,8 @@ public TimedAttemptSettings createNextAttempt(
if (!((shouldRetryBasedOnResult(context, previousThrowable, previousResponse)
|| shouldRetryBasedOnBigQueryRetryConfig(
previousThrowable,
bigQueryRetryConfig)))) { // Calling shouldRetryBasedOnBigQueryRetryConfig to check if
bigQueryRetryConfig,
previousResponse)))) { // Calling shouldRetryBasedOnBigQueryRetryConfig to check if
// the error message could be retried
return null;
}
Expand Down
Expand Up @@ -1529,6 +1529,30 @@ public void testCreateJobSuccess() {
verify(bigqueryRpcMock).create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS));
}

@Test
public void testCreateJobFailureShouldRetry() {
when(bigqueryRpcMock.create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS)))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
.thenThrow(new BigQueryException(503, "Service Unavailable"))
.thenThrow(
new BigQueryException(
400, RATE_LIMIT_ERROR_MSG)) // retrial on based on RATE_LIMIT_EXCEEDED_MSG
.thenThrow(new BigQueryException(200, RATE_LIMIT_ERROR_MSG))
.thenReturn(newJobPb());

bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

((BigQueryImpl) bigquery).create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY));
verify(bigqueryRpcMock, times(6)).create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS));
}

@Test
public void testCreateJobWithSelectedFields() {
when(bigqueryRpcMock.create(
Expand Down

0 comments on commit 97a61dc

Please sign in to comment.