diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 10127e10d..8211e23a3 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -1734,12 +1734,20 @@ def get_job( https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get Args: - job_id (str): Unique job identifier. + job_id (Union[ \ + str, \ + google.cloud.bigquery.job.LoadJob, \ + google.cloud.bigquery.job.CopyJob, \ + google.cloud.bigquery.job.ExtractJob, \ + google.cloud.bigquery.job.QueryJob \ + ]): Job identifier. Keyword Arguments: project (Optional[str]): ID of the project which owns the job (defaults to the client's project). - location (Optional[str]): Location where the job was run. + location (Optional[str]): + Location where the job was run. Ignored if ``job_id`` is a job + object. retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. timeout (Optional[float]): @@ -1757,6 +1765,10 @@ def get_job( """ extra_params = {"projection": "full"} + project, location, job_id = _extract_job_reference( + job_id, project=project, location=location + ) + if project is None: project = self.project @@ -1791,12 +1803,20 @@ def cancel_job( https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel Args: - job_id (str): Unique job identifier. + job_id (Union[ \ + str, \ + google.cloud.bigquery.job.LoadJob, \ + google.cloud.bigquery.job.CopyJob, \ + google.cloud.bigquery.job.ExtractJob, \ + google.cloud.bigquery.job.QueryJob \ + ]): Job identifier. Keyword Arguments: project (Optional[str]): ID of the project which owns the job (defaults to the client's project). - location (Optional[str]): Location where the job was run. + location (Optional[str]): + Location where the job was run. Ignored if ``job_id`` is a job + object. retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. timeout (Optional[float]): @@ -1814,6 +1834,10 @@ def cancel_job( """ extra_params = {"projection": "full"} + project, location, job_id = _extract_job_reference( + job_id, project=project, location=location + ) + if project is None: project = self.project @@ -3518,6 +3542,37 @@ def _item_to_table(iterator, resource): return TableListItem(resource) +def _extract_job_reference(job, project=None, location=None): + """Extract fully-qualified job reference from a job-like object. + + Args: + job_id (Union[ \ + str, \ + google.cloud.bigquery.job.LoadJob, \ + google.cloud.bigquery.job.CopyJob, \ + google.cloud.bigquery.job.ExtractJob, \ + google.cloud.bigquery.job.QueryJob \ + ]): Job identifier. + project (Optional[str]): + Project where the job was run. Ignored if ``job_id`` is a job + object. + location (Optional[str]): + Location where the job was run. Ignored if ``job_id`` is a job + object. + + Returns: + Tuple[str, str, str]: ``(project, location, job_id)`` + """ + if hasattr(job, "job_id"): + project = job.project + job_id = job.job_id + location = job.location + else: + job_id = job + + return (project, location, job_id) + + def _make_job_id(job_id, prefix=None): """Construct an ID for a new job. diff --git a/tests/system/test_client.py b/tests/system/test_client.py index 024441012..f31d994ca 100644 --- a/tests/system/test_client.py +++ b/tests/system/test_client.py @@ -189,7 +189,9 @@ def test_get_service_account_email(self): def _create_bucket(self, bucket_name, location=None): storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) - retry_storage_errors(bucket.create)(location=location) + retry_storage_errors(storage_client.create_bucket)( + bucket_name, location=location + ) self.to_delete.append(bucket) return bucket @@ -872,7 +874,7 @@ def test_load_table_from_file_w_explicit_location(self): job_id = load_job.job_id # Can get the job from the EU. - load_job = client.get_job(job_id, location="EU") + load_job = client.get_job(load_job) self.assertEqual(job_id, load_job.job_id) self.assertEqual("EU", load_job.location) self.assertTrue(load_job.exists()) @@ -889,7 +891,7 @@ def test_load_table_from_file_w_explicit_location(self): # Can cancel the job from the EU. self.assertTrue(load_job.cancel()) - load_job = client.cancel_job(job_id, location="EU") + load_job = client.cancel_job(load_job) self.assertEqual(job_id, load_job.job_id) self.assertEqual("EU", load_job.location) @@ -1204,8 +1206,7 @@ def test_query_w_timeout(self): # Even though the query takes >1 second, the call to getQueryResults # should succeed. self.assertFalse(query_job.done(timeout=1)) - - Config.CLIENT.cancel_job(query_job.job_id, location=query_job.location) + self.assertIsNotNone(Config.CLIENT.cancel_job(query_job)) def test_query_w_page_size(self): page_size = 45 diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 96e51678f..c5e742c9e 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -2933,31 +2933,30 @@ def test_get_job_miss_w_explict_project(self): conn = client._connection = make_connection() with self.assertRaises(NotFound): - client.get_job(JOB_ID, project=OTHER_PROJECT, location=self.LOCATION) + client.get_job(JOB_ID, project=OTHER_PROJECT) conn.api_request.assert_called_once_with( method="GET", path="/projects/OTHER_PROJECT/jobs/NONESUCH", - query_params={"projection": "full", "location": self.LOCATION}, + query_params={"projection": "full"}, timeout=None, ) def test_get_job_miss_w_client_location(self): from google.cloud.exceptions import NotFound - OTHER_PROJECT = "OTHER_PROJECT" JOB_ID = "NONESUCH" creds = _make_credentials() - client = self._make_one(self.PROJECT, creds, location=self.LOCATION) + client = self._make_one("client-proj", creds, location="client-loc") conn = client._connection = make_connection() with self.assertRaises(NotFound): - client.get_job(JOB_ID, project=OTHER_PROJECT) + client.get_job(JOB_ID) conn.api_request.assert_called_once_with( method="GET", - path="/projects/OTHER_PROJECT/jobs/NONESUCH", - query_params={"projection": "full", "location": self.LOCATION}, + path="/projects/client-proj/jobs/NONESUCH", + query_params={"projection": "full", "location": "client-loc"}, timeout=None, ) @@ -2971,7 +2970,11 @@ def test_get_job_hit_w_timeout(self): QUERY = "SELECT * from test_dataset:test_table" ASYNC_QUERY_DATA = { "id": "{}:{}".format(self.PROJECT, JOB_ID), - "jobReference": {"projectId": self.PROJECT, "jobId": "query_job"}, + "jobReference": { + "projectId": "resource-proj", + "jobId": "query_job", + "location": "us-east1", + }, "state": "DONE", "configuration": { "query": { @@ -2989,18 +2992,21 @@ def test_get_job_hit_w_timeout(self): creds = _make_credentials() client = self._make_one(self.PROJECT, creds) conn = client._connection = make_connection(ASYNC_QUERY_DATA) + job_from_resource = QueryJob.from_api_repr(ASYNC_QUERY_DATA, client) - job = client.get_job(JOB_ID, timeout=7.5) + job = client.get_job(job_from_resource, timeout=7.5) self.assertIsInstance(job, QueryJob) self.assertEqual(job.job_id, JOB_ID) + self.assertEqual(job.project, "resource-proj") + self.assertEqual(job.location, "us-east1") self.assertEqual(job.create_disposition, CreateDisposition.CREATE_IF_NEEDED) self.assertEqual(job.write_disposition, WriteDisposition.WRITE_TRUNCATE) conn.api_request.assert_called_once_with( method="GET", - path="/projects/PROJECT/jobs/query_job", - query_params={"projection": "full"}, + path="/projects/resource-proj/jobs/query_job", + query_params={"projection": "full", "location": "us-east1"}, timeout=7.5, ) @@ -3049,7 +3055,11 @@ def test_cancel_job_hit(self): QUERY = "SELECT * from test_dataset:test_table" QUERY_JOB_RESOURCE = { "id": "{}:{}".format(self.PROJECT, JOB_ID), - "jobReference": {"projectId": self.PROJECT, "jobId": "query_job"}, + "jobReference": { + "projectId": "job-based-proj", + "jobId": "query_job", + "location": "asia-northeast1", + }, "state": "RUNNING", "configuration": {"query": {"query": QUERY}}, } @@ -3057,17 +3067,20 @@ def test_cancel_job_hit(self): creds = _make_credentials() client = self._make_one(self.PROJECT, creds) conn = client._connection = make_connection(RESOURCE) + job_from_resource = QueryJob.from_api_repr(QUERY_JOB_RESOURCE, client) - job = client.cancel_job(JOB_ID) + job = client.cancel_job(job_from_resource) self.assertIsInstance(job, QueryJob) self.assertEqual(job.job_id, JOB_ID) + self.assertEqual(job.project, "job-based-proj") + self.assertEqual(job.location, "asia-northeast1") self.assertEqual(job.query, QUERY) conn.api_request.assert_called_once_with( method="POST", - path="/projects/PROJECT/jobs/query_job/cancel", - query_params={"projection": "full"}, + path="/projects/job-based-proj/jobs/query_job/cancel", + query_params={"projection": "full", "location": "asia-northeast1"}, timeout=None, )