From e51fd45fdb0481ac5d59cc0edbfa0750928b2596 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 28 Oct 2020 16:32:02 -0500
Subject: [PATCH] feat: add `reload` argument to `*Job.done()` functions (#341)

This enables checking the job status without making an API call. It also
fixes an inconsistency in `QueryJob`, where a job could be reported as
"done" before the results of a `getQueryResults` API call were available.

Follow-up to https://github.com/googleapis/python-bigquery/pull/340
---
 google/cloud/bigquery/job.py |  67 +++++++-------
 tests/unit/test_job.py       | 101 +++++++++++++++++---
 tests/unit/test_magics.py    | 173 +++++++++++++++++++----------------
 3 files changed, 218 insertions(+), 123 deletions(-)

diff --git a/google/cloud/bigquery/job.py b/google/cloud/bigquery/job.py
index 977d7a559..204c5f774 100644
--- a/google/cloud/bigquery/job.py
+++ b/google/cloud/bigquery/job.py
@@ -767,7 +767,7 @@ def _set_future_result(self):
         # set, do not call set_result/set_exception again.
         # Note: self._result_set is set to True in set_result and
         # set_exception, in case those methods are invoked directly.
-        if self.state != _DONE_STATE or self._result_set:
+        if not self.done(reload=False) or self._result_set:
             return

         if self.error_result is not None:
@@ -776,21 +776,24 @@ def _set_future_result(self):
         else:
             self.set_result(self)

-    def done(self, retry=DEFAULT_RETRY, timeout=None):
-        """Refresh the job and checks if it is complete.
+    def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
+        """Checks if the job is complete.

         Args:
             retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
             timeout (Optional[float]):
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
+            reload (Optional[bool]):
+                If ``True``, make an API call to refresh the job state of
+                unfinished jobs before checking. Default ``True``.

         Returns:
             bool: True if the job is complete, False otherwise.
         """
-        # Do not refresh is the state is already done, as the job will not
+        # Do not refresh if the state is already done, as the job will not
         # change once complete.
-        if self.state != _DONE_STATE:
+        if self.state != _DONE_STATE and reload:
             self.reload(retry=retry, timeout=timeout)
         return self.state == _DONE_STATE

@@ -3073,7 +3076,7 @@ def estimated_bytes_processed(self):
             result = int(result)
         return result

-    def done(self, retry=DEFAULT_RETRY, timeout=None):
-        """Refresh the job and checks if it is complete.
+    def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
+        """Checks if the job is complete.

         Args:
@@ -3082,10 +3085,25 @@ def done(self, retry=DEFAULT_RETRY, timeout=None):
             retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
             timeout (Optional[float]):
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
+            reload (Optional[bool]):
+                If ``True``, make an API call to refresh the job state of
+                unfinished jobs before checking. Default ``True``.

         Returns:
             bool: True if the job is complete, False otherwise.
         """
+        is_done = (
+            # Only consider a QueryJob complete when we know we have the final
+            # query results available.
+            self._query_results is not None
+            and self._query_results.complete
+            and self.state == _DONE_STATE
+        )
+        # Do not refresh if the state is already done, as the job will not
+        # change once complete.
+        if not reload or is_done:
+            return is_done
+
         # Since the API to getQueryResults can hang up to the timeout value
         # (default of 10 seconds), set the timeout parameter to ensure that
         # the timeout from the futures API is respected. See:
@@ -3103,23 +3121,20 @@ def done(self, retry=DEFAULT_RETRY, timeout=None):
         # stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout - # Do not refresh if the state is already done, as the job will not - # change once complete. - if self.state != _DONE_STATE: - self._query_results = self._client._get_query_results( - self.job_id, - retry, - project=self.project, - timeout_ms=timeout_ms, - location=self.location, - timeout=transport_timeout, - ) + self._query_results = self._client._get_query_results( + self.job_id, + retry, + project=self.project, + timeout_ms=timeout_ms, + location=self.location, + timeout=transport_timeout, + ) - # Only reload the job once we know the query is complete. - # This will ensure that fields such as the destination table are - # correctly populated. - if self._query_results.complete: - self.reload(retry=retry, timeout=transport_timeout) + # Only reload the job once we know the query is complete. + # This will ensure that fields such as the destination table are + # correctly populated. + if self._query_results.complete and self.state != _DONE_STATE: + self.reload(retry=retry, timeout=transport_timeout) return self.state == _DONE_STATE @@ -3231,16 +3246,6 @@ def result( """ try: super(QueryJob, self).result(retry=retry, timeout=timeout) - - # Return an iterator instead of returning the job. - if not self._query_results: - self._query_results = self._client._get_query_results( - self.job_id, - retry, - project=self.project, - location=self.location, - timeout=timeout, - ) except exceptions.GoogleCloudError as exc: exc.message += self._format_for_exception(self.query, self.job_id) exc.query_job = self diff --git a/tests/unit/test_job.py b/tests/unit/test_job.py index f577b08bd..2d1e8fec8 100644 --- a/tests/unit/test_job.py +++ b/tests/unit/test_job.py @@ -45,6 +45,8 @@ except (ImportError, AttributeError): # pragma: NO COVER tqdm = None +import google.cloud.bigquery.query + def _make_credentials(): import google.auth.credentials @@ -3942,10 +3944,6 @@ def _make_resource(self, started=False, ended=False): resource = super(TestQueryJob, self)._make_resource(started, ended) config = resource["configuration"]["query"] config["query"] = self.QUERY - - if ended: - resource["status"] = {"state": "DONE"} - return resource def _verifyBooleanResourceProperties(self, job, config): @@ -4211,6 +4209,9 @@ def test_done(self): client = _make_client(project=self.PROJECT) resource = self._make_resource(ended=True) job = self._get_target_class().from_api_repr(resource, client) + job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr( + {"jobComplete": True, "jobReference": resource["jobReference"]} + ) self.assertTrue(job.done()) def test_done_w_timeout(self): @@ -4668,28 +4669,39 @@ def test_result(self): from google.cloud.bigquery.table import RowIterator query_resource = { + "jobComplete": False, + "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, + } + query_resource_done = { "jobComplete": True, "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, "totalRows": "2", } + job_resource = self._make_resource(started=True) + job_resource_done = self._make_resource(started=True, ended=True) + job_resource_done["configuration"]["query"]["destinationTable"] = { + "projectId": "dest-project", + "datasetId": "dest_dataset", + "tableId": "dest_table", + } tabledata_resource = { - # Explicitly set totalRows to be different from the query response. - # to test update during iteration. 
+ # Explicitly set totalRows to be different from the initial + # response to test update during iteration. "totalRows": "1", "pageToken": None, "rows": [{"f": [{"v": "abc"}]}], } - connection = _make_connection(query_resource, tabledata_resource) - client = _make_client(self.PROJECT, connection=connection) - resource = self._make_resource(ended=True) - job = self._get_target_class().from_api_repr(resource, client) + conn = _make_connection( + query_resource, query_resource_done, job_resource_done, tabledata_resource + ) + client = _make_client(self.PROJECT, connection=conn) + job = self._get_target_class().from_api_repr(job_resource, client) result = job.result() self.assertIsInstance(result, RowIterator) self.assertEqual(result.total_rows, 2) - rows = list(result) self.assertEqual(len(rows), 1) self.assertEqual(rows[0].col1, "abc") @@ -4697,6 +4709,70 @@ def test_result(self): # on the response from tabledata.list. self.assertEqual(result.total_rows, 1) + query_results_call = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}", + query_params={"maxResults": 0}, + timeout=None, + ) + reload_call = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}", + query_params={}, + timeout=None, + ) + tabledata_call = mock.call( + method="GET", + path="/projects/dest-project/datasets/dest_dataset/tables/dest_table/data", + query_params={}, + timeout=None, + ) + conn.api_request.assert_has_calls( + [query_results_call, query_results_call, reload_call, tabledata_call] + ) + + def test_result_with_done_job_calls_get_query_results(self): + query_resource_done = { + "jobComplete": True, + "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, + "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, + "totalRows": "1", + } + job_resource = self._make_resource(started=True, ended=True) + job_resource["configuration"]["query"]["destinationTable"] = { + "projectId": "dest-project", + "datasetId": "dest_dataset", + "tableId": "dest_table", + } + tabledata_resource = { + "totalRows": "1", + "pageToken": None, + "rows": [{"f": [{"v": "abc"}]}], + } + conn = _make_connection(query_resource_done, tabledata_resource) + client = _make_client(self.PROJECT, connection=conn) + job = self._get_target_class().from_api_repr(job_resource, client) + + result = job.result() + + rows = list(result) + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0].col1, "abc") + + query_results_call = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}", + query_params={"maxResults": 0}, + timeout=None, + ) + tabledata_call = mock.call( + method="GET", + path="/projects/dest-project/datasets/dest_dataset/tables/dest_table/data", + query_params={}, + timeout=None, + ) + conn.api_request.assert_has_calls([query_results_call, tabledata_call]) + def test_result_with_max_results(self): from google.cloud.bigquery.table import RowIterator @@ -4938,6 +5014,9 @@ def test_result_error(self): "errors": [error_result], "state": "DONE", } + job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr( + {"jobComplete": True, "jobReference": job._properties["jobReference"]} + ) job._set_future_result() with self.assertRaises(exceptions.GoogleCloudError) as exc_info: diff --git a/tests/unit/test_magics.py b/tests/unit/test_magics.py index 30ca4d70c..b2877845a 100644 --- a/tests/unit/test_magics.py +++ b/tests/unit/test_magics.py @@ -19,7 +19,6 @@ import mock import pytest -import six try: import pandas @@ -101,27 +100,38 @@ 
 def fail_if(name, globals, locals, fromlist, level):
     return maybe_fail_import(predicate=fail_if)

-JOB_REFERENCE_RESOURCE = {"projectId": "its-a-project-eh", "jobId": "some-random-id"}
+PROJECT_ID = "its-a-project-eh"
+JOB_ID = "some-random-id"
+JOB_REFERENCE_RESOURCE = {"projectId": PROJECT_ID, "jobId": JOB_ID}
+DATASET_ID = "dest_dataset"
+TABLE_ID = "dest_table"
 TABLE_REFERENCE_RESOURCE = {
-    "projectId": "its-a-project-eh",
-    "datasetId": "ds",
-    "tableId": "persons",
+    "projectId": PROJECT_ID,
+    "datasetId": DATASET_ID,
+    "tableId": TABLE_ID,
 }
+QUERY_STRING = "SELECT 42 AS the_answer FROM `life.the_universe.and_everything`;"
 QUERY_RESOURCE = {
     "jobReference": JOB_REFERENCE_RESOURCE,
     "configuration": {
         "query": {
             "destinationTable": TABLE_REFERENCE_RESOURCE,
-            "query": "SELECT 42 FROM `life.the_universe.and_everything`;",
+            "query": QUERY_STRING,
             "queryParameters": [],
             "useLegacySql": False,
         }
     },
     "status": {"state": "DONE"},
 }
+QUERY_RESULTS_RESOURCE = {
+    "jobReference": JOB_REFERENCE_RESOURCE,
+    "totalRows": 1,
+    "jobComplete": True,
+    "schema": {"fields": [{"name": "the_answer", "type": "INTEGER"}]},
+}


-def test_context_credentials_auto_set_w_application_default_credentials():
+def test_context_with_default_credentials():
     """When Application Default Credentials are set, the context credentials
-    will be created the first time it is called
+    will be created the first time they are accessed
     """
@@ -142,6 +152,50 @@
     assert default_mock.call_count == 2


+@pytest.mark.usefixtures("ipython_interactive")
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+def test_context_with_default_connection():
+    ip = IPython.get_ipython()
+    ip.extension_manager.load_extension("google.cloud.bigquery")
+    magics.context._credentials = None
+    magics.context._project = None
+    magics.context._connection = None
+
+    default_credentials = mock.create_autospec(
+        google.auth.credentials.Credentials, instance=True
+    )
+    credentials_patch = mock.patch(
+        "google.auth.default", return_value=(default_credentials, "project-from-env")
+    )
+    default_conn = make_connection(QUERY_RESOURCE, QUERY_RESULTS_RESOURCE)
+    conn_patch = mock.patch("google.cloud.bigquery.client.Connection", autospec=True)
+    list_rows_patch = mock.patch(
+        "google.cloud.bigquery.client.Client.list_rows",
+        return_value=google.cloud.bigquery.table._EmptyRowIterator(),
+    )
+
+    with conn_patch as conn, credentials_patch, list_rows_patch as list_rows:
+        conn.return_value = default_conn
+        ip.run_cell_magic("bigquery", "", QUERY_STRING)
+
+        # Check that query actually starts the job.
+ conn.assert_called() + list_rows.assert_called() + begin_call = mock.call( + method="POST", + path="/projects/project-from-env/jobs", + data=mock.ANY, + timeout=None, + ) + query_results_call = mock.call( + method="GET", + path=f"/projects/{PROJECT_ID}/queries/{JOB_ID}", + query_params=mock.ANY, + timeout=mock.ANY, + ) + default_conn.api_request.assert_has_calls([begin_call, query_results_call]) + + def test_context_credentials_and_project_can_be_set_explicitly(): project1 = "one-project-55564" project2 = "other-project-52569" @@ -163,93 +217,47 @@ def test_context_credentials_and_project_can_be_set_explicitly(): @pytest.mark.usefixtures("ipython_interactive") @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") -def test_context_connection_can_be_overriden(): +def test_context_with_custom_connection(): ip = IPython.get_ipython() ip.extension_manager.load_extension("google.cloud.bigquery") magics.context._project = None magics.context._credentials = None - - credentials_mock = mock.create_autospec( - google.auth.credentials.Credentials, instance=True - ) - project = "project-123" - default_patch = mock.patch( - "google.auth.default", return_value=(credentials_mock, project) - ) - job_reference = copy.deepcopy(JOB_REFERENCE_RESOURCE) - job_reference["projectId"] = project - - query = "select * from persons" - resource = copy.deepcopy(QUERY_RESOURCE) - resource["jobReference"] = job_reference - resource["configuration"]["query"]["query"] = query - data = {"jobReference": job_reference, "totalRows": 0, "rows": []} - - conn = magics.context._connection = make_connection(resource, data) - list_rows_patch = mock.patch( - "google.cloud.bigquery.client.Client.list_rows", - return_value=google.cloud.bigquery.table._EmptyRowIterator(), + context_conn = magics.context._connection = make_connection( + QUERY_RESOURCE, QUERY_RESULTS_RESOURCE ) - with list_rows_patch as list_rows, default_patch: - ip.run_cell_magic("bigquery", "", query) - # Check that query actually starts the job. 
- list_rows.assert_called() - assert len(conn.api_request.call_args_list) == 2 - _, req = conn.api_request.call_args_list[0] - assert req["method"] == "POST" - assert req["path"] == "/projects/{}/jobs".format(project) - sent = req["data"] - assert isinstance(sent["jobReference"]["jobId"], six.string_types) - sent_config = sent["configuration"]["query"] - assert sent_config["query"] == query - - -@pytest.mark.usefixtures("ipython_interactive") -@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") -def test_context_no_connection(): - ip = IPython.get_ipython() - ip.extension_manager.load_extension("google.cloud.bigquery") - magics.context._project = None - magics.context._credentials = None - magics.context._connection = None - - credentials_mock = mock.create_autospec( + default_credentials = mock.create_autospec( google.auth.credentials.Credentials, instance=True ) - project = "project-123" - default_patch = mock.patch( - "google.auth.default", return_value=(credentials_mock, project) + credentials_patch = mock.patch( + "google.auth.default", return_value=(default_credentials, "project-from-env") ) - job_reference = copy.deepcopy(JOB_REFERENCE_RESOURCE) - job_reference["projectId"] = project - - query = "select * from persons" - resource = copy.deepcopy(QUERY_RESOURCE) - resource["jobReference"] = job_reference - resource["configuration"]["query"]["query"] = query - data = {"jobReference": job_reference, "totalRows": 0, "rows": []} - - conn_mock = make_connection(resource, data, data, data) + default_conn = make_connection() conn_patch = mock.patch("google.cloud.bigquery.client.Connection", autospec=True) list_rows_patch = mock.patch( "google.cloud.bigquery.client.Client.list_rows", return_value=google.cloud.bigquery.table._EmptyRowIterator(), ) - with conn_patch as conn, list_rows_patch as list_rows, default_patch: - conn.return_value = conn_mock - ip.run_cell_magic("bigquery", "", query) - # Check that query actually starts the job. 
+ with conn_patch as conn, credentials_patch, list_rows_patch as list_rows: + conn.return_value = default_conn + ip.run_cell_magic("bigquery", "", QUERY_STRING) + list_rows.assert_called() - assert len(conn_mock.api_request.call_args_list) == 2 - _, req = conn_mock.api_request.call_args_list[0] - assert req["method"] == "POST" - assert req["path"] == "/projects/{}/jobs".format(project) - sent = req["data"] - assert isinstance(sent["jobReference"]["jobId"], six.string_types) - sent_config = sent["configuration"]["query"] - assert sent_config["query"] == query + default_conn.api_request.assert_not_called() + begin_call = mock.call( + method="POST", + path="/projects/project-from-env/jobs", + data=mock.ANY, + timeout=None, + ) + query_results_call = mock.call( + method="GET", + path=f"/projects/{PROJECT_ID}/queries/{JOB_ID}", + query_params=mock.ANY, + timeout=mock.ANY, + ) + context_conn.api_request.assert_has_calls([begin_call, query_results_call]) def test__run_query(): @@ -1060,6 +1068,7 @@ def test_bigquery_magic_w_maximum_bytes_billed_overrides_context(param_value, ex resource = copy.deepcopy(QUERY_RESOURCE) resource["jobReference"] = job_reference resource["configuration"]["query"]["query"] = query + query_results = {"jobReference": job_reference, "totalRows": 0, "jobComplete": True} data = {"jobReference": job_reference, "totalRows": 0, "rows": []} credentials_mock = mock.create_autospec( google.auth.credentials.Credentials, instance=True @@ -1067,7 +1076,7 @@ def test_bigquery_magic_w_maximum_bytes_billed_overrides_context(param_value, ex default_patch = mock.patch( "google.auth.default", return_value=(credentials_mock, "general-project") ) - conn = magics.context._connection = make_connection(resource, data) + conn = magics.context._connection = make_connection(resource, query_results, data) list_rows_patch = mock.patch( "google.cloud.bigquery.client.Client.list_rows", return_value=google.cloud.bigquery.table._EmptyRowIterator(), @@ -1098,6 +1107,7 @@ def test_bigquery_magic_w_maximum_bytes_billed_w_context_inplace(): resource = copy.deepcopy(QUERY_RESOURCE) resource["jobReference"] = job_reference resource["configuration"]["query"]["query"] = query + query_results = {"jobReference": job_reference, "totalRows": 0, "jobComplete": True} data = {"jobReference": job_reference, "totalRows": 0, "rows": []} credentials_mock = mock.create_autospec( google.auth.credentials.Credentials, instance=True @@ -1105,7 +1115,7 @@ def test_bigquery_magic_w_maximum_bytes_billed_w_context_inplace(): default_patch = mock.patch( "google.auth.default", return_value=(credentials_mock, "general-project") ) - conn = magics.context._connection = make_connection(resource, data) + conn = magics.context._connection = make_connection(resource, query_results, data) list_rows_patch = mock.patch( "google.cloud.bigquery.client.Client.list_rows", return_value=google.cloud.bigquery.table._EmptyRowIterator(), @@ -1136,6 +1146,7 @@ def test_bigquery_magic_w_maximum_bytes_billed_w_context_setter(): resource = copy.deepcopy(QUERY_RESOURCE) resource["jobReference"] = job_reference resource["configuration"]["query"]["query"] = query + query_results = {"jobReference": job_reference, "totalRows": 0, "jobComplete": True} data = {"jobReference": job_reference, "totalRows": 0, "rows": []} credentials_mock = mock.create_autospec( google.auth.credentials.Credentials, instance=True @@ -1143,7 +1154,7 @@ def test_bigquery_magic_w_maximum_bytes_billed_w_context_setter(): default_patch = mock.patch( "google.auth.default", 
return_value=(credentials_mock, "general-project") ) - conn = magics.context._connection = make_connection(resource, data) + conn = magics.context._connection = make_connection(resource, query_results, data) list_rows_patch = mock.patch( "google.cloud.bigquery.client.Client.list_rows", return_value=google.cloud.bigquery.table._EmptyRowIterator(),