Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add reload argument to *Job.done() functions #341

Merged
merged 5 commits into from Oct 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
67 changes: 36 additions & 31 deletions google/cloud/bigquery/job.py
Expand Up @@ -767,7 +767,7 @@ def _set_future_result(self):
# set, do not call set_result/set_exception again.
# Note: self._result_set is set to True in set_result and
# set_exception, in case those methods are invoked directly.
if self.state != _DONE_STATE or self._result_set:
if not self.done(reload=False) or self._result_set:
return

if self.error_result is not None:
Expand All @@ -776,21 +776,24 @@ def _set_future_result(self):
else:
self.set_result(self)

def done(self, retry=DEFAULT_RETRY, timeout=None):
"""Refresh the job and checks if it is complete.
def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
"""Checks if the job is complete.

Args:
retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
reload (Optional[bool]):
If ``True``, make an API call to refresh the job state of
unfinished jobs before checking. Default ``True``.

Returns:
bool: True if the job is complete, False otherwise.
"""
        # Do not refresh if the state is already done, as the job will not
        # change once complete.
if self.state != _DONE_STATE:
if self.state != _DONE_STATE and reload:
self.reload(retry=retry, timeout=timeout)
return self.state == _DONE_STATE

Expand Down Expand Up @@ -3073,7 +3076,7 @@ def estimated_bytes_processed(self):
result = int(result)
return result

def done(self, retry=DEFAULT_RETRY, timeout=None):
def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
"""Refresh the job and checks if it is complete.

Args:
Expand All @@ -3082,10 +3085,25 @@ def done(self, retry=DEFAULT_RETRY, timeout=None):
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
reload (Optional[bool]):
If ``True``, make an API call to refresh the job state of
unfinished jobs before checking. Default ``True``.

Returns:
bool: True if the job is complete, False otherwise.
"""
is_done = (
# Only consider a QueryJob complete when we know we have the final
# query results available.
self._query_results is not None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checking: What about DDL statements, where we have no results? Or would this already retain the empty row iterator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DDL statements don't 404 for jobs.getQueryResults, do they? This is the full response object of jobs.getQueryResults, not just the rows.

and self._query_results.complete
and self.state == _DONE_STATE
)
# Do not refresh if the state is already done, as the job will not
# change once complete.
if not reload or is_done:
return is_done

# Since the API to getQueryResults can hang up to the timeout value
# (default of 10 seconds), set the timeout parameter to ensure that
# the timeout from the futures API is respected. See:
Expand All @@ -3103,23 +3121,20 @@ def done(self, retry=DEFAULT_RETRY, timeout=None):
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

# Do not refresh if the state is already done, as the job will not
# change once complete.
if self.state != _DONE_STATE:
self._query_results = self._client._get_query_results(
self.job_id,
retry,
project=self.project,
timeout_ms=timeout_ms,
location=self.location,
timeout=transport_timeout,
)
self._query_results = self._client._get_query_results(
self.job_id,
retry,
project=self.project,
timeout_ms=timeout_ms,
location=self.location,
timeout=transport_timeout,
)

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if self._query_results.complete:
self.reload(retry=retry, timeout=transport_timeout)
# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if self._query_results.complete and self.state != _DONE_STATE:
self.reload(retry=retry, timeout=transport_timeout)

return self.state == _DONE_STATE

Expand Down Expand Up @@ -3231,16 +3246,6 @@ def result(
"""
try:
super(QueryJob, self).result(retry=retry, timeout=timeout)

# Return an iterator instead of returning the job.
if not self._query_results:
self._query_results = self._client._get_query_results(
self.job_id,
retry,
project=self.project,
location=self.location,
timeout=timeout,
)
except exceptions.GoogleCloudError as exc:
exc.message += self._format_for_exception(self.query, self.job_id)
exc.query_job = self
Expand Down
101 changes: 90 additions & 11 deletions tests/unit/test_job.py
Expand Up @@ -45,6 +45,8 @@
except (ImportError, AttributeError): # pragma: NO COVER
tqdm = None

import google.cloud.bigquery.query


