
perf: use jobs.getQueryResults to download result sets #363

Merged
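This change downloads query results with jobs.getQueryResults on the query job instead of tabledata.list on the destination table. A rough sketch of the two REST endpoints involved (credentials come from the environment; the dataset, table, and job ID below are placeholders, not values from this PR):

import google.auth
from google.auth.transport.requests import AuthorizedSession

credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/bigquery.readonly"]
)
session = AuthorizedSession(credentials)
base = "https://bigquery.googleapis.com/bigquery/v2"

# Old path: read rows from the query's destination table via tabledata.list.
tabledata_url = f"{base}/projects/{project}/datasets/my_dataset/tables/my_table/data"

# New path: read rows straight from the query job via jobs.getQueryResults.
job_id = "my-query-job-id"  # placeholder
results_url = f"{base}/projects/{project}/queries/{job_id}"
response = session.get(
    results_url,
    params={"location": "US", "fields": "jobReference,totalRows,pageToken,rows"},
)
print(response.json().get("totalRows"))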
16 changes: 8 additions & 8 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -474,7 +474,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)


def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
def _row_iterator_page_to_arrow(page, column_names, arrow_types):
# Iterate over the page to force the API request to get the page data.
try:
next(iter(page))
@@ -490,8 +490,8 @@ def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)


def download_arrow_tabledata_list(pages, bq_schema):
"""Use tabledata.list to construct an iterable of RecordBatches.
def download_arrow_row_iterator(pages, bq_schema):
"""Use HTTP JSON RowIterator to construct an iterable of RecordBatches.

Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
@@ -510,10 +510,10 @@ def download_arrow_tabledata_list(pages, bq_schema):
arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]

for page in pages:
yield _tabledata_list_page_to_arrow(page, column_names, arrow_types)
yield _row_iterator_page_to_arrow(page, column_names, arrow_types)


def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
def _row_iterator_page_to_dataframe(page, column_names, dtypes):
# Iterate over the page to force the API request to get the page data.
try:
next(iter(page))
@@ -528,8 +528,8 @@ def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
return pandas.DataFrame(columns, columns=column_names)


def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
"""Use (slower, but free) tabledata.list to construct a DataFrame.
def download_dataframe_row_iterator(pages, bq_schema, dtypes):
"""Use HTTP JSON RowIterator to construct a DataFrame.

Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
@@ -549,7 +549,7 @@ def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
bq_schema = schema._to_schema_fields(bq_schema)
column_names = [field.name for field in bq_schema]
for page in pages:
yield _tabledata_list_page_to_dataframe(page, column_names, dtypes)
yield _row_iterator_page_to_dataframe(page, column_names, dtypes)


def _bqstorage_page_to_arrow(page):
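The renamed _row_iterator_page_to_arrow helper builds one pyarrow.RecordBatch per result page from column-oriented data. A minimal standalone sketch of that conversion (the column names, types, and values here are made up):

import pyarrow

column_names = ["name", "age"]
arrow_types = [pyarrow.string(), pyarrow.int64()]
page_columns = [["alice", "bob"], [30, 40]]  # one list of values per column (made up)

arrays = [
    pyarrow.array(values, type=arrow_type)
    for values, arrow_type in zip(page_columns, arrow_types)
]
record_batch = pyarrow.RecordBatch.from_arrays(arrays, names=column_names)
print(record_batch.num_rows)  # 2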
104 changes: 91 additions & 13 deletions google/cloud/bigquery/client.py
@@ -80,18 +80,19 @@
_MAX_MULTIPART_SIZE = 5 * 1024 * 1024
_DEFAULT_NUM_RETRIES = 6
_BASE_UPLOAD_TEMPLATE = (
u"https://bigquery.googleapis.com/upload/bigquery/v2/projects/"
u"{project}/jobs?uploadType="
"https://bigquery.googleapis.com/upload/bigquery/v2/projects/"
"{project}/jobs?uploadType="
)
_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u"multipart"
_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u"resumable"
_GENERIC_CONTENT_TYPE = u"*/*"
_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "multipart"
_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "resumable"
_GENERIC_CONTENT_TYPE = "*/*"
_READ_LESS_THAN_SIZE = (
"Size {:d} was specified but the file-like object only had " "{:d} bytes remaining."
)
_NEED_TABLE_ARGUMENT = (
"The table argument should be a table ID string, Table, or TableReference"
)
_LIST_ROWS_FROM_QUERY_RESULTS_FIELDS = "jobReference,totalRows,pageToken,rows"


class Project(object):
@@ -293,7 +294,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

return page_iterator.HTTPIterator(
@@ -371,7 +372,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

return page_iterator.HTTPIterator(
@@ -1129,7 +1130,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

result = page_iterator.HTTPIterator(
@@ -1207,7 +1208,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

result = page_iterator.HTTPIterator(
@@ -1284,7 +1285,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

result = page_iterator.HTTPIterator(
@@ -1510,7 +1511,7 @@ def delete_table(
raise

def _get_query_results(
self, job_id, retry, project=None, timeout_ms=None, location=None, timeout=None
self, job_id, retry, project=None, timeout_ms=None, location=None, timeout=None,
):
"""Get the query results object for a query job.

@@ -1890,7 +1891,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

return page_iterator.HTTPIterator(
@@ -2374,7 +2375,7 @@ def load_table_from_json(

destination = _table_arg_to_table_ref(destination, default_project=self.project)

data_str = u"\n".join(json.dumps(item) for item in json_rows)
data_str = "\n".join(json.dumps(item) for item in json_rows)
encoded_str = data_str.encode()
data_file = io.BytesIO(encoded_str)
return self.load_table_from_file(
@@ -3169,6 +3170,83 @@ def list_rows(
# Pass in selected_fields separately from schema so that full
# tables can be fetched without a column filter.
selected_fields=selected_fields,
total_rows=getattr(table, "num_rows", None),
)
return row_iterator

def _list_rows_from_query_results(
self,
job_id,
location,
project,
schema,
total_rows=None,
destination=None,
max_results=None,
start_index=None,
page_size=None,
retry=DEFAULT_RETRY,
timeout=None,
):
"""List the rows of a completed query.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults
Args:
job_id (str):
ID of a query job.
location (str): Location of the query job.
project (str):
ID of the project where the query job was run.
schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
The fields expected in these query results. Used to convert
from JSON to expected Python types.
total_rows (Optional[int]):
Total number of rows in the query results.
destination (Optional[Union[ \
google.cloud.bigquery.table.Table, \
google.cloud.bigquery.table.TableListItem, \
google.cloud.bigquery.table.TableReference, \
str, \
]]):
Destination table reference. Used to fetch the query results
with the BigQuery Storage API.
max_results (Optional[int]):
Maximum number of rows to return across the whole iterator.
start_index (Optional[int]):
The zero-based index of the starting row to read.
page_size (Optional[int]):
The maximum number of rows in each page of results from this request.
Non-positive values are ignored. Defaults to a sensible value set by the API.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
If multiple requests are made under the hood, ``timeout``
applies to each individual request.
Returns:
google.cloud.bigquery.table.RowIterator:
Iterator of row data
:class:`~google.cloud.bigquery.table.Row`-s.
"""
params = {
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": location,
}

if start_index is not None:
params["startIndex"] = start_index

row_iterator = RowIterator(
client=self,
api_request=functools.partial(self._call_api, retry, timeout=timeout),
path=f"/projects/{project}/queries/{job_id}",
schema=schema,
max_results=max_results,
page_size=page_size,
table=destination,
extra_params=params,
total_rows=total_rows,
)
return row_iterator

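The new _list_rows_from_query_results helper points the RowIterator at the jobs.getQueryResults path rather than at a table. A rough sketch of the pageToken loop that google.api_core.page_iterator.HTTPIterator performs under the hood (the function and parameter names here are illustrative, not part of the library):

def iterate_query_result_pages(api_request, project, job_id, location, page_size=None):
    """Yield raw jobs.getQueryResults responses, following pageToken."""
    params = {
        "location": location,
        "fields": "jobReference,totalRows,pageToken,rows",
    }
    if page_size is not None:
        params["maxResults"] = page_size
    path = f"/projects/{project}/queries/{job_id}"
    while True:
        response = api_request(method="GET", path=path, query_params=params)
        yield response
        page_token = response.get("pageToken")
        if not page_token:
            break
        params["pageToken"] = page_token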
14 changes: 7 additions & 7 deletions google/cloud/bigquery/job/query.py
@@ -38,7 +38,6 @@
from google.cloud.bigquery.table import _EmptyRowIterator
from google.cloud.bigquery.table import RangePartitioning
from google.cloud.bigquery.table import _table_arg_to_table_ref
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import TimePartitioning

@@ -1159,12 +1158,13 @@ def result(
if self._query_results.total_rows is None:
return _EmptyRowIterator()

schema = self._query_results.schema
dest_table_ref = self.destination
dest_table = Table(dest_table_ref, schema=schema)
dest_table._properties["numRows"] = self._query_results.total_rows
rows = self._client.list_rows(
dest_table,
rows = self._client._list_rows_from_query_results(
self._query_results.job_id,
self.location,
self._query_results.project,
self._query_results.schema,
total_rows=self._query_results.total_rows,
destination=self.destination,
page_size=page_size,
max_results=max_results,
start_index=start_index,
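On the job side, QueryJob.result() now hands the job reference straight to _list_rows_from_query_results instead of building a Table for the destination and calling list_rows. The public surface is unchanged; a typical call still looks like this (project and credentials assumed from the environment):

from google.cloud import bigquery

client = bigquery.Client()
job = client.query("SELECT 1 AS x")
# result() blocks until the job finishes, then iterates rows page by page.
for row in job.result(page_size=1000):
    print(row["x"])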
17 changes: 10 additions & 7 deletions google/cloud/bigquery/table.py
@@ -1306,6 +1306,8 @@ class RowIterator(HTTPIterator):
call the BigQuery Storage API to fetch rows.
selected_fields (Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]):
A subset of columns to select from this table.
total_rows (Optional[int]):
Total number of rows in the table.

"""

@@ -1321,6 +1323,7 @@ def __init__(
extra_params=None,
table=None,
selected_fields=None,
total_rows=None,
):
super(RowIterator, self).__init__(
client,
@@ -1342,7 +1345,7 @@ def __init__(
self._schema = schema
self._selected_fields = selected_fields
self._table = table
self._total_rows = getattr(table, "num_rows", None)
self._total_rows = total_rows

def _get_next_page_response(self):
"""Requests the next page from the path provided.
@@ -1419,7 +1422,7 @@ def _to_arrow_iterable(self, bqstorage_client=None):
selected_fields=self._selected_fields,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_arrow_tabledata_list, iter(self.pages), self.schema
_pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
)
return self._to_page_iterable(
bqstorage_download,
@@ -1496,7 +1499,7 @@ def to_arrow(
) and self.max_results is not None:
warnings.warn(
"Cannot use bqstorage_client if max_results is set, "
"reverting to fetching data with the tabledata.list endpoint.",
"reverting to fetching data with the REST endpoint.",
stacklevel=2,
)
create_bqstorage_client = False
@@ -1582,7 +1585,7 @@ def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
selected_fields=self._selected_fields,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_dataframe_tabledata_list,
_pandas_helpers.download_dataframe_row_iterator,
iter(self.pages),
self.schema,
dtypes,
@@ -1680,7 +1683,7 @@ def to_dataframe(
) and self.max_results is not None:
warnings.warn(
"Cannot use bqstorage_client if max_results is set, "
"reverting to fetching data with the tabledata.list endpoint.",
"reverting to fetching data with the REST endpoint.",
stacklevel=2,
)
create_bqstorage_client = False
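The reworded warnings above cover the case where max_results forces the REST path even though a BigQuery Storage client could be used. A small illustrative way to avoid the warning (the table ID is a placeholder):

from google.cloud import bigquery

client = bigquery.Client()
rows = client.list_rows("my-project.my_dataset.my_table", max_results=100)  # placeholder table ID
# With max_results set, rows come back over REST; skip creating a Storage client
# so the fallback warning is not emitted.
df = rows.to_dataframe(create_bqstorage_client=False)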
@@ -2167,7 +2170,7 @@ def _item_to_row(iterator, resource):
)


def _tabledata_list_page_columns(schema, response):
def _row_iterator_page_columns(schema, response):
"""Make a generator of all the columns in a page from tabledata.list.

This enables creating a :class:`pandas.DataFrame` and other
@@ -2197,7 +2200,7 @@ def _rows_page_start(iterator, page, response):
"""
# Make a (lazy) copy of the page in column-oriented format for use in data
# science packages.
page._columns = _tabledata_list_page_columns(iterator._schema, response)
page._columns = _row_iterator_page_columns(iterator._schema, response)

total_rows = response.get("totalRows")
if total_rows is not None:
10 changes: 8 additions & 2 deletions tests/unit/job/helpers.py
@@ -60,6 +60,7 @@ def _make_job_resource(
endpoint="https://bigquery.googleapis.com",
job_type="load",
job_id="a-random-id",
location="US",
project_id="some-project",
user_email="bq-user@example.com",
):
@@ -69,7 +70,11 @@
"statistics": {"creationTime": creation_time_ms, job_type: {}},
"etag": etag,
"id": "{}:{}".format(project_id, job_id),
"jobReference": {"projectId": project_id, "jobId": job_id},
"jobReference": {
"projectId": project_id,
"jobId": job_id,
"location": location,
},
"selfLink": "{}/bigquery/v2/projects/{}/jobs/{}".format(
endpoint, project_id, job_id
),
@@ -130,7 +135,7 @@ def _table_ref(self, table_id):

return TableReference(self.DS_REF, table_id)

def _make_resource(self, started=False, ended=False):
def _make_resource(self, started=False, ended=False, location="US"):
self._setUpConstants()
return _make_job_resource(
creation_time_ms=int(self.WHEN_TS * 1000),
@@ -144,6 +149,7 @@ def _make_resource(self, started=False, ended=False):
job_id=self.JOB_ID,
project_id=self.PROJECT,
user_email=self.USER_EMAIL,
location=location,
)

def _verifyInitialReadonlyProperties(self, job):
Expand Down