Skip to content

Commit

Permalink
fix: avoid possible job already exists error (#751)
Browse files Browse the repository at this point in the history
* fix: avoid possible job already exists error

If the job create request fails, a query job might still have started
successfully. This commit handles this edge case and returns that
query job if one can be found.

* Catch only Conflict errors on query job create
  • Loading branch information
plamut committed Jul 14, 2021
1 parent 5deef6f commit 45b9308
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 2 deletions.
26 changes: 24 additions & 2 deletions google/cloud/bigquery/client.py
Expand Up @@ -3190,6 +3190,7 @@ def query(
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.QueryJobConfig`
class.
"""
job_id_given = job_id is not None
job_id = _make_job_id(job_id, job_id_prefix)

if project is None:
Expand Down Expand Up @@ -3221,9 +3222,30 @@ def query(

job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config)
query_job._begin(retry=retry, timeout=timeout)

return query_job
try:
query_job._begin(retry=retry, timeout=timeout)
except core_exceptions.Conflict as create_exc:
# The thought is if someone is providing their own job IDs and they get
# their job ID generation wrong, this could end up returning results for
# the wrong query. We thus only try to recover if job ID was not given.
if job_id_given:
raise create_exc

try:
query_job = self.get_job(
job_id,
project=project,
location=location,
retry=retry,
timeout=timeout,
)
except core_exceptions.GoogleAPIError: # (includes RetryError)
raise create_exc
else:
return query_job
else:
return query_job

def insert_rows(
self,
Expand Down
75 changes: 75 additions & 0 deletions tests/unit/test_client.py
Expand Up @@ -4617,6 +4617,81 @@ def test_query_w_query_parameters(self):
},
)

def test_query_job_rpc_fail_w_random_error(self):
    """An ``Unknown`` error raised by ``QueryJob._begin`` reaches the caller.

    ``_begin`` is patched to raise ``Unknown``; ``client.query`` is expected
    to raise that same error type with the same message.
    """
    from google.api_core.exceptions import Unknown
    from google.cloud.bigquery.job import QueryJob

    credentials = _make_credentials()
    client = self._make_one(
        project=self.PROJECT, credentials=credentials, _http=object()
    )

    # Make the job-start step fail with an error that is not a Conflict.
    begin_failure = Unknown("Not sure what went wrong.")
    patched_begin = mock.patch.object(QueryJob, "_begin", side_effect=begin_failure)
    expect_unknown = pytest.raises(Unknown, match="Not sure what went wrong.")

    with patched_begin, expect_unknown:
        client.query("SELECT 1;", job_id="123")

def test_query_job_rpc_fail_w_conflict_job_id_given(self):
    """A ``Conflict`` from ``_begin`` is raised when the caller passed a job ID.

    ``QueryJob._begin`` is patched to raise ``Conflict`` and the query is
    issued with an explicit ``job_id``; the ``Conflict`` must surface.
    """
    from google.api_core.exceptions import Conflict
    from google.cloud.bigquery.job import QueryJob

    credentials = _make_credentials()
    client = self._make_one(
        project=self.PROJECT, credentials=credentials, _http=object()
    )

    conflict = Conflict("Job already exists.")
    patched_begin = mock.patch.object(QueryJob, "_begin", side_effect=conflict)
    expect_conflict = pytest.raises(Conflict, match="Job already exists.")

    with patched_begin, expect_conflict:
        client.query("SELECT 1;", job_id="123")

def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_fails(self):
    """The original ``Conflict`` is raised when the follow-up ``get_job`` fails.

    ``QueryJob._begin`` raises ``Conflict`` and ``client.get_job`` raises
    ``DataLoss``; with ``job_id=None`` the caller must see the ``Conflict``.
    """
    from google.api_core.exceptions import Conflict, DataLoss
    from google.cloud.bigquery.job import QueryJob

    credentials = _make_credentials()
    client = self._make_one(
        project=self.PROJECT, credentials=credentials, _http=object()
    )

    conflict = Conflict("Job already exists.")
    fetch_failure = DataLoss("we lost yor job, sorry")

    patched_begin = mock.patch.object(QueryJob, "_begin", side_effect=conflict)
    patched_get_job = mock.patch.object(client, "get_job", side_effect=fetch_failure)
    # If the get job request fails, the original exception should be raised.
    expect_conflict = pytest.raises(Conflict, match="Job already exists.")

    with patched_begin, patched_get_job, expect_conflict:
        client.query("SELECT 1;", job_id=None)

def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_succeeds(self):
    """The job returned by ``get_job`` is handed back after a ``Conflict``.

    ``QueryJob._begin`` raises ``Conflict`` while ``client.get_job`` returns a
    sentinel; with ``job_id=None`` ``client.query`` must return that sentinel.
    """
    from google.api_core.exceptions import Conflict
    from google.cloud.bigquery.job import QueryJob

    credentials = _make_credentials()
    client = self._make_one(
        project=self.PROJECT, credentials=credentials, _http=object()
    )

    conflict = Conflict("Job already exists.")
    patched_begin = mock.patch.object(QueryJob, "_begin", side_effect=conflict)
    patched_get_job = mock.patch.object(
        client, "get_job", return_value=mock.sentinel.query_job
    )

    with patched_begin, patched_get_job:
        returned_job = client.query("SELECT 1;", job_id=None)

    assert returned_job is mock.sentinel.query_job

def test_insert_rows_w_timeout(self):
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import Table
Expand Down

0 comments on commit 45b9308

Please sign in to comment.