def _make_credentials():
import google.auth.credentials
Expand Down Expand Up @@ -3942,10 +3944,6 @@ def _make_resource(self, started=False, ended=False):
resource = super(TestQueryJob, self)._make_resource(started, ended)
config = resource["configuration"]["query"]
config["query"] = self.QUERY

if ended:
resource["status"] = {"state": "DONE"}

return resource

def _verifyBooleanResourceProperties(self, job, config):
Expand Down Expand Up @@ -4211,6 +4209,9 @@ def test_done(self):
client = _make_client(project=self.PROJECT)
resource = self._make_resource(ended=True)
job = self._get_target_class().from_api_repr(resource, client)
job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
{"jobComplete": True, "jobReference": resource["jobReference"]}
)
self.assertTrue(job.done())

def test_done_w_timeout(self):
Expand Down Expand Up @@ -4668,35 +4669,110 @@ def test_result(self):
from google.cloud.bigquery.table import RowIterator

query_resource = {
"jobComplete": False,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
}
query_resource_done = {
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "2",
}
job_resource = self._make_resource(started=True)
job_resource_done = self._make_resource(started=True, ended=True)
job_resource_done["configuration"]["query"]["destinationTable"] = {
"projectId": "dest-project",
"datasetId": "dest_dataset",
"tableId": "dest_table",
}
tabledata_resource = {
# Explicitly set totalRows to be different from the query response.
# to test update during iteration.
# Explicitly set totalRows to be different from the initial
# response to test update during iteration.
"totalRows": "1",
"pageToken": None,
"rows": [{"f": [{"v": "abc"}]}],
}
connection = _make_connection(query_resource, tabledata_resource)
client = _make_client(self.PROJECT, connection=connection)
resource = self._make_resource(ended=True)
job = self._get_target_class().from_api_repr(resource, client)
conn = _make_connection(
query_resource, query_resource_done, job_resource_done, tabledata_resource
)
client = _make_client(self.PROJECT, connection=conn)
job = self._get_target_class().from_api_repr(job_resource, client)

result = job.result()

self.assertIsInstance(result, RowIterator)
self.assertEqual(result.total_rows, 2)

rows = list(result)
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].col1, "abc")
# Test that the total_rows property has changed during iteration, based
# on the response from tabledata.list.
self.assertEqual(result.total_rows, 1)

query_results_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
query_params={"maxResults": 0},
timeout=None,
)
reload_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}",
query_params={},
timeout=None,
)
tabledata_call = mock.call(
method="GET",
path="/projects/dest-project/datasets/dest_dataset/tables/dest_table/data",
query_params={},
timeout=None,
)
conn.api_request.assert_has_calls(
[query_results_call, query_results_call, reload_call, tabledata_call]
)

def test_result_with_done_job_calls_get_query_results(self):
query_resource_done = {
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "1",
}
job_resource = self._make_resource(started=True, ended=True)
job_resource["configuration"]["query"]["destinationTable"] = {
"projectId": "dest-project",
"datasetId": "dest_dataset",
"tableId": "dest_table",
}
tabledata_resource = {
"totalRows": "1",
"pageToken": None,
"rows": [{"f": [{"v": "abc"}]}],
}
conn = _make_connection(query_resource_done, tabledata_resource)
client = _make_client(self.PROJECT, connection=conn)
job = self._get_target_class().from_api_repr(job_resource, client)

result = job.result()

rows = list(result)
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].col1, "abc")

query_results_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
query_params={"maxResults": 0},
timeout=None,
)
tabledata_call = mock.call(
method="GET",
path="/projects/dest-project/datasets/dest_dataset/tables/dest_table/data",
query_params={},
timeout=None,
)
conn.api_request.assert_has_calls([query_results_call, tabledata_call])

def test_result_with_max_results(self):
from google.cloud.bigquery.table import RowIterator

Expand Down Expand Up @@ -4938,6 +5014,9 @@ def test_result_error(self):
"errors": [error_result],
"state": "DONE",
}
job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
{"jobComplete": True, "jobReference": job._properties["jobReference"]}
)
job._set_future_result()

with self.assertRaises(exceptions.GoogleCloudError) as exc_info:
Expand Down