perf: don't fetch rows when waiting for query to finish #400

Merged
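
This PR makes the client poll jobs.getQueryResults with maxResults set to 0 while waiting for a query to finish. The poll responses then carry job status, schema, and totalRows but no row data, so a potentially large first page is no longer downloaded (and discarded) during the wait; rows are fetched in a separate paged request once the job is done. A minimal sketch of the pattern at the REST level (hypothetical helper, not the library's internals):

    import time

    import google.auth
    from google.auth.transport.requests import AuthorizedSession

    credentials, project = google.auth.default()
    session = AuthorizedSession(credentials)


    def wait_for_query(job_id, location="US"):
        # jobs.getQueryResults with maxResults=0 reports completion, schema,
        # and totalRows without serializing any rows, so polling stays cheap.
        url = (
            "https://bigquery.googleapis.com/bigquery/v2"
            f"/projects/{project}/queries/{job_id}"
        )
        while True:
            resp = session.get(
                url,
                params={"maxResults": 0, "location": location, "timeoutMs": 10000},
            ).json()
            if resp.get("jobComplete"):
                return resp  # status, schema, totalRows -- no "rows" payload
            time.sleep(1)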
4 changes: 1 addition & 3 deletions google/cloud/bigquery/client.py
@@ -1534,7 +1534,7 @@ def _get_query_results(
             A new ``_QueryResults`` instance.
         """
 
-        extra_params = {}
+        extra_params = {"maxResults": 0}
 
         if project is None:
            project = self.project
@@ -3187,7 +3187,6 @@ def _list_rows_from_query_results(
         page_size=None,
         retry=DEFAULT_RETRY,
         timeout=None,
-        first_page_response=None,
     ):
         """List the rows of a completed query.
         See
@@ -3248,7 +3247,6 @@ def _list_rows_from_query_results(
             table=destination,
             extra_params=params,
             total_rows=total_rows,
-            first_page_response=first_page_response,
         )
         return row_iterator

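With the first-page cache gone, every row is delivered by the paged results endpoint; _list_rows_from_query_results simply no longer accepts a pre-fetched page. Continuing the sketch above (same hypothetical session and project), the fetch side looks roughly like:

    def list_query_rows(job_id, location="US", page_size=None):
        # Page through a completed query's rows; each page is its own request,
        # the cost this change accepts in exchange for much cheaper polling.
        url = (
            "https://bigquery.googleapis.com/bigquery/v2"
            f"/projects/{project}/queries/{job_id}"
        )
        params = {"location": location}
        if page_size is not None:
            params["maxResults"] = page_size
        while True:
            page = session.get(url, params=params).json()
            yield from page.get("rows", [])
            token = page.get("pageToken")
            if token is None:
                return
            params["pageToken"] = token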
5 changes: 0 additions & 5 deletions google/cloud/bigquery/job/query.py
@@ -1177,10 +1177,6 @@ def result(
         if self._query_results.total_rows is None:
             return _EmptyRowIterator()
 
-        first_page_response = None
-        if max_results is None and page_size is None and start_index is None:
-            first_page_response = self._query_results._properties
-
         rows = self._client._list_rows_from_query_results(
             self.job_id,
             self.location,
@@ -1193,7 +1189,6 @@
             start_index=start_index,
             retry=retry,
             timeout=timeout,
-            first_page_response=first_page_response,
         )
         rows._preserve_order = _contains_order_by(self.query)
         return rows
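The public surface does not change: callers iterate job.result() exactly as before, and only the wire traffic differs (no rows during polling, one extra request to fetch them afterwards). For example, with a hypothetical table:

    from google.cloud import bigquery

    client = bigquery.Client()
    job = client.query("SELECT col1 FROM `my_project.my_dataset.my_table`")

    # result() now polls getQueryResults with maxResults=0, then pages rows.
    for row in job.result():
        print(row.col1)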
55 changes: 13 additions & 42 deletions tests/unit/job/test_query.py
@@ -787,9 +787,7 @@ def test_result(self):
                 "location": "EU",
             },
             "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
-            "totalRows": "3",
-            "rows": [{"f": [{"v": "abc"}]}],
-            "pageToken": "next-page",
+            "totalRows": "2",
         }
         job_resource = self._make_resource(started=True, location="EU")
         job_resource_done = self._make_resource(started=True, ended=True, location="EU")
@@ -801,9 +799,9 @@
         query_page_resource = {
             # Explicitly set totalRows to be different from the initial
             # response to test update during iteration.
-            "totalRows": "2",
+            "totalRows": "1",
             "pageToken": None,
-            "rows": [{"f": [{"v": "def"}]}],
+            "rows": [{"f": [{"v": "abc"}]}],
         }
         conn = _make_connection(
             query_resource, query_resource_done, job_resource_done, query_page_resource
@@ -814,20 +812,19 @@
         result = job.result()
 
         self.assertIsInstance(result, RowIterator)
-        self.assertEqual(result.total_rows, 3)
+        self.assertEqual(result.total_rows, 2)
         rows = list(result)
-        self.assertEqual(len(rows), 2)
+        self.assertEqual(len(rows), 1)
         self.assertEqual(rows[0].col1, "abc")
-        self.assertEqual(rows[1].col1, "def")
         # Test that the total_rows property has changed during iteration, based
         # on the response from tabledata.list.
-        self.assertEqual(result.total_rows, 2)
+        self.assertEqual(result.total_rows, 1)
 
         query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
         query_results_call = mock.call(
             method="GET",
             path=query_results_path,
-            query_params={"location": "EU"},
+            query_params={"maxResults": 0, "location": "EU"},
             timeout=None,
         )
         reload_call = mock.call(
@@ -842,7 +839,6 @@
             query_params={
                 "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
                 "location": "EU",
-                "pageToken": "next-page",
             },
             timeout=None,
         )
@@ -855,9 +851,7 @@ def test_result_with_done_job_calls_get_query_results(self):
             "jobComplete": True,
             "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
             "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
-            "totalRows": "2",
-            "rows": [{"f": [{"v": "abc"}]}],
-            "pageToken": "next-page",
+            "totalRows": "1",
         }
         job_resource = self._make_resource(started=True, ended=True, location="EU")
         job_resource["configuration"]["query"]["destinationTable"] = {
@@ -866,9 +860,9 @@
             "tableId": "dest_table",
         }
         results_page_resource = {
-            "totalRows": "2",
+            "totalRows": "1",
             "pageToken": None,
-            "rows": [{"f": [{"v": "def"}]}],
+            "rows": [{"f": [{"v": "abc"}]}],
         }
         conn = _make_connection(query_resource_done, results_page_resource)
         client = _make_client(self.PROJECT, connection=conn)
@@ -877,15 +871,14 @@
         result = job.result()
 
         rows = list(result)
-        self.assertEqual(len(rows), 2)
+        self.assertEqual(len(rows), 1)
         self.assertEqual(rows[0].col1, "abc")
-        self.assertEqual(rows[1].col1, "def")
 
         query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
         query_results_call = mock.call(
             method="GET",
             path=query_results_path,
-            query_params={"location": "EU"},
+            query_params={"maxResults": 0, "location": "EU"},
             timeout=None,
         )
         query_results_page_call = mock.call(
@@ -894,7 +887,6 @@
             query_params={
                 "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
                 "location": "EU",
-                "pageToken": "next-page",
             },
             timeout=None,
         )
@@ -908,12 +900,6 @@ def test_result_with_max_results(self):
             "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
             "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
             "totalRows": "5",
-            # These rows are discarded because max_results is set.
-            "rows": [
-                {"f": [{"v": "xyz"}]},
-                {"f": [{"v": "uvw"}]},
-                {"f": [{"v": "rst"}]},
-            ],
         }
         query_page_resource = {
             "totalRows": "5",
@@ -939,7 +925,6 @@
         rows = list(result)
 
         self.assertEqual(len(rows), 3)
-        self.assertEqual(rows[0].col1, "abc")
         self.assertEqual(len(connection.api_request.call_args_list), 2)
         query_page_request = connection.api_request.call_args_list[1]
         self.assertEqual(
@@ -994,7 +979,7 @@ def test_result_w_retry(self):
         query_results_call = mock.call(
             method="GET",
             path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
-            query_params={"location": "asia-northeast1"},
+            query_params={"maxResults": 0, "location": "asia-northeast1"},
             timeout=None,
         )
         reload_call = mock.call(
@@ -1094,12 +1079,6 @@ def test_result_w_page_size(self):
             "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
             "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
             "totalRows": "4",
-            # These rows are discarded because page_size is set.
-            "rows": [
-                {"f": [{"v": "xyz"}]},
-                {"f": [{"v": "uvw"}]},
-                {"f": [{"v": "rst"}]},
-            ],
         }
         job_resource = self._make_resource(started=True, ended=True, location="US")
         q_config = job_resource["configuration"]["query"]
@@ -1130,7 +1109,6 @@ def test_result_w_page_size(self):
         # Assert
         actual_rows = list(result)
         self.assertEqual(len(actual_rows), 4)
-        self.assertEqual(actual_rows[0].col1, "row1")
 
         query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
         query_page_1_call = mock.call(
@@ -1164,12 +1142,6 @@ def test_result_with_start_index(self):
             "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
             "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
             "totalRows": "5",
-            # These rows are discarded because start_index is set.
-            "rows": [
-                {"f": [{"v": "xyz"}]},
-                {"f": [{"v": "uvw"}]},
-                {"f": [{"v": "rst"}]},
-            ],
         }
         tabledata_resource = {
             "totalRows": "5",
@@ -1196,7 +1168,6 @@
         rows = list(result)
 
         self.assertEqual(len(rows), 4)
-        self.assertEqual(rows[0].col1, "abc")
         self.assertEqual(len(connection.api_request.call_args_list), 2)
         tabledata_list_request = connection.api_request.call_args_list[1]
         self.assertEqual(
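These tests enqueue one canned payload per API call, in order, which is why each deleted rows/pageToken key moves row delivery into a later, separate response. A plausible shape for the _make_connection helper they lean on (the real one lives in the test suite's helpers; this is an approximation):

    import mock

    from google.cloud.bigquery import _http


    def _make_connection(*responses):
        # Each queued payload answers exactly one api_request call, in order:
        # here, the maxResults=0 poll response first and the row page last.
        conn = mock.create_autospec(_http.Connection, instance=True)
        conn.api_request.side_effect = list(responses)
        return conn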
44 changes: 16 additions & 28 deletions tests/unit/job/test_query_pandas.py
@@ -100,7 +100,6 @@ def test_to_dataframe_bqstorage_preserve_order(query):
             ]
         },
         "totalRows": "4",
-        "pageToken": "next-page",
     }
     connection = _make_connection(get_query_results_resource, job_resource)
     client = _make_client(connection=connection)
@@ -135,16 +134,7 @@ def test_to_dataframe_bqstorage_preserve_order(query):
 
 
 @pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`")
-@pytest.mark.parametrize(
-    "method_kwargs",
-    [
-        {"create_bqstorage_client": False},
-        # Since all rows are contained in the first page of results, the BigQuery
-        # Storage API won't actually be used.
-        {"create_bqstorage_client": True},
-    ],
-)
-def test_to_arrow(method_kwargs):
+def test_to_arrow():
     from google.cloud.bigquery.job import QueryJob as target_class
 
     begun_resource = _make_job_resource(job_type="query")
@@ -172,6 +162,8 @@ def test_to_arrow(method_kwargs):
                 },
             ]
         },
+    }
+    tabledata_resource = {
         "rows": [
             {
                 "f": [
@@ -185,15 +177,17 @@ def test_to_arrow(method_kwargs):
                     {"v": {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}},
                 ]
             },
-        ],
+        ]
     }
     done_resource = copy.deepcopy(begun_resource)
     done_resource["status"] = {"state": "DONE"}
-    connection = _make_connection(begun_resource, query_resource, done_resource)
+    connection = _make_connection(
+        begun_resource, query_resource, done_resource, tabledata_resource
+    )
     client = _make_client(connection=connection)
     job = target_class.from_api_repr(begun_resource, client)
 
-    tbl = job.to_arrow(**method_kwargs)
+    tbl = job.to_arrow(create_bqstorage_client=False)
 
     assert isinstance(tbl, pyarrow.Table)
     assert tbl.num_rows == 2
@@ -375,16 +369,7 @@ def test_to_arrow_w_tqdm_wo_query_plan():
 
 
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
-@pytest.mark.parametrize(
-    "method_kwargs",
-    [
-        {"create_bqstorage_client": False},
-        # Since all rows are contained in the first page of results, the BigQuery
-        # Storage API won't actually be used.
-        {"create_bqstorage_client": True},
-    ],
-)
-def test_to_dataframe(method_kwargs):
+def test_to_dataframe():
     from google.cloud.bigquery.job import QueryJob as target_class
 
     begun_resource = _make_job_resource(job_type="query")
@@ -398,20 +383,24 @@ def test_to_dataframe(method_kwargs):
                 {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
             ]
         },
+    }
+    tabledata_resource = {
         "rows": [
             {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
             {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
             {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]},
             {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]},
-        ],
+        ]
     }
     done_resource = copy.deepcopy(begun_resource)
     done_resource["status"] = {"state": "DONE"}
-    connection = _make_connection(begun_resource, query_resource, done_resource)
+    connection = _make_connection(
+        begun_resource, query_resource, done_resource, tabledata_resource
+    )
     client = _make_client(connection=connection)
     job = target_class.from_api_repr(begun_resource, client)
 
-    df = job.to_dataframe(**method_kwargs)
+    df = job.to_dataframe(create_bqstorage_client=False)
 
     assert isinstance(df, pandas.DataFrame)
     assert len(df) == 4  # verify the number of rows
@@ -456,7 +445,6 @@ def test_to_dataframe_bqstorage():
                 {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
             ]
         },
-        "pageToken": "next-page",
     }
     connection = _make_connection(query_resource)
     client = _make_client(connection=connection)
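The pandas/pyarrow tests now serve the rows as a standalone fourth response, and they drop the create_bqstorage_client=True cases: with no rows inlined in the poll response, that path would reach for the BigQuery Storage API, which these REST-only mocks cannot answer. The request order the reworked tests depend on, as exercised above:

    connection = _make_connection(
        begun_resource,      # jobs.insert: the job is created
        query_resource,      # getQueryResults (maxResults=0): schema, no rows
        done_resource,       # jobs.get: job state is DONE
        tabledata_resource,  # the row page fetched during iteration
    )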
4 changes: 2 additions & 2 deletions tests/unit/test_client.py
@@ -319,7 +319,7 @@ def test__get_query_results_miss_w_explicit_project_and_timeout(self):
         conn.api_request.assert_called_once_with(
             method="GET",
             path=path,
-            query_params={"timeoutMs": 500, "location": self.LOCATION},
+            query_params={"maxResults": 0, "timeoutMs": 500, "location": self.LOCATION},
             timeout=42,
         )

@@ -336,7 +336,7 @@ def test__get_query_results_miss_w_client_location(self):
         conn.api_request.assert_called_once_with(
             method="GET",
             path="/projects/PROJECT/queries/nothere",
-            query_params={"location": self.LOCATION},
+            query_params={"maxResults": 0, "location": self.LOCATION},
             timeout=None,
         )
