Skip to content

Commit

Permalink
perf: don't fetch rows when waiting for query to finish (#400)
Browse files Browse the repository at this point in the history
When there are large result sets, fetching rows while waiting for the
query to finish can cause the API to hang indefinitely. (This may be due
to an interaction between connection timeout and API timeout.)

This reverts commit 86f6a51 (#374).

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)

Fixes googleapis/python-bigquery-pandas#343
Fixes #394 🦕
  • Loading branch information
tswast committed Nov 24, 2020
1 parent 673a9cb commit 730df17
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 80 deletions.
4 changes: 1 addition & 3 deletions google/cloud/bigquery/client.py
Expand Up @@ -1534,7 +1534,7 @@ def _get_query_results(
A new ``_QueryResults`` instance.
"""

extra_params = {}
extra_params = {"maxResults": 0}

if project is None:
project = self.project
Expand Down Expand Up @@ -3187,7 +3187,6 @@ def _list_rows_from_query_results(
page_size=None,
retry=DEFAULT_RETRY,
timeout=None,
first_page_response=None,
):
"""List the rows of a completed query.
See
Expand Down Expand Up @@ -3248,7 +3247,6 @@ def _list_rows_from_query_results(
table=destination,
extra_params=params,
total_rows=total_rows,
first_page_response=first_page_response,
)
return row_iterator

Expand Down
5 changes: 0 additions & 5 deletions google/cloud/bigquery/job/query.py
Expand Up @@ -1177,10 +1177,6 @@ def result(
if self._query_results.total_rows is None:
return _EmptyRowIterator()

first_page_response = None
if max_results is None and page_size is None and start_index is None:
first_page_response = self._query_results._properties

rows = self._client._list_rows_from_query_results(
self.job_id,
self.location,
Expand All @@ -1193,7 +1189,6 @@ def result(
start_index=start_index,
retry=retry,
timeout=timeout,
first_page_response=first_page_response,
)
rows._preserve_order = _contains_order_by(self.query)
return rows
Expand Down
55 changes: 13 additions & 42 deletions tests/unit/job/test_query.py
Expand Up @@ -787,9 +787,7 @@ def test_result(self):
"location": "EU",
},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "3",
"rows": [{"f": [{"v": "abc"}]}],
"pageToken": "next-page",
"totalRows": "2",
}
job_resource = self._make_resource(started=True, location="EU")
job_resource_done = self._make_resource(started=True, ended=True, location="EU")
Expand All @@ -801,9 +799,9 @@ def test_result(self):
query_page_resource = {
# Explicitly set totalRows to be different from the initial
# response to test update during iteration.
"totalRows": "2",
"totalRows": "1",
"pageToken": None,
"rows": [{"f": [{"v": "def"}]}],
"rows": [{"f": [{"v": "abc"}]}],
}
conn = _make_connection(
query_resource, query_resource_done, job_resource_done, query_page_resource
Expand All @@ -814,20 +812,19 @@ def test_result(self):
result = job.result()

self.assertIsInstance(result, RowIterator)
self.assertEqual(result.total_rows, 3)
self.assertEqual(result.total_rows, 2)
rows = list(result)
self.assertEqual(len(rows), 2)
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].col1, "abc")
self.assertEqual(rows[1].col1, "def")
# Test that the total_rows property has changed during iteration, based
# on the response from tabledata.list.
self.assertEqual(result.total_rows, 2)
self.assertEqual(result.total_rows, 1)

