From 45b93089f5398740413104285cc8acfd5ebc9c08 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 14 Jul 2021 21:20:27 +0200 Subject: [PATCH] fix: avoid possible job already exists error (#751) * fix: avoid possible job already exists error If the job create request fails, a query job might still have started successfully. This commit handles this edge case and returns such a query job if one can be found. * Catch only Conflict errors on query job create --- google/cloud/bigquery/client.py | 26 +++++++++++- tests/unit/test_client.py | 75 +++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 2 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 2a02c7629..de259abce 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -3190,6 +3190,7 @@ def query( If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.QueryJobConfig` class. """ + job_id_given = job_id is not None job_id = _make_job_id(job_id, job_id_prefix) if project is None: @@ -3221,9 +3222,30 @@ def query( job_ref = job._JobReference(job_id, project=project, location=location) query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config) - query_job._begin(retry=retry, timeout=timeout) - return query_job + try: + query_job._begin(retry=retry, timeout=timeout) + except core_exceptions.Conflict as create_exc: + # The thought is if someone is providing their own job IDs and they get + # their job ID generation wrong, this could end up returning results for + # the wrong query. We thus only try to recover if job ID was not given. 
+ if job_id_given: + raise create_exc + + try: + query_job = self.get_job( + job_id, + project=project, + location=location, + retry=retry, + timeout=timeout, + ) + except core_exceptions.GoogleAPIError: # (includes RetryError) + raise create_exc + else: + return query_job + else: + return query_job def insert_rows( self, diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index dffe7bdba..2be8daab6 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -4617,6 +4617,81 @@ def test_query_w_query_parameters(self): }, ) + def test_query_job_rpc_fail_w_random_error(self): + from google.api_core.exceptions import Unknown + from google.cloud.bigquery.job import QueryJob + + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + + job_create_error = Unknown("Not sure what went wrong.") + job_begin_patcher = mock.patch.object( + QueryJob, "_begin", side_effect=job_create_error + ) + with job_begin_patcher: + with pytest.raises(Unknown, match="Not sure what went wrong."): + client.query("SELECT 1;", job_id="123") + + def test_query_job_rpc_fail_w_conflict_job_id_given(self): + from google.api_core.exceptions import Conflict + from google.cloud.bigquery.job import QueryJob + + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + + job_create_error = Conflict("Job already exists.") + job_begin_patcher = mock.patch.object( + QueryJob, "_begin", side_effect=job_create_error + ) + with job_begin_patcher: + with pytest.raises(Conflict, match="Job already exists."): + client.query("SELECT 1;", job_id="123") + + def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_fails(self): + from google.api_core.exceptions import Conflict + from google.api_core.exceptions import DataLoss + from google.cloud.bigquery.job import QueryJob + + creds = _make_credentials() + http = object() + client = 
self._make_one(project=self.PROJECT, credentials=creds, _http=http) + + job_create_error = Conflict("Job already exists.") + job_begin_patcher = mock.patch.object( + QueryJob, "_begin", side_effect=job_create_error + ) + get_job_patcher = mock.patch.object( + client, "get_job", side_effect=DataLoss("we lost yor job, sorry") + ) + + with job_begin_patcher, get_job_patcher: + # If get job request fails, the original exception should be raised. + with pytest.raises(Conflict, match="Job already exists."): + client.query("SELECT 1;", job_id=None) + + def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_succeeds(self): + from google.api_core.exceptions import Conflict + from google.cloud.bigquery.job import QueryJob + + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + + job_create_error = Conflict("Job already exists.") + job_begin_patcher = mock.patch.object( + QueryJob, "_begin", side_effect=job_create_error + ) + get_job_patcher = mock.patch.object( + client, "get_job", return_value=mock.sentinel.query_job + ) + + with job_begin_patcher, get_job_patcher: + result = client.query("SELECT 1;", job_id=None) + + assert result is mock.sentinel.query_job + def test_insert_rows_w_timeout(self): from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.table import Table