feat: add max_queue_size option for BQ Storage API
The new parameter allows configuring the maximum size of the internal
queue used to hold result pages when query data is streamed over the
BigQuery Storage API.
plamut committed Mar 24, 2021
1 parent e175d3a commit 4049e61
Showing 3 changed files with 79 additions and 5 deletions.
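For context, a minimal usage sketch of the new option (the query and dataset are illustrative, and the parameter only takes effect when results are downloaded over the BigQuery Storage API):

    from google.cloud import bigquery

    client = bigquery.Client()
    job = client.query(
        "SELECT name, number FROM `bigquery-public-data.usa_names.usa_1910_2013`"
    )

    # Buffer at most four result pages internally while streaming over the
    # BigQuery Storage API; ignored when the Storage API is not used.
    df = job.to_dataframe(max_queue_size=4)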
20 changes: 18 additions & 2 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -45,6 +45,8 @@

_PROGRESS_INTERVAL = 0.2 # Maximum time between download status checks, in seconds.

_DEFAULT_MAX_QUEUE_SIZE = 1  # max queue size for BQ Storage downloads

_PANDAS_DTYPE_TO_BQ = {
"bool": "BOOLEAN",
"datetime64[ns, UTC]": "TIMESTAMP",
@@ -608,6 +610,7 @@ def _download_table_bqstorage(
preserve_order=False,
selected_fields=None,
page_to_item=None,
max_queue_size=_DEFAULT_MAX_QUEUE_SIZE,
):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""

@@ -654,7 +657,12 @@
download_state = _DownloadState()

# Create a queue to collect frames as they are created in each thread.
worker_queue = queue.Queue()
#
# The queue needs to be bounded, because if the user code processes the
# fetched result pages too slowly while new pages are rapidly being fetched
# from the server, the queue can grow to the point where the process runs
# out of memory.
worker_queue = queue.Queue(maxsize=max_queue_size)

with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
try:
@@ -716,7 +724,12 @@


def download_arrow_bqstorage(
project_id, table, bqstorage_client, preserve_order=False, selected_fields=None
project_id,
table,
bqstorage_client,
preserve_order=False,
selected_fields=None,
max_queue_size=_DEFAULT_MAX_QUEUE_SIZE,
):
return _download_table_bqstorage(
project_id,
@@ -725,6 +738,7 @@ def download_arrow_bqstorage(
preserve_order=preserve_order,
selected_fields=selected_fields,
page_to_item=_bqstorage_page_to_arrow,
max_queue_size=max_queue_size,
)


@@ -736,6 +750,7 @@ def download_dataframe_bqstorage(
dtypes,
preserve_order=False,
selected_fields=None,
max_queue_size=_DEFAULT_MAX_QUEUE_SIZE,
):
page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes)
return _download_table_bqstorage(
@@ -745,6 +760,7 @@ def download_dataframe_bqstorage(
preserve_order=preserve_order,
selected_fields=selected_fields,
page_to_item=page_to_item,
max_queue_size=max_queue_size,
)


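The bounded queue gives the download threads backpressure: queue.Queue.put() blocks once maxsize items are waiting, so producers pause until the consumer drains a page. A self-contained sketch of the same pattern (names are illustrative, not taken from the library):

    import queue
    import threading
    import time

    work_queue = queue.Queue(maxsize=1)  # bounded, like worker_queue above

    def producer():
        for page in range(5):        # stand-in for fetched result pages
            work_queue.put(page)     # blocks while the queue is full
        work_queue.put(None)         # sentinel: no more pages

    threading.Thread(target=producer).start()

    while True:
        item = work_queue.get()
        if item is None:
            break
        time.sleep(0.1)              # a slow consumer; the producer waits
        print("processed page", item)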
19 changes: 19 additions & 0 deletions google/cloud/bigquery/job/query.py
@@ -28,6 +28,7 @@
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery import _helpers
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.query import _query_param_from_api_repr
from google.cloud.bigquery.query import ArrayQueryParameter
from google.cloud.bigquery.query import ScalarQueryParameter
@@ -1199,6 +1200,7 @@ def to_arrow(
progress_bar_type=None,
bqstorage_client=None,
create_bqstorage_client=True,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""[Beta] Create a class:`pyarrow.Table` by loading all pages of a
table or query.
@@ -1242,6 +1244,13 @@
.. versionadded:: 1.24.0
max_queue_size (Optional[int]):
The maximum number of result pages to hold in the internal queue when
streaming query results over the BigQuery Storage API. Ignored if the
Storage API is not used.
.. versionadded:: 2.14.0
Returns:
pyarrow.Table
A :class:`pyarrow.Table` populated with row data and column
@@ -1259,6 +1268,7 @@
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
max_queue_size=max_queue_size,
)

# If changing the signature of this method, make sure to apply the same
@@ -1270,6 +1280,7 @@ def to_dataframe(
progress_bar_type=None,
create_bqstorage_client=True,
date_as_object=True,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""Return a pandas DataFrame from a QueryJob
@@ -1316,6 +1327,13 @@
.. versionadded:: 1.26.0
max_queue_size (Optional[int]):
The maximum number of result pages to hold in the internal queue when
streaming query results over the BigQuery Storage API. Ignored if the
Storage API is not used.
.. versionadded:: 2.14.0
Returns:
A :class:`~pandas.DataFrame` populated with row data and column
headers from the query results. The column headers are derived
@@ -1331,6 +1349,7 @@
progress_bar_type=progress_bar_type,
create_bqstorage_client=create_bqstorage_client,
date_as_object=date_as_object,
max_queue_size=max_queue_size,
)

def __iter__(self):
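With these changes, QueryJob.to_arrow and QueryJob.to_dataframe both accept the knob and forward it to RowIterator. A short sketch reusing the client from the earlier example (the query text is illustrative):

    # Reuse `client` from the earlier sketch.
    job = client.query("SELECT 1 AS x UNION ALL SELECT 2 AS x")

    # Keep at most eight pages buffered while streaming the result.
    arrow_table = job.to_arrow(max_queue_size=8)
    print(arrow_table.num_rows)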
45 changes: 42 additions & 3 deletions google/cloud/bigquery/table.py
@@ -1498,7 +1498,11 @@ def _to_page_iterable(
for item in tabledata_list_download():
yield item

def _to_arrow_iterable(self, bqstorage_client=None):
def _to_arrow_iterable(
self,
bqstorage_client=None,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""Create an iterable of arrow RecordBatches, to process the table as a stream."""
bqstorage_download = functools.partial(
_pandas_helpers.download_arrow_bqstorage,
@@ -1507,6 +1511,7 @@ def _to_arrow_iterable(self, bqstorage_client=None):
bqstorage_client,
preserve_order=self._preserve_order,
selected_fields=self._selected_fields,
max_queue_size=max_queue_size,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
@@ -1524,6 +1529,7 @@ def to_arrow(
progress_bar_type=None,
bqstorage_client=None,
create_bqstorage_client=True,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""[Beta] Create a class:`pyarrow.Table` by loading all pages of a
table or query.
@@ -1567,6 +1573,13 @@
.. versionadded:: 1.24.0
max_queue_size (Optional[int]):
The maximum number of result pages to hold in the internal queue when
streaming query results over the BigQuery Storage API. Ignored if the
Storage API is not used.
.. versionadded:: 2.14.0
Returns:
pyarrow.Table
A :class:`pyarrow.Table` populated with row data and column
@@ -1597,7 +1610,7 @@ def to_arrow(

record_batches = []
for record_batch in self._to_arrow_iterable(
bqstorage_client=bqstorage_client
bqstorage_client=bqstorage_client, max_queue_size=max_queue_size,
):
record_batches.append(record_batch)

@@ -1622,7 +1635,12 @@
arrow_schema = _pandas_helpers.bq_to_arrow_schema(self._schema)
return pyarrow.Table.from_batches(record_batches, schema=arrow_schema)

def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
def to_dataframe_iterable(
self,
bqstorage_client=None,
dtypes=None,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""Create an iterable of pandas DataFrames, to process the table as a stream.
Args:
@@ -1642,6 +1660,13 @@ def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
``dtype`` is used when constructing the series for the column
specified. Otherwise, the default pandas behavior is used.
max_queue_size (Optional[int]):
The maximum number of result pages to hold in the internal queue when
streaming query results over the BigQuery Storage API. Ignored if the
Storage API is not used.
.. versionadded:: 2.14.0
Returns:
pandas.DataFrame:
A generator of :class:`~pandas.DataFrame`.
@@ -1665,6 +1690,7 @@ def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
dtypes,
preserve_order=self._preserve_order,
selected_fields=self._selected_fields,
max_queue_size=max_queue_size,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_dataframe_row_iterator,
@@ -1687,6 +1713,7 @@ def to_dataframe(
progress_bar_type=None,
create_bqstorage_client=True,
date_as_object=True,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""Create a pandas DataFrame by loading all pages of a query.
@@ -1742,6 +1769,13 @@
.. versionadded:: 1.26.0
max_queue_size (Optional[int]):
The maximum number of result pages to hold in the internal queue when
streaming query results over the BigQuery Storage API. Ignored if the
Storage API is not used.
.. versionadded:: 2.14.0
Returns:
pandas.DataFrame:
A :class:`~pandas.DataFrame` populated with row data and column
@@ -1768,6 +1802,7 @@
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
max_queue_size=max_queue_size,
)

# When converting timestamp values to nanosecond precision, the result
@@ -1815,13 +1850,15 @@ def to_arrow(
progress_bar_type=None,
bqstorage_client=None,
create_bqstorage_client=True,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""[Beta] Create an empty class:`pyarrow.Table`.
Args:
progress_bar_type (str): Ignored. Added for compatibility with RowIterator.
bqstorage_client (Any): Ignored. Added for compatibility with RowIterator.
create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator.
max_queue_size (int): Ignored. Added for compatibility with RowIterator.
Returns:
pyarrow.Table: An empty :class:`pyarrow.Table`.
@@ -1837,6 +1874,7 @@ def to_dataframe(
progress_bar_type=None,
create_bqstorage_client=True,
date_as_object=True,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""Create an empty dataframe.
@@ -1846,6 +1884,7 @@
progress_bar_type (Any): Ignored. Added for compatibility with RowIterator.
create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator.
date_as_object (bool): Ignored. Added for compatibility with RowIterator.
max_queue_size (int): Ignored. Added for compatibility with RowIterator.
Returns:
pandas.DataFrame: An empty :class:`~pandas.DataFrame`.
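RowIterator.to_dataframe_iterable threads the parameter through as well, which helps when a large result is consumed incrementally. A sketch assuming google-cloud-bigquery-storage is installed (the table name is illustrative, and the option is ignored without a bqstorage_client):

    from google.cloud import bigquery_storage

    bqstorage_client = bigquery_storage.BigQueryReadClient()
    rows = client.list_rows("bigquery-public-data.usa_names.usa_1910_2013")

    # Stream DataFrames page by page, buffering at most two pages at a time.
    for frame in rows.to_dataframe_iterable(
        bqstorage_client=bqstorage_client, max_queue_size=2
    ):
        print(len(frame))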
