diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java index 7d0e4c555..16814e99a 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java @@ -347,15 +347,9 @@ public JobId get() { @InternalApi("visible for testing") Job create(JobInfo jobInfo, Supplier idProvider, JobOption... options) { - boolean idRandom = false; - if (jobInfo.getJobId() == null) { - jobInfo = jobInfo.toBuilder().setJobId(idProvider.get()).build(); - idRandom = true; - } - final com.google.api.services.bigquery.model.Job jobPb = - jobInfo.setProjectId(getOptions().getProjectId()).toPb(); - final Map optionsMap = optionMap(options); + final boolean idRandom = (jobInfo.getJobId() == null); + final Map optionsMap = optionMap(options); BigQueryException createException; // NOTE(pongad): This double-try structure is admittedly odd. // translateAndThrow itself throws, and pretends to return an exception only @@ -363,21 +357,36 @@ Job create(JobInfo jobInfo, Supplier idProvider, JobOption... options) { // This makes it difficult to translate without throwing. // Fixing this entails some work on BaseServiceException.translate. // Since that affects a bunch of APIs, we should fix this as a separate change. 
+ final JobId[] finalJobId = new JobId[1]; try { try { return Job.fromPb( this, - runWithRetries( + BigQueryRetryHelper.runWithRetries( new Callable() { @Override public com.google.api.services.bigquery.model.Job call() { - return bigQueryRpc.create(jobPb, optionsMap); + if (idRandom) { + // re-generate a new random job with the same jobInfo when jobId is not + // provided by the user + JobInfo recreatedJobInfo = + jobInfo.toBuilder().setJobId(idProvider.get()).build(); + com.google.api.services.bigquery.model.Job newJobPb = + recreatedJobInfo.setProjectId(getOptions().getProjectId()).toPb(); + finalJobId[0] = recreatedJobInfo.getJobId(); + return bigQueryRpc.create(newJobPb, optionsMap); + } else { + com.google.api.services.bigquery.model.Job jobPb = + jobInfo.setProjectId(getOptions().getProjectId()).toPb(); + return bigQueryRpc.create(jobPb, optionsMap); + } } }, getOptions().getRetrySettings(), - BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER, - getOptions().getClock())); - } catch (RetryHelper.RetryHelperException e) { + EXCEPTION_HANDLER, + getOptions().getClock(), + DEFAULT_RETRY_CONFIG)); + } catch (BigQueryRetryHelper.BigQueryRetryHelperException e) { throw BigQueryException.translateAndThrow(e); } } catch (BigQueryException e) { @@ -396,7 +405,7 @@ public com.google.api.services.bigquery.model.Job call() { // fetch a job created by someone else. 
Job job; try { - job = getJob(jobInfo.getJobId()); + job = getJob(finalJobId[0]); } catch (BigQueryException e) { throw createException; } diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryAlgorithm.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryAlgorithm.java index a4e642191..af472430f 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryAlgorithm.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryAlgorithm.java @@ -72,7 +72,8 @@ public boolean shouldRetry( // the exception messages boolean shouldRetry = (shouldRetryBasedOnResult(context, previousThrowable, previousResponse) - || shouldRetryBasedOnBigQueryRetryConfig(previousThrowable, bigQueryRetryConfig)) + || shouldRetryBasedOnBigQueryRetryConfig( + previousThrowable, bigQueryRetryConfig, previousResponse)) && shouldRetryBasedOnTiming(context, nextAttemptSettings); if (LOG.isLoggable(Level.FINEST)) { @@ -92,13 +93,26 @@ public boolean shouldRetry( } private boolean shouldRetryBasedOnBigQueryRetryConfig( - Throwable previousThrowable, BigQueryRetryConfig bigQueryRetryConfig) { + Throwable previousThrowable, + BigQueryRetryConfig bigQueryRetryConfig, + ResponseT previousResponse) { /* We are deciding if a given error should be retried on the basis of error message. Cannot rely on Error/Status code as for example error code 400 (which is not retriable) could be thrown due to rateLimitExceed, which is retriable */ - String errorDesc; - if (previousThrowable != null && (errorDesc = previousThrowable.getMessage()) != null) { + String errorDesc = null; + if (previousThrowable != null) { + errorDesc = previousThrowable.getMessage(); + } else if (previousResponse != null) { + /* + In some cases error messages may come without an exception + e.g. 
status code 200 with a rate limit exceeded for job create + in these cases there is no previousThrowable so we need to check previousResponse + */ + errorDesc = previousResponse.toString(); + } + + if (errorDesc != null) { errorDesc = errorDesc.toLowerCase(); // for case insensitive comparison for (Iterator retriableMessages = bigQueryRetryConfig.getRetriableErrorMessages().iterator(); @@ -161,7 +175,8 @@ public TimedAttemptSettings createNextAttempt( if (!((shouldRetryBasedOnResult(context, previousThrowable, previousResponse) || shouldRetryBasedOnBigQueryRetryConfig( previousThrowable, - bigQueryRetryConfig)))) { // Calling shouldRetryBasedOnBigQueryRetryConfig to check if + bigQueryRetryConfig, + previousResponse)))) { // Calling shouldRetryBasedOnBigQueryRetryConfig to check if // the error message could be retried return null; } diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java index 760c84f32..b00d4b860 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java @@ -1529,6 +1529,30 @@ public void testCreateJobSuccess() { verify(bigqueryRpcMock).create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS)); } + @Test + public void testCreateJobFailureShouldRetry() { + when(bigqueryRpcMock.create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS))) + .thenThrow(new BigQueryException(500, "InternalError")) + .thenThrow(new BigQueryException(502, "Bad Gateway")) + .thenThrow(new BigQueryException(503, "Service Unavailable")) + .thenThrow( + new BigQueryException( + 400, RATE_LIMIT_ERROR_MSG)) // retried based on RATE_LIMIT_ERROR_MSG + .thenThrow(new BigQueryException(200, RATE_LIMIT_ERROR_MSG)) + .thenReturn(newJobPb()); + + bigquery = options.getService(); + bigquery = + options + .toBuilder() 
.setRetrySettings(ServiceOptions.getDefaultRetrySettings()) + .build() + .getService(); + + ((BigQueryImpl) bigquery).create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY)); + verify(bigqueryRpcMock, times(6)).create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS)); + } + @Test public void testCreateJobWithSelectedFields() { when(bigqueryRpcMock.create(