query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
query_results_call = mock.call(
method="GET",
path=query_results_path,
query_params={"location": "EU"},
query_params={"maxResults": 0, "location": "EU"},
timeout=None,
)
reload_call = mock.call(
Expand All @@ -842,7 +839,6 @@ def test_result(self):
query_params={
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "EU",
"pageToken": "next-page",
},
timeout=None,
)
Expand All @@ -855,9 +851,7 @@ def test_result_with_done_job_calls_get_query_results(self):
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "2",
"rows": [{"f": [{"v": "abc"}]}],
"pageToken": "next-page",
"totalRows": "1",
}
job_resource = self._make_resource(started=True, ended=True, location="EU")
job_resource["configuration"]["query"]["destinationTable"] = {
Expand All @@ -866,9 +860,9 @@ def test_result_with_done_job_calls_get_query_results(self):
"tableId": "dest_table",
}
results_page_resource = {
"totalRows": "2",
"totalRows": "1",
"pageToken": None,
"rows": [{"f": [{"v": "def"}]}],
"rows": [{"f": [{"v": "abc"}]}],
}
conn = _make_connection(query_resource_done, results_page_resource)
client = _make_client(self.PROJECT, connection=conn)
Expand All @@ -877,15 +871,14 @@ def test_result_with_done_job_calls_get_query_results(self):
result = job.result()

rows = list(result)
self.assertEqual(len(rows), 2)
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].col1, "abc")
self.assertEqual(rows[1].col1, "def")

query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
query_results_call = mock.call(
method="GET",
path=query_results_path,
query_params={"location": "EU"},
query_params={"maxResults": 0, "location": "EU"},
timeout=None,
)
query_results_page_call = mock.call(
Expand All @@ -894,7 +887,6 @@ def test_result_with_done_job_calls_get_query_results(self):
query_params={
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "EU",
"pageToken": "next-page",
},
timeout=None,
)
Expand All @@ -908,12 +900,6 @@ def test_result_with_max_results(self):
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "5",
# These rows are discarded because max_results is set.
"rows": [
{"f": [{"v": "xyz"}]},
{"f": [{"v": "uvw"}]},
{"f": [{"v": "rst"}]},
],
}
query_page_resource = {
"totalRows": "5",
Expand All @@ -939,7 +925,6 @@ def test_result_with_max_results(self):
rows = list(result)

self.assertEqual(len(rows), 3)
self.assertEqual(rows[0].col1, "abc")
self.assertEqual(len(connection.api_request.call_args_list), 2)
query_page_request = connection.api_request.call_args_list[1]
self.assertEqual(
Expand Down Expand Up @@ -994,7 +979,7 @@ def test_result_w_retry(self):
query_results_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
query_params={"location": "asia-northeast1"},
query_params={"maxResults": 0, "location": "asia-northeast1"},
timeout=None,
)
reload_call = mock.call(
Expand Down Expand Up @@ -1094,12 +1079,6 @@ def test_result_w_page_size(self):
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "4",
# These rows are discarded because page_size is set.
"rows": [
{"f": [{"v": "xyz"}]},
{"f": [{"v": "uvw"}]},
{"f": [{"v": "rst"}]},
],
}
job_resource = self._make_resource(started=True, ended=True, location="US")
q_config = job_resource["configuration"]["query"]
Expand Down Expand Up @@ -1130,7 +1109,6 @@ def test_result_w_page_size(self):
# Assert
actual_rows = list(result)
self.assertEqual(len(actual_rows), 4)
self.assertEqual(actual_rows[0].col1, "row1")

query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
query_page_1_call = mock.call(
Expand Down Expand Up @@ -1164,12 +1142,6 @@ def test_result_with_start_index(self):
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "5",
# These rows are discarded because start_index is set.
"rows": [
{"f": [{"v": "xyz"}]},
{"f": [{"v": "uvw"}]},
{"f": [{"v": "rst"}]},
],
}
tabledata_resource = {
"totalRows": "5",
Expand All @@ -1196,7 +1168,6 @@ def test_result_with_start_index(self):
rows = list(result)

self.assertEqual(len(rows), 4)
self.assertEqual(rows[0].col1, "abc")
self.assertEqual(len(connection.api_request.call_args_list), 2)
tabledata_list_request = connection.api_request.call_args_list[1]
self.assertEqual(
Expand Down
44 changes: 16 additions & 28 deletions tests/unit/job/test_query_pandas.py
Expand Up @@ -100,7 +100,6 @@ def test_to_dataframe_bqstorage_preserve_order(query):
]
},
"totalRows": "4",
"pageToken": "next-page",
}
connection = _make_connection(get_query_results_resource, job_resource)
client = _make_client(connection=connection)
Expand Down Expand Up @@ -135,16 +134,7 @@ def test_to_dataframe_bqstorage_preserve_order(query):


