feat: accept job object as argument to get_job and cancel_job (#617)
This makes it easier to cancel an existing job, or fetch its updated metadata, directly from the client class, and ensures that the project ID and location are correctly populated (see the usage sketch below).


Fixes #616 🦕
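
A minimal usage sketch of the new call pattern, assuming default credentials and project are configured for the client; the query is only a placeholder:

```python
from google.cloud import bigquery

client = bigquery.Client()
query_job = client.query("SELECT 1")  # any job type works the same way

# A job object is now accepted wherever a bare job ID string was;
# its project, location, and job ID are reused automatically.
refreshed = client.get_job(query_job)      # fetch updated metadata
cancelled = client.cancel_job(query_job)   # request cancellation
```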
tswast committed Apr 15, 2021
1 parent 72d4c4a commit f75dcdf
Showing 3 changed files with 93 additions and 24 deletions.
63 changes: 59 additions & 4 deletions google/cloud/bigquery/client.py
@@ -1734,12 +1734,20 @@ def get_job(
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
Args:
job_id (str): Unique job identifier.
job_id (Union[ \
str, \
google.cloud.bigquery.job.LoadJob, \
google.cloud.bigquery.job.CopyJob, \
google.cloud.bigquery.job.ExtractJob, \
google.cloud.bigquery.job.QueryJob \
]): Job identifier.
Keyword Arguments:
project (Optional[str]):
ID of the project which owns the job (defaults to the client's project).
location (Optional[str]): Location where the job was run.
location (Optional[str]):
Location where the job was run. Ignored if ``job_id`` is a job
object.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
timeout (Optional[float]):
@@ -1757,6 +1765,10 @@ def get_job(
"""
extra_params = {"projection": "full"}

project, location, job_id = _extract_job_reference(
job_id, project=project, location=location
)

if project is None:
project = self.project

@@ -1791,12 +1803,20 @@ def cancel_job(
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/cancel
Args:
job_id (str): Unique job identifier.
job_id (Union[ \
str, \
google.cloud.bigquery.job.LoadJob, \
google.cloud.bigquery.job.CopyJob, \
google.cloud.bigquery.job.ExtractJob, \
google.cloud.bigquery.job.QueryJob \
]): Job identifier.
Keyword Arguments:
project (Optional[str]):
ID of the project which owns the job (defaults to the client's project).
location (Optional[str]): Location where the job was run.
location (Optional[str]):
Location where the job was run. Ignored if ``job_id`` is a job
object.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
timeout (Optional[float]):
@@ -1814,6 +1834,10 @@ def cancel_job(
"""
extra_params = {"projection": "full"}

project, location, job_id = _extract_job_reference(
job_id, project=project, location=location
)

if project is None:
project = self.project

@@ -3518,6 +3542,37 @@ def _item_to_table(iterator, resource):
return TableListItem(resource)


def _extract_job_reference(job, project=None, location=None):
"""Extract fully-qualified job reference from a job-like object.
Args:
job_id (Union[ \
str, \
google.cloud.bigquery.job.LoadJob, \
google.cloud.bigquery.job.CopyJob, \
google.cloud.bigquery.job.ExtractJob, \
google.cloud.bigquery.job.QueryJob \
]): Job identifier.
project (Optional[str]):
Project where the job was run. Ignored if ``job_id`` is a job
object.
location (Optional[str]):
Location where the job was run. Ignored if ``job_id`` is a job
object.
Returns:
Tuple[str, str, str]: ``(project, location, job_id)``
"""
if hasattr(job, "job_id"):
project = job.project
job_id = job.job_id
location = job.location
else:
job_id = job

return (project, location, job_id)


def _make_job_id(job_id, prefix=None):
"""Construct an ID for a new job.
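For illustration, a minimal sketch of how the new `_extract_job_reference` helper resolves the two accepted argument types. `_FakeJob` is a hypothetical stand-in for the job classes (anything exposing `project`, `location`, and `job_id`), and the import assumes a package version that includes this commit; the helper is private, so this is shown for understanding only:

```python
from collections import namedtuple

from google.cloud.bigquery.client import _extract_job_reference

# Hypothetical stand-in for LoadJob / CopyJob / ExtractJob / QueryJob.
_FakeJob = namedtuple("_FakeJob", ["project", "location", "job_id"])

# A plain string passes through; the explicit project/location are kept.
assert _extract_job_reference("abc123", project="my-proj", location="US") == (
    "my-proj",
    "US",
    "abc123",
)

# A job-like object wins: its own project, location, and job ID are used.
job = _FakeJob(project="job-proj", location="EU", job_id="abc123")
assert _extract_job_reference(job, project="ignored", location="ignored") == (
    "job-proj",
    "EU",
    "abc123",
)
```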
11 changes: 6 additions & 5 deletions tests/system/test_client.py
@@ -189,7 +189,9 @@ def test_get_service_account_email(self):
def _create_bucket(self, bucket_name, location=None):
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
retry_storage_errors(bucket.create)(location=location)
retry_storage_errors(storage_client.create_bucket)(
bucket_name, location=location
)
self.to_delete.append(bucket)

return bucket
@@ -872,7 +874,7 @@ def test_load_table_from_file_w_explicit_location(self):
job_id = load_job.job_id

# Can get the job from the EU.
load_job = client.get_job(job_id, location="EU")
load_job = client.get_job(load_job)
self.assertEqual(job_id, load_job.job_id)
self.assertEqual("EU", load_job.location)
self.assertTrue(load_job.exists())
@@ -889,7 +891,7 @@ def test_load_table_from_file_w_explicit_location(self):

# Can cancel the job from the EU.
self.assertTrue(load_job.cancel())
load_job = client.cancel_job(job_id, location="EU")
load_job = client.cancel_job(load_job)
self.assertEqual(job_id, load_job.job_id)
self.assertEqual("EU", load_job.location)

@@ -1204,8 +1206,7 @@ def test_query_w_timeout(self):
# Even though the query takes >1 second, the call to getQueryResults
# should succeed.
self.assertFalse(query_job.done(timeout=1))

Config.CLIENT.cancel_job(query_job.job_id, location=query_job.location)
self.assertIsNotNone(Config.CLIENT.cancel_job(query_job))

def test_query_w_page_size(self):
page_size = 45
43 changes: 28 additions & 15 deletions tests/unit/test_client.py
@@ -2933,31 +2933,30 @@ def test_get_job_miss_w_explict_project(self):
conn = client._connection = make_connection()

with self.assertRaises(NotFound):
client.get_job(JOB_ID, project=OTHER_PROJECT, location=self.LOCATION)
client.get_job(JOB_ID, project=OTHER_PROJECT)

conn.api_request.assert_called_once_with(
method="GET",
path="/projects/OTHER_PROJECT/jobs/NONESUCH",
query_params={"projection": "full", "location": self.LOCATION},
query_params={"projection": "full"},
timeout=None,
)

def test_get_job_miss_w_client_location(self):
from google.cloud.exceptions import NotFound

OTHER_PROJECT = "OTHER_PROJECT"
JOB_ID = "NONESUCH"
creds = _make_credentials()
client = self._make_one(self.PROJECT, creds, location=self.LOCATION)
client = self._make_one("client-proj", creds, location="client-loc")
conn = client._connection = make_connection()

with self.assertRaises(NotFound):
client.get_job(JOB_ID, project=OTHER_PROJECT)
client.get_job(JOB_ID)

conn.api_request.assert_called_once_with(
method="GET",
path="/projects/OTHER_PROJECT/jobs/NONESUCH",
query_params={"projection": "full", "location": self.LOCATION},
path="/projects/client-proj/jobs/NONESUCH",
query_params={"projection": "full", "location": "client-loc"},
timeout=None,
)

@@ -2971,7 +2970,11 @@ def test_get_job_hit_w_timeout(self):
QUERY = "SELECT * from test_dataset:test_table"
ASYNC_QUERY_DATA = {
"id": "{}:{}".format(self.PROJECT, JOB_ID),
"jobReference": {"projectId": self.PROJECT, "jobId": "query_job"},
"jobReference": {
"projectId": "resource-proj",
"jobId": "query_job",
"location": "us-east1",
},
"state": "DONE",
"configuration": {
"query": {
@@ -2989,18 +2992,21 @@ def test_get_job_hit_w_timeout(self):
creds = _make_credentials()
client = self._make_one(self.PROJECT, creds)
conn = client._connection = make_connection(ASYNC_QUERY_DATA)
job_from_resource = QueryJob.from_api_repr(ASYNC_QUERY_DATA, client)

job = client.get_job(JOB_ID, timeout=7.5)
job = client.get_job(job_from_resource, timeout=7.5)

self.assertIsInstance(job, QueryJob)
self.assertEqual(job.job_id, JOB_ID)
self.assertEqual(job.project, "resource-proj")
self.assertEqual(job.location, "us-east1")
self.assertEqual(job.create_disposition, CreateDisposition.CREATE_IF_NEEDED)
self.assertEqual(job.write_disposition, WriteDisposition.WRITE_TRUNCATE)

conn.api_request.assert_called_once_with(
method="GET",
path="/projects/PROJECT/jobs/query_job",
query_params={"projection": "full"},
path="/projects/resource-proj/jobs/query_job",
query_params={"projection": "full", "location": "us-east1"},
timeout=7.5,
)

@@ -3049,25 +3055,32 @@ def test_cancel_job_hit(self):
QUERY = "SELECT * from test_dataset:test_table"
QUERY_JOB_RESOURCE = {
"id": "{}:{}".format(self.PROJECT, JOB_ID),
"jobReference": {"projectId": self.PROJECT, "jobId": "query_job"},
"jobReference": {
"projectId": "job-based-proj",
"jobId": "query_job",
"location": "asia-northeast1",
},
"state": "RUNNING",
"configuration": {"query": {"query": QUERY}},
}
RESOURCE = {"job": QUERY_JOB_RESOURCE}
creds = _make_credentials()
client = self._make_one(self.PROJECT, creds)
conn = client._connection = make_connection(RESOURCE)
job_from_resource = QueryJob.from_api_repr(QUERY_JOB_RESOURCE, client)

job = client.cancel_job(JOB_ID)
job = client.cancel_job(job_from_resource)

self.assertIsInstance(job, QueryJob)
self.assertEqual(job.job_id, JOB_ID)
self.assertEqual(job.project, "job-based-proj")
self.assertEqual(job.location, "asia-northeast1")
self.assertEqual(job.query, QUERY)

conn.api_request.assert_called_once_with(
method="POST",
path="/projects/PROJECT/jobs/query_job/cancel",
query_params={"projection": "full"},
path="/projects/job-based-proj/jobs/query_job/cancel",
query_params={"projection": "full", "location": "asia-northeast1"},
timeout=None,
)

