From f8887d3625a718aab9c8849ee6d1ef9b0293c14e Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 24 Nov 2020 13:29:49 -0600 Subject: [PATCH 1/6] perf: dont fetch rows when waiting for query to finish When there are large result sets, fetching rows while waiting for the query to finish can cause the API to hang indefinitely. (This may be due to an interaction between connection timeout and API timeout.) This reverts commit 86f6a516d1c7c5dc204ab085ea2578793e6561ff (#374). --- google/cloud/bigquery/client.py | 4 +- google/cloud/bigquery/job/query.py | 85 +++++++++++------------------ google/cloud/bigquery/table.py | 11 +--- tests/unit/job/test_query.py | 55 +++++-------------- tests/unit/job/test_query_pandas.py | 16 ++++-- tests/unit/test_client.py | 4 +- 6 files changed, 60 insertions(+), 115 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index c67ef54e0..cd1474336 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -1534,7 +1534,7 @@ def _get_query_results( A new ``_QueryResults`` instance. """ - extra_params = {} + extra_params = {"maxResults": 0} if project is None: project = self.project @@ -3187,7 +3187,6 @@ def _list_rows_from_query_results( page_size=None, retry=DEFAULT_RETRY, timeout=None, - first_page_response=None, ): """List the rows of a completed query. See @@ -3248,7 +3247,6 @@ def _list_rows_from_query_results( table=destination, extra_params=params, total_rows=total_rows, - first_page_response=first_page_response, ) return row_iterator diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 7a1a74954..9931ffdf9 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -991,22 +991,48 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True): Returns: bool: True if the job is complete, False otherwise. """ + is_done = ( + # Only consider a QueryJob complete when we know we have the final + # query results available. + self._query_results is not None + and self._query_results.complete + and self.state == _DONE_STATE + ) # Do not refresh if the state is already done, as the job will not # change once complete. - is_done = self.state == _DONE_STATE if not reload or is_done: return is_done - self._reload_query_results(retry=retry, timeout=timeout) + # Since the API to getQueryResults can hang up to the timeout value + # (default of 10 seconds), set the timeout parameter to ensure that + # the timeout from the futures API is respected. See: + # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135 + timeout_ms = None + if self._done_timeout is not None: + # Subtract a buffer for context switching, network latency, etc. + api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS + api_timeout = max(min(api_timeout, 10), 0) + self._done_timeout -= api_timeout + self._done_timeout = max(0, self._done_timeout) + timeout_ms = int(api_timeout * 1000) # If an explicit timeout is not given, fall back to the transport timeout # stored in _blocking_poll() in the process of polling for job completion. transport_timeout = timeout if timeout is not None else self._transport_timeout + self._query_results = self._client._get_query_results( + self.job_id, + retry, + project=self.project, + timeout_ms=timeout_ms, + location=self.location, + timeout=transport_timeout, + ) + # Only reload the job once we know the query is complete. # This will ensure that fields such as the destination table are # correctly populated. 
- if self._query_results.complete: + if self._query_results.complete and self.state != _DONE_STATE: self.reload(retry=retry, timeout=transport_timeout) return self.state == _DONE_STATE @@ -1073,45 +1099,6 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None): exc.query_job = self raise - def _reload_query_results(self, retry=DEFAULT_RETRY, timeout=None): - """Refresh the cached query results. - - Args: - retry (Optional[google.api_core.retry.Retry]): - How to retry the call that retrieves query results. - timeout (Optional[float]): - The number of seconds to wait for the underlying HTTP transport - before using ``retry``. - """ - if self._query_results and self._query_results.complete: - return - - # Since the API to getQueryResults can hang up to the timeout value - # (default of 10 seconds), set the timeout parameter to ensure that - # the timeout from the futures API is respected. See: - # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135 - timeout_ms = None - if self._done_timeout is not None: - # Subtract a buffer for context switching, network latency, etc. - api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS - api_timeout = max(min(api_timeout, 10), 0) - self._done_timeout -= api_timeout - self._done_timeout = max(0, self._done_timeout) - timeout_ms = int(api_timeout * 1000) - - # If an explicit timeout is not given, fall back to the transport timeout - # stored in _blocking_poll() in the process of polling for job completion. - transport_timeout = timeout if timeout is not None else self._transport_timeout - - self._query_results = self._client._get_query_results( - self.job_id, - retry, - project=self.project, - timeout_ms=timeout_ms, - location=self.location, - timeout=transport_timeout, - ) - def result( self, page_size=None, @@ -1158,11 +1145,6 @@ def result( """ try: super(QueryJob, self).result(retry=retry, timeout=timeout) - - # Since the job could already be "done" (e.g. got a finished job - # via client.get_job), the superclass call to done() might not - # set the self._query_results cache. - self._reload_query_results(retry=retry, timeout=timeout) except exceptions.GoogleAPICallError as exc: exc.message += self._format_for_exception(self.query, self.job_id) exc.query_job = self @@ -1177,14 +1159,10 @@ def result( if self._query_results.total_rows is None: return _EmptyRowIterator() - first_page_response = None - if max_results is None and page_size is None and start_index is None: - first_page_response = self._query_results._properties - rows = self._client._list_rows_from_query_results( - self.job_id, + self._query_results.job_id, self.location, - self.project, + self._query_results.project, self._query_results.schema, total_rows=self._query_results.total_rows, destination=self.destination, @@ -1193,7 +1171,6 @@ def result( start_index=start_index, retry=retry, timeout=timeout, - first_page_response=first_page_response, ) rows._preserve_order = _contains_order_by(self.query) return rows diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index 4bfedd758..80d1fd13b 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -1301,9 +1301,7 @@ class RowIterator(HTTPIterator): A subset of columns to select from this table. total_rows (Optional[int]): Total number of rows in the table. - first_page_response (Optional[dict]): - API response for the first page of results. These are returned when - the first page is requested. 
+ """ def __init__( @@ -1319,7 +1317,6 @@ def __init__( table=None, selected_fields=None, total_rows=None, - first_page_response=None, ): super(RowIterator, self).__init__( client, @@ -1342,7 +1339,6 @@ def __init__( self._selected_fields = selected_fields self._table = table self._total_rows = total_rows - self._first_page_response = first_page_response def _is_completely_cached(self): """Check if all results are completely cached. @@ -1386,11 +1382,6 @@ def _get_next_page_response(self): Dict[str, object]: The parsed JSON response of the next page's contents. """ - if self._first_page_response: - response = self._first_page_response - self._first_page_response = None - return response - params = self._get_query_params() if self._page_size is not None: if self.page_number and "startIndex" in params: diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py index 41e31f469..daaf2e557 100644 --- a/tests/unit/job/test_query.py +++ b/tests/unit/job/test_query.py @@ -787,9 +787,7 @@ def test_result(self): "location": "EU", }, "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, - "totalRows": "3", - "rows": [{"f": [{"v": "abc"}]}], - "pageToken": "next-page", + "totalRows": "2", } job_resource = self._make_resource(started=True, location="EU") job_resource_done = self._make_resource(started=True, ended=True, location="EU") @@ -801,9 +799,9 @@ def test_result(self): query_page_resource = { # Explicitly set totalRows to be different from the initial # response to test update during iteration. - "totalRows": "2", + "totalRows": "1", "pageToken": None, - "rows": [{"f": [{"v": "def"}]}], + "rows": [{"f": [{"v": "abc"}]}], } conn = _make_connection( query_resource, query_resource_done, job_resource_done, query_page_resource @@ -814,20 +812,19 @@ def test_result(self): result = job.result() self.assertIsInstance(result, RowIterator) - self.assertEqual(result.total_rows, 3) + self.assertEqual(result.total_rows, 2) rows = list(result) - self.assertEqual(len(rows), 2) + self.assertEqual(len(rows), 1) self.assertEqual(rows[0].col1, "abc") - self.assertEqual(rows[1].col1, "def") # Test that the total_rows property has changed during iteration, based # on the response from tabledata.list. 
- self.assertEqual(result.total_rows, 2) + self.assertEqual(result.total_rows, 1) query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}" query_results_call = mock.call( method="GET", path=query_results_path, - query_params={"location": "EU"}, + query_params={"maxResults": 0, "location": "EU"}, timeout=None, ) reload_call = mock.call( @@ -842,7 +839,6 @@ def test_result(self): query_params={ "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS, "location": "EU", - "pageToken": "next-page", }, timeout=None, ) @@ -855,9 +851,7 @@ def test_result_with_done_job_calls_get_query_results(self): "jobComplete": True, "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, - "totalRows": "2", - "rows": [{"f": [{"v": "abc"}]}], - "pageToken": "next-page", + "totalRows": "1", } job_resource = self._make_resource(started=True, ended=True, location="EU") job_resource["configuration"]["query"]["destinationTable"] = { @@ -866,9 +860,9 @@ def test_result_with_done_job_calls_get_query_results(self): "tableId": "dest_table", } results_page_resource = { - "totalRows": "2", + "totalRows": "1", "pageToken": None, - "rows": [{"f": [{"v": "def"}]}], + "rows": [{"f": [{"v": "abc"}]}], } conn = _make_connection(query_resource_done, results_page_resource) client = _make_client(self.PROJECT, connection=conn) @@ -877,15 +871,14 @@ def test_result_with_done_job_calls_get_query_results(self): result = job.result() rows = list(result) - self.assertEqual(len(rows), 2) + self.assertEqual(len(rows), 1) self.assertEqual(rows[0].col1, "abc") - self.assertEqual(rows[1].col1, "def") query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}" query_results_call = mock.call( method="GET", path=query_results_path, - query_params={"location": "EU"}, + query_params={"maxResults": 0, "location": "EU"}, timeout=None, ) query_results_page_call = mock.call( @@ -894,7 +887,6 @@ def test_result_with_done_job_calls_get_query_results(self): query_params={ "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS, "location": "EU", - "pageToken": "next-page", }, timeout=None, ) @@ -908,12 +900,6 @@ def test_result_with_max_results(self): "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, "totalRows": "5", - # These rows are discarded because max_results is set. - "rows": [ - {"f": [{"v": "xyz"}]}, - {"f": [{"v": "uvw"}]}, - {"f": [{"v": "rst"}]}, - ], } query_page_resource = { "totalRows": "5", @@ -939,7 +925,6 @@ def test_result_with_max_results(self): rows = list(result) self.assertEqual(len(rows), 3) - self.assertEqual(rows[0].col1, "abc") self.assertEqual(len(connection.api_request.call_args_list), 2) query_page_request = connection.api_request.call_args_list[1] self.assertEqual( @@ -994,7 +979,7 @@ def test_result_w_retry(self): query_results_call = mock.call( method="GET", path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}", - query_params={"location": "asia-northeast1"}, + query_params={"maxResults": 0, "location": "asia-northeast1"}, timeout=None, ) reload_call = mock.call( @@ -1094,12 +1079,6 @@ def test_result_w_page_size(self): "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, "totalRows": "4", - # These rows are discarded because page_size is set. 
- "rows": [ - {"f": [{"v": "xyz"}]}, - {"f": [{"v": "uvw"}]}, - {"f": [{"v": "rst"}]}, - ], } job_resource = self._make_resource(started=True, ended=True, location="US") q_config = job_resource["configuration"]["query"] @@ -1130,7 +1109,6 @@ def test_result_w_page_size(self): # Assert actual_rows = list(result) self.assertEqual(len(actual_rows), 4) - self.assertEqual(actual_rows[0].col1, "row1") query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}" query_page_1_call = mock.call( @@ -1164,12 +1142,6 @@ def test_result_with_start_index(self): "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, "totalRows": "5", - # These rows are discarded because start_index is set. - "rows": [ - {"f": [{"v": "xyz"}]}, - {"f": [{"v": "uvw"}]}, - {"f": [{"v": "rst"}]}, - ], } tabledata_resource = { "totalRows": "5", @@ -1196,7 +1168,6 @@ def test_result_with_start_index(self): rows = list(result) self.assertEqual(len(rows), 4) - self.assertEqual(rows[0].col1, "abc") self.assertEqual(len(connection.api_request.call_args_list), 2) tabledata_list_request = connection.api_request.call_args_list[1] self.assertEqual( diff --git a/tests/unit/job/test_query_pandas.py b/tests/unit/job/test_query_pandas.py index f9d823eb0..43651887f 100644 --- a/tests/unit/job/test_query_pandas.py +++ b/tests/unit/job/test_query_pandas.py @@ -172,6 +172,8 @@ def test_to_arrow(method_kwargs): }, ] }, + } + tabledata_resource = { "rows": [ { "f": [ @@ -185,11 +187,13 @@ def test_to_arrow(method_kwargs): {"v": {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}}, ] }, - ], + ] } done_resource = copy.deepcopy(begun_resource) done_resource["status"] = {"state": "DONE"} - connection = _make_connection(begun_resource, query_resource, done_resource) + connection = _make_connection( + begun_resource, query_resource, done_resource, tabledata_resource + ) client = _make_client(connection=connection) job = target_class.from_api_repr(begun_resource, client) @@ -398,16 +402,20 @@ def test_to_dataframe(method_kwargs): {"name": "age", "type": "INTEGER", "mode": "NULLABLE"}, ] }, + } + tabledata_resource = { "rows": [ {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]}, - ], + ] } done_resource = copy.deepcopy(begun_resource) done_resource["status"] = {"state": "DONE"} - connection = _make_connection(begun_resource, query_resource, done_resource) + connection = _make_connection( + begun_resource, query_resource, done_resource, tabledata_resource + ) client = _make_client(connection=connection) job = target_class.from_api_repr(begun_resource, client) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 4fba1150c..c4bdea2f8 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -319,7 +319,7 @@ def test__get_query_results_miss_w_explicit_project_and_timeout(self): conn.api_request.assert_called_once_with( method="GET", path=path, - query_params={"timeoutMs": 500, "location": self.LOCATION}, + query_params={"maxResults": 0, "timeoutMs": 500, "location": self.LOCATION}, timeout=42, ) @@ -336,7 +336,7 @@ def test__get_query_results_miss_w_client_location(self): conn.api_request.assert_called_once_with( method="GET", path="/projects/PROJECT/queries/nothere", - query_params={"location": self.LOCATION}, + query_params={"maxResults": 0, "location": self.LOCATION}, timeout=None, ) From 
5d19af2350bb2787e8ec7eb41e7e8184f5d7c080 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 24 Nov 2020 13:39:17 -0600 Subject: [PATCH 2/6] dont revert self._reload_query_results refactoring --- google/cloud/bigquery/job/query.py | 79 ++++++++++++++++++------------ 1 file changed, 48 insertions(+), 31 deletions(-) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 9931ffdf9..89e68550c 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -991,48 +991,22 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True): Returns: bool: True if the job is complete, False otherwise. """ - is_done = ( - # Only consider a QueryJob complete when we know we have the final - # query results available. - self._query_results is not None - and self._query_results.complete - and self.state == _DONE_STATE - ) # Do not refresh if the state is already done, as the job will not # change once complete. + is_done = self.state == _DONE_STATE if not reload or is_done: return is_done - # Since the API to getQueryResults can hang up to the timeout value - # (default of 10 seconds), set the timeout parameter to ensure that - # the timeout from the futures API is respected. See: - # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135 - timeout_ms = None - if self._done_timeout is not None: - # Subtract a buffer for context switching, network latency, etc. - api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS - api_timeout = max(min(api_timeout, 10), 0) - self._done_timeout -= api_timeout - self._done_timeout = max(0, self._done_timeout) - timeout_ms = int(api_timeout * 1000) + self._reload_query_results(retry=retry, timeout=timeout) # If an explicit timeout is not given, fall back to the transport timeout # stored in _blocking_poll() in the process of polling for job completion. transport_timeout = timeout if timeout is not None else self._transport_timeout - self._query_results = self._client._get_query_results( - self.job_id, - retry, - project=self.project, - timeout_ms=timeout_ms, - location=self.location, - timeout=transport_timeout, - ) - # Only reload the job once we know the query is complete. # This will ensure that fields such as the destination table are # correctly populated. - if self._query_results.complete and self.state != _DONE_STATE: + if self._query_results.complete: self.reload(retry=retry, timeout=transport_timeout) return self.state == _DONE_STATE @@ -1099,6 +1073,44 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None): exc.query_job = self raise + def _reload_query_results(self, retry=DEFAULT_RETRY, timeout=None): + """Refresh the cached query results. + Args: + retry (Optional[google.api_core.retry.Retry]): + How to retry the call that retrieves query results. + timeout (Optional[float]): + The number of seconds to wait for the underlying HTTP transport + before using ``retry``. + """ + if self._query_results and self._query_results.complete: + return + + # Since the API to getQueryResults can hang up to the timeout value + # (default of 10 seconds), set the timeout parameter to ensure that + # the timeout from the futures API is respected. See: + # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135 + timeout_ms = None + if self._done_timeout is not None: + # Subtract a buffer for context switching, network latency, etc. 
+ api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS + api_timeout = max(min(api_timeout, 10), 0) + self._done_timeout -= api_timeout + self._done_timeout = max(0, self._done_timeout) + timeout_ms = int(api_timeout * 1000) + + # If an explicit timeout is not given, fall back to the transport timeout + # stored in _blocking_poll() in the process of polling for job completion. + transport_timeout = timeout if timeout is not None else self._transport_timeout + + self._query_results = self._client._get_query_results( + self.job_id, + retry, + project=self.project, + timeout_ms=timeout_ms, + location=self.location, + timeout=transport_timeout, + ) + def result( self, page_size=None, @@ -1145,6 +1157,11 @@ def result( """ try: super(QueryJob, self).result(retry=retry, timeout=timeout) + + # Since the job could already be "done" (e.g. got a finished job + # via client.get_job), the superclass call to done() might not + # set the self._query_results cache. + self._reload_query_results(retry=retry, timeout=timeout) except exceptions.GoogleAPICallError as exc: exc.message += self._format_for_exception(self.query, self.job_id) exc.query_job = self @@ -1160,9 +1177,9 @@ def result( return _EmptyRowIterator() rows = self._client._list_rows_from_query_results( - self._query_results.job_id, + self.job_id, self.location, - self._query_results.project, + self.project, self._query_results.schema, total_rows=self._query_results.total_rows, destination=self.destination, From 1727bf653300c5340afd8e1bbd1652fd8543c06d Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 24 Nov 2020 13:43:45 -0600 Subject: [PATCH 3/6] revert changes to table --- google/cloud/bigquery/job/query.py | 1 + google/cloud/bigquery/table.py | 11 ++++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 89e68550c..9e8908613 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1075,6 +1075,7 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None): def _reload_query_results(self, retry=DEFAULT_RETRY, timeout=None): """Refresh the cached query results. + Args: retry (Optional[google.api_core.retry.Retry]): How to retry the call that retrieves query results. diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index 80d1fd13b..4bfedd758 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -1301,7 +1301,9 @@ class RowIterator(HTTPIterator): A subset of columns to select from this table. total_rows (Optional[int]): Total number of rows in the table. - + first_page_response (Optional[dict]): + API response for the first page of results. These are returned when + the first page is requested. """ def __init__( @@ -1317,6 +1319,7 @@ def __init__( table=None, selected_fields=None, total_rows=None, + first_page_response=None, ): super(RowIterator, self).__init__( client, @@ -1339,6 +1342,7 @@ def __init__( self._selected_fields = selected_fields self._table = table self._total_rows = total_rows + self._first_page_response = first_page_response def _is_completely_cached(self): """Check if all results are completely cached. @@ -1382,6 +1386,11 @@ def _get_next_page_response(self): Dict[str, object]: The parsed JSON response of the next page's contents. 
""" + if self._first_page_response: + response = self._first_page_response + self._first_page_response = None + return response + params = self._get_query_params() if self._page_size is not None: if self.page_number and "startIndex" in params: From b838fb23c647e29cbed447260aa725d8086b1d8c Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 24 Nov 2020 13:56:28 -0600 Subject: [PATCH 4/6] revert all changes to tests --- tests/unit/job/test_query_pandas.py | 289 +--------------------------- 1 file changed, 4 insertions(+), 285 deletions(-) diff --git a/tests/unit/job/test_query_pandas.py b/tests/unit/job/test_query_pandas.py index 43651887f..37f4a6dec 100644 --- a/tests/unit/job/test_query_pandas.py +++ b/tests/unit/job/test_query_pandas.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import concurrent.futures import copy import json @@ -100,7 +99,6 @@ def test_to_dataframe_bqstorage_preserve_order(query): ] }, "totalRows": "4", - "pageToken": "next-page", } connection = _make_connection(get_query_results_resource, job_resource) client = _make_client(connection=connection) @@ -135,16 +133,7 @@ def test_to_dataframe_bqstorage_preserve_order(query): @pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") -@pytest.mark.parametrize( - "method_kwargs", - [ - {"create_bqstorage_client": False}, - # Since all rows are contained in the first page of results, the BigQuery - # Storage API won't actually be used. - {"create_bqstorage_client": True}, - ], -) -def test_to_arrow(method_kwargs): +def test_to_arrow(): from google.cloud.bigquery.job import QueryJob as target_class begun_resource = _make_job_resource(job_type="query") @@ -197,7 +186,7 @@ def test_to_arrow(method_kwargs): client = _make_client(connection=connection) job = target_class.from_api_repr(begun_resource, client) - tbl = job.to_arrow(**method_kwargs) + tbl = job.to_arrow(create_bqstorage_client=False) assert isinstance(tbl, pyarrow.Table) assert tbl.num_rows == 2 @@ -230,165 +219,8 @@ def test_to_arrow(method_kwargs): ] -@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") -@pytest.mark.skipif(tqdm is None, reason="Requires `tqdm`") -def test_to_arrow_w_tqdm_w_query_plan(): - from google.cloud.bigquery import table - from google.cloud.bigquery.job import QueryJob as target_class - from google.cloud.bigquery.schema import SchemaField - from google.cloud.bigquery._tqdm_helpers import _PROGRESS_BAR_UPDATE_INTERVAL - - begun_resource = _make_job_resource(job_type="query") - rows = [ - {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, - {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, - ] - - schema = [ - SchemaField("name", "STRING", mode="REQUIRED"), - SchemaField("age", "INTEGER", mode="REQUIRED"), - ] - connection = _make_connection({}) - client = _make_client(connection=connection) - job = target_class.from_api_repr(begun_resource, client) - - path = "/foo" - api_request = mock.Mock(return_value={"rows": rows}) - row_iterator = table.RowIterator(client, api_request, path, schema) - - job._properties["statistics"] = { - "query": { - "queryPlan": [ - {"name": "S00: Input", "id": "0", "status": "COMPLETE"}, - {"name": "S01: Output", "id": "1", "status": "COMPLETE"}, - ] - }, - } - reload_patch = mock.patch( - "google.cloud.bigquery.job._AsyncJob.reload", autospec=True - ) - result_patch = mock.patch( - "google.cloud.bigquery.job.QueryJob.result", - side_effect=[ - concurrent.futures.TimeoutError, - concurrent.futures.TimeoutError, - 
row_iterator, - ], - ) - - with result_patch as result_patch_tqdm, reload_patch: - tbl = job.to_arrow(progress_bar_type="tqdm", create_bqstorage_client=False) - - assert result_patch_tqdm.call_count == 3 - assert isinstance(tbl, pyarrow.Table) - assert tbl.num_rows == 2 - result_patch_tqdm.assert_called_with(timeout=_PROGRESS_BAR_UPDATE_INTERVAL) - - -@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") -@pytest.mark.skipif(tqdm is None, reason="Requires `tqdm`") -def test_to_arrow_w_tqdm_w_pending_status(): - from google.cloud.bigquery import table - from google.cloud.bigquery.job import QueryJob as target_class - from google.cloud.bigquery.schema import SchemaField - from google.cloud.bigquery._tqdm_helpers import _PROGRESS_BAR_UPDATE_INTERVAL - - begun_resource = _make_job_resource(job_type="query") - rows = [ - {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, - {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, - ] - - schema = [ - SchemaField("name", "STRING", mode="REQUIRED"), - SchemaField("age", "INTEGER", mode="REQUIRED"), - ] - connection = _make_connection({}) - client = _make_client(connection=connection) - job = target_class.from_api_repr(begun_resource, client) - - path = "/foo" - api_request = mock.Mock(return_value={"rows": rows}) - row_iterator = table.RowIterator(client, api_request, path, schema) - - job._properties["statistics"] = { - "query": { - "queryPlan": [ - {"name": "S00: Input", "id": "0", "status": "PENDING"}, - {"name": "S00: Input", "id": "1", "status": "COMPLETE"}, - ] - }, - } - reload_patch = mock.patch( - "google.cloud.bigquery.job._AsyncJob.reload", autospec=True - ) - result_patch = mock.patch( - "google.cloud.bigquery.job.QueryJob.result", - side_effect=[concurrent.futures.TimeoutError, row_iterator], - ) - - with result_patch as result_patch_tqdm, reload_patch: - tbl = job.to_arrow(progress_bar_type="tqdm", create_bqstorage_client=False) - - assert result_patch_tqdm.call_count == 2 - assert isinstance(tbl, pyarrow.Table) - assert tbl.num_rows == 2 - result_patch_tqdm.assert_called_with(timeout=_PROGRESS_BAR_UPDATE_INTERVAL) - - -@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") -@pytest.mark.skipif(tqdm is None, reason="Requires `tqdm`") -def test_to_arrow_w_tqdm_wo_query_plan(): - from google.cloud.bigquery import table - from google.cloud.bigquery.job import QueryJob as target_class - from google.cloud.bigquery.schema import SchemaField - - begun_resource = _make_job_resource(job_type="query") - rows = [ - {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, - {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, - ] - - schema = [ - SchemaField("name", "STRING", mode="REQUIRED"), - SchemaField("age", "INTEGER", mode="REQUIRED"), - ] - connection = _make_connection({}) - client = _make_client(connection=connection) - job = target_class.from_api_repr(begun_resource, client) - - path = "/foo" - api_request = mock.Mock(return_value={"rows": rows}) - row_iterator = table.RowIterator(client, api_request, path, schema) - - reload_patch = mock.patch( - "google.cloud.bigquery.job._AsyncJob.reload", autospec=True - ) - result_patch = mock.patch( - "google.cloud.bigquery.job.QueryJob.result", - side_effect=[concurrent.futures.TimeoutError, row_iterator], - ) - - with result_patch as result_patch_tqdm, reload_patch: - tbl = job.to_arrow(progress_bar_type="tqdm", create_bqstorage_client=False) - - assert result_patch_tqdm.call_count == 2 - assert isinstance(tbl, pyarrow.Table) - assert tbl.num_rows == 2 - result_patch_tqdm.assert_called() - - 
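
Note: the mocked TimeoutError side effects in these tqdm tests simulate `result()` timing out while the query is still running. As a minimal sketch (not part of this patch) of the polling pattern being exercised — assuming `job` is a running QueryJob and `progress_bar` is any object with an `update()` method; the real helper is `wait_for_query()` in `google.cloud.bigquery._tqdm_helpers`, which additionally labels the bar from the query plan:

import concurrent.futures

POLL_INTERVAL_SECS = 0.5  # assumed stand-in for _PROGRESS_BAR_UPDATE_INTERVAL


def wait_with_progress(job, progress_bar):
    """Poll a QueryJob, refreshing the bar until rows are available."""
    while True:
        try:
            # result() raises concurrent.futures.TimeoutError when the
            # job is still running at the deadline.
            return job.result(timeout=POLL_INTERVAL_SECS)
        except concurrent.futures.TimeoutError:
            progress_bar.update()  # refresh the bar between polls

This is why the tests assert `result_patch_tqdm.assert_called_with(timeout=_PROGRESS_BAR_UPDATE_INTERVAL)` and count one extra `result()` call per simulated timeout.
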
@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") -@pytest.mark.parametrize( - "method_kwargs", - [ - {"create_bqstorage_client": False}, - # Since all rows are contained in the first page of results, the BigQuery - # Storage API won't actually be used. - {"create_bqstorage_client": True}, - ], -) -def test_to_dataframe(method_kwargs): +def test_to_dataframe(): from google.cloud.bigquery.job import QueryJob as target_class begun_resource = _make_job_resource(job_type="query") @@ -419,7 +251,7 @@ def test_to_dataframe(method_kwargs): client = _make_client(connection=connection) job = target_class.from_api_repr(begun_resource, client) - df = job.to_dataframe(**method_kwargs) + df = job.to_dataframe(create_bqstorage_client=False) assert isinstance(df, pandas.DataFrame) assert len(df) == 4 # verify the number of rows @@ -464,7 +296,6 @@ def test_to_dataframe_bqstorage(): {"name": "age", "type": "INTEGER", "mode": "NULLABLE"}, ] }, - "pageToken": "next-page", } connection = _make_connection(query_resource) client = _make_client(connection=connection) @@ -617,115 +448,3 @@ def test_to_dataframe_with_progress_bar(tqdm_mock): job.to_dataframe(progress_bar_type="tqdm", create_bqstorage_client=False) tqdm_mock.assert_called() - - -@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") -@pytest.mark.skipif(tqdm is None, reason="Requires `tqdm`") -def test_to_dataframe_w_tqdm_pending(): - from google.cloud.bigquery import table - from google.cloud.bigquery.job import QueryJob as target_class - from google.cloud.bigquery.schema import SchemaField - from google.cloud.bigquery._tqdm_helpers import _PROGRESS_BAR_UPDATE_INTERVAL - - begun_resource = _make_job_resource(job_type="query") - schema = [ - SchemaField("name", "STRING", mode="NULLABLE"), - SchemaField("age", "INTEGER", mode="NULLABLE"), - ] - rows = [ - {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, - {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, - {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, - {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]}, - ] - - connection = _make_connection({}) - client = _make_client(connection=connection) - job = target_class.from_api_repr(begun_resource, client) - - path = "/foo" - api_request = mock.Mock(return_value={"rows": rows}) - row_iterator = table.RowIterator(client, api_request, path, schema) - - job._properties["statistics"] = { - "query": { - "queryPlan": [ - {"name": "S00: Input", "id": "0", "status": "PRNDING"}, - {"name": "S01: Output", "id": "1", "status": "COMPLETE"}, - ] - }, - } - reload_patch = mock.patch( - "google.cloud.bigquery.job._AsyncJob.reload", autospec=True - ) - result_patch = mock.patch( - "google.cloud.bigquery.job.QueryJob.result", - side_effect=[concurrent.futures.TimeoutError, row_iterator], - ) - - with result_patch as result_patch_tqdm, reload_patch: - df = job.to_dataframe(progress_bar_type="tqdm", create_bqstorage_client=False) - - assert result_patch_tqdm.call_count == 2 - assert isinstance(df, pandas.DataFrame) - assert len(df) == 4 # verify the number of rows - assert list(df) == ["name", "age"] # verify the column names - result_patch_tqdm.assert_called_with(timeout=_PROGRESS_BAR_UPDATE_INTERVAL) - - -@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") -@pytest.mark.skipif(tqdm is None, reason="Requires `tqdm`") -def test_to_dataframe_w_tqdm(): - from google.cloud.bigquery import table - from google.cloud.bigquery.job import QueryJob as target_class - from google.cloud.bigquery.schema import SchemaField - from 
google.cloud.bigquery._tqdm_helpers import _PROGRESS_BAR_UPDATE_INTERVAL - - begun_resource = _make_job_resource(job_type="query") - schema = [ - SchemaField("name", "STRING", mode="NULLABLE"), - SchemaField("age", "INTEGER", mode="NULLABLE"), - ] - rows = [ - {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, - {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, - {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, - {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]}, - ] - - connection = _make_connection({}) - client = _make_client(connection=connection) - job = target_class.from_api_repr(begun_resource, client) - - path = "/foo" - api_request = mock.Mock(return_value={"rows": rows}) - row_iterator = table.RowIterator(client, api_request, path, schema) - - job._properties["statistics"] = { - "query": { - "queryPlan": [ - {"name": "S00: Input", "id": "0", "status": "COMPLETE"}, - {"name": "S01: Output", "id": "1", "status": "COMPLETE"}, - ] - }, - } - reload_patch = mock.patch( - "google.cloud.bigquery.job._AsyncJob.reload", autospec=True - ) - result_patch = mock.patch( - "google.cloud.bigquery.job.QueryJob.result", - side_effect=[ - concurrent.futures.TimeoutError, - concurrent.futures.TimeoutError, - row_iterator, - ], - ) - - with result_patch as result_patch_tqdm, reload_patch: - df = job.to_dataframe(progress_bar_type="tqdm", create_bqstorage_client=False) - - assert result_patch_tqdm.call_count == 3 - assert isinstance(df, pandas.DataFrame) - assert len(df) == 4 # verify the number of rows - assert list(df), ["name", "age"] # verify the column names - result_patch_tqdm.assert_called_with(timeout=_PROGRESS_BAR_UPDATE_INTERVAL) From d1aab244bf7ec3bbf620e783c8862f631f545cbf Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 24 Nov 2020 14:40:04 -0600 Subject: [PATCH 5/6] add tests for first page cache --- tests/unit/test_table.py | 57 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index be67eafcd..1dd5fab46 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -1630,6 +1630,40 @@ def test_iterate(self): api_request.assert_called_once_with(method="GET", path=path, query_params={}) + def test_iterate_with_cached_first_page(self): + from google.cloud.bigquery.schema import SchemaField + + first_page = { + "rows": [ + {"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]}, + {"f": [{"v": "Bhetty Rhubble"}, {"v": "28"}]}, + ], + "pageToken": "next-page", + } + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + rows = [ + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + ] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = self._make_one( + _mock_client(), api_request, path, schema, first_page_response=first_page + ) + rows = list(row_iterator) + self.assertEqual(len(rows), 4) + self.assertEqual(rows[0].age, 27) + self.assertEqual(rows[1].age, 28) + self.assertEqual(rows[2].age, 32) + self.assertEqual(rows[3].age, 33) + + api_request.assert_called_once_with( + method="GET", path=path, query_params={"pageToken": "next-page"} + ) + def test_page_size(self): from google.cloud.bigquery.schema import SchemaField @@ -1655,6 +1689,29 @@ def test_page_size(self): query_params={"maxResults": row_iterator._page_size}, ) + def test__is_completely_cached_returns_false_without_first_page(self): + iterator = self._make_one(first_page_response=None) + 
self.assertFalse(iterator._is_completely_cached()) + + def test__is_completely_cached_returns_false_with_page_token(self): + first_page = {"pageToken": "next-page"} + iterator = self._make_one(first_page_response=first_page) + self.assertFalse(iterator._is_completely_cached()) + + def test__is_completely_cached_returns_true(self): + first_page = {"rows": []} + iterator = self._make_one(first_page_response=first_page) + self.assertTrue(iterator._is_completely_cached()) + + def test__validate_bqstorage_returns_false_when_completely_cached(self): + first_page = {"rows": []} + iterator = self._make_one(first_page_response=first_page) + self.assertFalse( + iterator._validate_bqstorage( + bqstorage_client=None, create_bqstorage_client=True + ) + ) + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_arrow(self): from google.cloud.bigquery.schema import SchemaField From 989d20bdae9c056ca8f70265bff2d558fffd2316 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 24 Nov 2020 14:51:25 -0600 Subject: [PATCH 6/6] restore tqdm tests --- tests/unit/job/test_query_pandas.py | 261 ++++++++++++++++++++++++++++ 1 file changed, 261 insertions(+) diff --git a/tests/unit/job/test_query_pandas.py b/tests/unit/job/test_query_pandas.py index 37f4a6dec..cdd6f2b3c 100644 --- a/tests/unit/job/test_query_pandas.py +++ b/tests/unit/job/test_query_pandas.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import concurrent.futures import copy import json @@ -219,6 +220,154 @@ def test_to_arrow(): ] +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +@pytest.mark.skipif(tqdm is None, reason="Requires `tqdm`") +def test_to_arrow_w_tqdm_w_query_plan(): + from google.cloud.bigquery import table + from google.cloud.bigquery.job import QueryJob as target_class + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery._tqdm_helpers import _PROGRESS_BAR_UPDATE_INTERVAL + + begun_resource = _make_job_resource(job_type="query") + rows = [ + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, + ] + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + connection = _make_connection({}) + client = _make_client(connection=connection) + job = target_class.from_api_repr(begun_resource, client) + + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = table.RowIterator(client, api_request, path, schema) + + job._properties["statistics"] = { + "query": { + "queryPlan": [ + {"name": "S00: Input", "id": "0", "status": "COMPLETE"}, + {"name": "S01: Output", "id": "1", "status": "COMPLETE"}, + ] + }, + } + reload_patch = mock.patch( + "google.cloud.bigquery.job._AsyncJob.reload", autospec=True + ) + result_patch = mock.patch( + "google.cloud.bigquery.job.QueryJob.result", + side_effect=[ + concurrent.futures.TimeoutError, + concurrent.futures.TimeoutError, + row_iterator, + ], + ) + + with result_patch as result_patch_tqdm, reload_patch: + tbl = job.to_arrow(progress_bar_type="tqdm", create_bqstorage_client=False) + + assert result_patch_tqdm.call_count == 3 + assert isinstance(tbl, pyarrow.Table) + assert tbl.num_rows == 2 + result_patch_tqdm.assert_called_with(timeout=_PROGRESS_BAR_UPDATE_INTERVAL) + + +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +@pytest.mark.skipif(tqdm is None, reason="Requires `tqdm`") +def 
test_to_arrow_w_tqdm_w_pending_status(): + from google.cloud.bigquery import table + from google.cloud.bigquery.job import QueryJob as target_class + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery._tqdm_helpers import _PROGRESS_BAR_UPDATE_INTERVAL + + begun_resource = _make_job_resource(job_type="query") + rows = [ + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, + ] + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + connection = _make_connection({}) + client = _make_client(connection=connection) + job = target_class.from_api_repr(begun_resource, client) + + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = table.RowIterator(client, api_request, path, schema) + + job._properties["statistics"] = { + "query": { + "queryPlan": [ + {"name": "S00: Input", "id": "0", "status": "PENDING"}, + {"name": "S00: Input", "id": "1", "status": "COMPLETE"}, + ] + }, + } + reload_patch = mock.patch( + "google.cloud.bigquery.job._AsyncJob.reload", autospec=True + ) + result_patch = mock.patch( + "google.cloud.bigquery.job.QueryJob.result", + side_effect=[concurrent.futures.TimeoutError, row_iterator], + ) + + with result_patch as result_patch_tqdm, reload_patch: + tbl = job.to_arrow(progress_bar_type="tqdm", create_bqstorage_client=False) + + assert result_patch_tqdm.call_count == 2 + assert isinstance(tbl, pyarrow.Table) + assert tbl.num_rows == 2 + result_patch_tqdm.assert_called_with(timeout=_PROGRESS_BAR_UPDATE_INTERVAL) + + +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +@pytest.mark.skipif(tqdm is None, reason="Requires `tqdm`") +def test_to_arrow_w_tqdm_wo_query_plan(): + from google.cloud.bigquery import table + from google.cloud.bigquery.job import QueryJob as target_class + from google.cloud.bigquery.schema import SchemaField + + begun_resource = _make_job_resource(job_type="query") + rows = [ + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, + ] + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + connection = _make_connection({}) + client = _make_client(connection=connection) + job = target_class.from_api_repr(begun_resource, client) + + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = table.RowIterator(client, api_request, path, schema) + + reload_patch = mock.patch( + "google.cloud.bigquery.job._AsyncJob.reload", autospec=True + ) + result_patch = mock.patch( + "google.cloud.bigquery.job.QueryJob.result", + side_effect=[concurrent.futures.TimeoutError, row_iterator], + ) + + with result_patch as result_patch_tqdm, reload_patch: + tbl = job.to_arrow(progress_bar_type="tqdm", create_bqstorage_client=False) + + assert result_patch_tqdm.call_count == 2 + assert isinstance(tbl, pyarrow.Table) + assert tbl.num_rows == 2 + result_patch_tqdm.assert_called() + + @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") def test_to_dataframe(): from google.cloud.bigquery.job import QueryJob as target_class @@ -448,3 +597,115 @@ def test_to_dataframe_with_progress_bar(tqdm_mock): job.to_dataframe(progress_bar_type="tqdm", create_bqstorage_client=False) tqdm_mock.assert_called() + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(tqdm is None, reason="Requires `tqdm`") +def 
test_to_dataframe_w_tqdm_pending(): + from google.cloud.bigquery import table + from google.cloud.bigquery.job import QueryJob as target_class + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery._tqdm_helpers import _PROGRESS_BAR_UPDATE_INTERVAL + + begun_resource = _make_job_resource(job_type="query") + schema = [ + SchemaField("name", "STRING", mode="NULLABLE"), + SchemaField("age", "INTEGER", mode="NULLABLE"), + ] + rows = [ + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, + {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]}, + ] + + connection = _make_connection({}) + client = _make_client(connection=connection) + job = target_class.from_api_repr(begun_resource, client) + + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = table.RowIterator(client, api_request, path, schema) + + job._properties["statistics"] = { + "query": { + "queryPlan": [ + {"name": "S00: Input", "id": "0", "status": "PRNDING"}, + {"name": "S01: Output", "id": "1", "status": "COMPLETE"}, + ] + }, + } + reload_patch = mock.patch( + "google.cloud.bigquery.job._AsyncJob.reload", autospec=True + ) + result_patch = mock.patch( + "google.cloud.bigquery.job.QueryJob.result", + side_effect=[concurrent.futures.TimeoutError, row_iterator], + ) + + with result_patch as result_patch_tqdm, reload_patch: + df = job.to_dataframe(progress_bar_type="tqdm", create_bqstorage_client=False) + + assert result_patch_tqdm.call_count == 2 + assert isinstance(df, pandas.DataFrame) + assert len(df) == 4 # verify the number of rows + assert list(df) == ["name", "age"] # verify the column names + result_patch_tqdm.assert_called_with(timeout=_PROGRESS_BAR_UPDATE_INTERVAL) + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(tqdm is None, reason="Requires `tqdm`") +def test_to_dataframe_w_tqdm(): + from google.cloud.bigquery import table + from google.cloud.bigquery.job import QueryJob as target_class + from google.cloud.bigquery.schema import SchemaField + from google.cloud.bigquery._tqdm_helpers import _PROGRESS_BAR_UPDATE_INTERVAL + + begun_resource = _make_job_resource(job_type="query") + schema = [ + SchemaField("name", "STRING", mode="NULLABLE"), + SchemaField("age", "INTEGER", mode="NULLABLE"), + ] + rows = [ + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, + {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]}, + ] + + connection = _make_connection({}) + client = _make_client(connection=connection) + job = target_class.from_api_repr(begun_resource, client) + + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = table.RowIterator(client, api_request, path, schema) + + job._properties["statistics"] = { + "query": { + "queryPlan": [ + {"name": "S00: Input", "id": "0", "status": "COMPLETE"}, + {"name": "S01: Output", "id": "1", "status": "COMPLETE"}, + ] + }, + } + reload_patch = mock.patch( + "google.cloud.bigquery.job._AsyncJob.reload", autospec=True + ) + result_patch = mock.patch( + "google.cloud.bigquery.job.QueryJob.result", + side_effect=[ + concurrent.futures.TimeoutError, + concurrent.futures.TimeoutError, + row_iterator, + ], + ) + + with result_patch as result_patch_tqdm, reload_patch: + df = job.to_dataframe(progress_bar_type="tqdm", create_bqstorage_client=False) + + assert 
result_patch_tqdm.call_count == 3 + assert isinstance(df, pandas.DataFrame) + assert len(df) == 4 # verify the number of rows + assert list(df), ["name", "age"] # verify the column names + result_patch_tqdm.assert_called_with(timeout=_PROGRESS_BAR_UPDATE_INTERVAL)
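
Note: taken together, the six commits restore the pre-#374 polling behavior (getQueryResults with `maxResults: 0`) while keeping the `_reload_query_results` refactoring and the RowIterator first-page cache. A rough sketch of the resulting call flow from the caller's side — illustrative only, assuming a configured client and a hypothetical table name:

from google.cloud import bigquery

client = bigquery.Client()
job = client.query("SELECT name, age FROM `my-project.my_dataset.people`")

# done() now polls jobs.getQueryResults with maxResults=0, so each poll
# returns as soon as the job finishes, without streaming any row data.
rows = job.result()  # rows are then listed via a separate request

for row in rows:
    print(row.name, row.age)

Because the completion poll carries no row payload, a large first page can no longer stall the poll against the connection timeout — the hang described in the first commit message — and row data is only transferred once `result()` explicitly lists rows.
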