@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`")
@pytest.mark.parametrize(
"method_kwargs",
[
{"create_bqstorage_client": False},
# Since all rows are contained in the first page of results, the BigQuery
# Storage API won't actually be used.
{"create_bqstorage_client": True},
],
)
def test_to_arrow(method_kwargs):
def test_to_arrow():
from google.cloud.bigquery.job import QueryJob as target_class

begun_resource = _make_job_resource(job_type="query")
Expand Down Expand Up @@ -172,6 +162,8 @@ def test_to_arrow(method_kwargs):
},
]
},
}
tabledata_resource = {
"rows": [
{
"f": [
Expand All @@ -185,15 +177,17 @@ def test_to_arrow(method_kwargs):
{"v": {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}},
]
},
],
]
}
done_resource = copy.deepcopy(begun_resource)
done_resource["status"] = {"state": "DONE"}
connection = _make_connection(begun_resource, query_resource, done_resource)
connection = _make_connection(
begun_resource, query_resource, done_resource, tabledata_resource
)
client = _make_client(connection=connection)
job = target_class.from_api_repr(begun_resource, client)

tbl = job.to_arrow(**method_kwargs)
tbl = job.to_arrow(create_bqstorage_client=False)

assert isinstance(tbl, pyarrow.Table)
assert tbl.num_rows == 2
Expand Down Expand Up @@ -375,16 +369,7 @@ def test_to_arrow_w_tqdm_wo_query_plan():


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@pytest.mark.parametrize(
"method_kwargs",
[
{"create_bqstorage_client": False},
# Since all rows are contained in the first page of results, the BigQuery
# Storage API won't actually be used.
{"create_bqstorage_client": True},
],
)
def test_to_dataframe(method_kwargs):
def test_to_dataframe():
from google.cloud.bigquery.job import QueryJob as target_class

begun_resource = _make_job_resource(job_type="query")
Expand All @@ -398,20 +383,24 @@ def test_to_dataframe(method_kwargs):
{"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
]
},
}
tabledata_resource = {
"rows": [
{"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
{"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
{"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]},
{"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]},
],
]
}
done_resource = copy.deepcopy(begun_resource)
done_resource["status"] = {"state": "DONE"}
connection = _make_connection(begun_resource, query_resource, done_resource)
connection = _make_connection(
begun_resource, query_resource, done_resource, tabledata_resource
)
client = _make_client(connection=connection)
job = target_class.from_api_repr(begun_resource, client)

df = job.to_dataframe(**method_kwargs)
df = job.to_dataframe(create_bqstorage_client=False)

assert isinstance(df, pandas.DataFrame)
assert len(df) == 4 # verify the number of rows
Expand Down Expand Up @@ -456,7 +445,6 @@ def test_to_dataframe_bqstorage():
{"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
]
},
"pageToken": "next-page",
}
connection = _make_connection(query_resource)
client = _make_client(connection=connection)
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_client.py
Expand Up @@ -319,7 +319,7 @@ def test__get_query_results_miss_w_explicit_project_and_timeout(self):
conn.api_request.assert_called_once_with(
method="GET",
path=path,
query_params={"timeoutMs": 500, "location": self.LOCATION},
query_params={"maxResults": 0, "timeoutMs": 500, "location": self.LOCATION},
timeout=42,
)

Expand All @@ -336,7 +336,7 @@ def test__get_query_results_miss_w_client_location(self):
conn.api_request.assert_called_once_with(
method="GET",
path="/projects/PROJECT/queries/nothere",
query_params={"location": self.LOCATION},
query_params={"maxResults": 0, "location": self.LOCATION},
timeout=None,
)

Expand Down

0 comments on commit 730df17

Please sign in to comment.