From f75dcdf3943b87daba60011c9a3b42e34ff81910 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 15 Apr 2021 18:40:04 -0500 Subject: [PATCH] feat: accept job object as argument to `get_job` and `cancel_job` (#617) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This allows one to more easily cancel or get updated metadata for an existing job from the client class. Ensures that project ID and location are correctly populated. Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #616 🦕 --- google/cloud/bigquery/client.py | 63 ++++++++++++++++++++++++++++++--- tests/system/test_client.py | 11 +++--- tests/unit/test_client.py | 43 ++++++++++++++-------- 3 files changed, 93 insertions(+), 24 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 10127e10d..8211e23a3 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -1734,12 +1734,20 @@ def get_job( https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get Args: - job_id (str): Unique job identifier. + job_id (Union[ \ + str, \ + google.cloud.bigquery.job.LoadJob, \ + google.cloud.bigquery.job.CopyJob, \ + google.cloud.bigquery.job.ExtractJob, \ + google.cloud.bigquery.job.QueryJob \ + ]): Job identifier. Keyword Arguments: project (Optional[str]): ID of the project which owns the job (defaults to the client's project). - location (Optional[str]): Location where the job was run. + location (Optional[str]): + Location where the job was run. Ignored if ``job_id`` is a job + object. retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. timeout (Optional[float]): @@ -1757,6 +1765,10 @@ def get_job( """ extra_params = {"projection": "full"} + project, location, job_id = _extract_job_reference( + job_id, project=project, location=location + ) + if project is None: project = self.project @@ -1791,12 +1803,20 @@ def cancel_job( https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel Args: - job_id (str): Unique job identifier. + job_id (Union[ \ + str, \ + google.cloud.bigquery.job.LoadJob, \ + google.cloud.bigquery.job.CopyJob, \ + google.cloud.bigquery.job.ExtractJob, \ + google.cloud.bigquery.job.QueryJob \ + ]): Job identifier. Keyword Arguments: project (Optional[str]): ID of the project which owns the job (defaults to the client's project). - location (Optional[str]): Location where the job was run. + location (Optional[str]): + Location where the job was run. Ignored if ``job_id`` is a job + object. retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. timeout (Optional[float]): @@ -1814,6 +1834,10 @@ def cancel_job( """ extra_params = {"projection": "full"} + project, location, job_id = _extract_job_reference( + job_id, project=project, location=location + ) + if project is None: project = self.project @@ -3518,6 +3542,37 @@ def _item_to_table(iterator, resource): return TableListItem(resource) +def _extract_job_reference(job, project=None, location=None): + """Extract fully-qualified job reference from a job-like object. + + Args: + job_id (Union[ \ + str, \ + google.cloud.bigquery.job.LoadJob, \ + google.cloud.bigquery.job.CopyJob, \ + google.cloud.bigquery.job.ExtractJob, \ + google.cloud.bigquery.job.QueryJob \ + ]): Job identifier. + project (Optional[str]): + Project where the job was run. Ignored if ``job_id`` is a job + object. + location (Optional[str]): + Location where the job was run. Ignored if ``job_id`` is a job + object. + + Returns: + Tuple[str, str, str]: ``(project, location, job_id)`` + """ + if hasattr(job, "job_id"): + project = job.project + job_id = job.job_id + location = job.location + else: + job_id = job + + return (project, location, job_id) + + def _make_job_id(job_id, prefix=None): """Construct an ID for a new job. diff --git a/tests/system/test_client.py b/tests/system/test_client.py index 024441012..f31d994ca 100644 --- a/tests/system/test_client.py +++ b/tests/system/test_client.py @@ -189,7 +189,9 @@ def test_get_service_account_email(self): def _create_bucket(self, bucket_name, location=None): storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) - retry_storage_errors(bucket.create)(location=location) + retry_storage_errors(storage_client.create_bucket)( + bucket_name, location=location + ) self.to_delete.append(bucket) return bucket @@ -872,7 +874,7 @@ def test_load_table_from_file_w_explicit_location(self): job_id = load_job.job_id # Can get the job from the EU. - load_job = client.get_job(job_id, location="EU") + load_job = client.get_job(load_job) self.assertEqual(job_id, load_job.job_id) self.assertEqual("EU", load_job.location) self.assertTrue(load_job.exists()) @@ -889,7 +891,7 @@ def test_load_table_from_file_w_explicit_location(self): # Can cancel the job from the EU. self.assertTrue(load_job.cancel()) - load_job = client.cancel_job(job_id, location="EU") + load_job = client.cancel_job(load_job) self.assertEqual(job_id, load_job.job_id) self.assertEqual("EU", load_job.location) @@ -1204,8 +1206,7 @@ def test_query_w_timeout(self): # Even though the query takes >1 second, the call to getQueryResults # should succeed. self.assertFalse(query_job.done(timeout=1)) - - Config.CLIENT.cancel_job(query_job.job_id, location=query_job.location) + self.assertIsNotNone(Config.CLIENT.cancel_job(query_job)) def test_query_w_page_size(self): page_size = 45 diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 96e51678f..c5e742c9e 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -2933,31 +2933,30 @@ def test_get_job_miss_w_explict_project(self): conn = client._connection = make_connection() with self.assertRaises(NotFound): - client.get_job(JOB_ID, project=OTHER_PROJECT, location=self.LOCATION) + client.get_job(JOB_ID, project=OTHER_PROJECT) conn.api_request.assert_called_once_with( method="GET", path="/projects/OTHER_PROJECT/jobs/NONESUCH", - query_params={"projection": "full", "location": self.LOCATION}, + query_params={"projection": "full"}, timeout=None, ) def test_get_job_miss_w_client_location(self): from google.cloud.exceptions import NotFound - OTHER_PROJECT = "OTHER_PROJECT" JOB_ID = "NONESUCH" creds = _make_credentials() - client = self._make_one(self.PROJECT, creds, location=self.LOCATION) + client = self._make_one("client-proj", creds, location="client-loc") conn = client._connection = make_connection() with self.assertRaises(NotFound): - client.get_job(JOB_ID, project=OTHER_PROJECT) + client.get_job(JOB_ID) conn.api_request.assert_called_once_with( method="GET", - path="/projects/OTHER_PROJECT/jobs/NONESUCH", - query_params={"projection": "full", "location": self.LOCATION}, + path="/projects/client-proj/jobs/NONESUCH", + query_params={"projection": "full", "location": "client-loc"}, timeout=None, ) @@ -2971,7 +2970,11 @@ def test_get_job_hit_w_timeout(self): QUERY = "SELECT * from test_dataset:test_table" ASYNC_QUERY_DATA = { "id": "{}:{}".format(self.PROJECT, JOB_ID), - "jobReference": {"projectId": self.PROJECT, "jobId": "query_job"}, + "jobReference": { + "projectId": "resource-proj", + "jobId": "query_job", + "location": "us-east1", + }, "state": "DONE", "configuration": { "query": { @@ -2989,18 +2992,21 @@ def test_get_job_hit_w_timeout(self): creds = _make_credentials() client = self._make_one(self.PROJECT, creds) conn = client._connection = make_connection(ASYNC_QUERY_DATA) + job_from_resource = QueryJob.from_api_repr(ASYNC_QUERY_DATA, client) - job = client.get_job(JOB_ID, timeout=7.5) + job = client.get_job(job_from_resource, timeout=7.5) self.assertIsInstance(job, QueryJob) self.assertEqual(job.job_id, JOB_ID) + self.assertEqual(job.project, "resource-proj") + self.assertEqual(job.location, "us-east1") self.assertEqual(job.create_disposition, CreateDisposition.CREATE_IF_NEEDED) self.assertEqual(job.write_disposition, WriteDisposition.WRITE_TRUNCATE) conn.api_request.assert_called_once_with( method="GET", - path="/projects/PROJECT/jobs/query_job", - query_params={"projection": "full"}, + path="/projects/resource-proj/jobs/query_job", + query_params={"projection": "full", "location": "us-east1"}, timeout=7.5, ) @@ -3049,7 +3055,11 @@ def test_cancel_job_hit(self): QUERY = "SELECT * from test_dataset:test_table" QUERY_JOB_RESOURCE = { "id": "{}:{}".format(self.PROJECT, JOB_ID), - "jobReference": {"projectId": self.PROJECT, "jobId": "query_job"}, + "jobReference": { + "projectId": "job-based-proj", + "jobId": "query_job", + "location": "asia-northeast1", + }, "state": "RUNNING", "configuration": {"query": {"query": QUERY}}, } @@ -3057,17 +3067,20 @@ def test_cancel_job_hit(self): creds = _make_credentials() client = self._make_one(self.PROJECT, creds) conn = client._connection = make_connection(RESOURCE) + job_from_resource = QueryJob.from_api_repr(QUERY_JOB_RESOURCE, client) - job = client.cancel_job(JOB_ID) + job = client.cancel_job(job_from_resource) self.assertIsInstance(job, QueryJob) self.assertEqual(job.job_id, JOB_ID) + self.assertEqual(job.project, "job-based-proj") + self.assertEqual(job.location, "asia-northeast1") self.assertEqual(job.query, QUERY) conn.api_request.assert_called_once_with( method="POST", - path="/projects/PROJECT/jobs/query_job/cancel", - query_params={"projection": "full"}, + path="/projects/job-based-proj/jobs/query_job/cancel", + query_params={"projection": "full", "location": "asia-northeast1"}, timeout=None, )