feat: add max_queue_size option for BQ Storage API
The new parameter allows configuring the maximum size of the internal
queue used to hold result pages when query data is streamed over the
BigQuery Storage API.
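
For callers, the knob surfaces on the DataFrame/Arrow conversion methods touched below. A minimal usage sketch (the example query and the value 10 are illustrative, not part of this commit):

from google.cloud import bigquery

client = bigquery.Client()
query_job = client.query(
    "SELECT name, SUM(number) AS total "
    "FROM `bigquery-public-data.usa_names.usa_1910_2013` "
    "GROUP BY name"
)

# Buffer at most 10 fetched result pages while rows are converted to pandas;
# download threads block instead of letting the queue grow without bound.
df = query_job.to_dataframe(max_queue_size=10)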
plamut committed Mar 29, 2021
1 parent e175d3a commit 05cd336
Showing 4 changed files with 119 additions and 5 deletions.
20 changes: 18 additions & 2 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -45,6 +45,8 @@

_PROGRESS_INTERVAL = 0.2 # Maximum time between download status checks, in seconds.

_DEFAULT_MAX_QUEUE_SIZE = 1 # max queue size for BQ Storage downloads

_PANDAS_DTYPE_TO_BQ = {
"bool": "BOOLEAN",
"datetime64[ns, UTC]": "TIMESTAMP",
@@ -608,6 +610,7 @@ def _download_table_bqstorage(
preserve_order=False,
selected_fields=None,
page_to_item=None,
max_queue_size=_DEFAULT_MAX_QUEUE_SIZE,
):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""

@@ -654,7 +657,12 @@
download_state = _DownloadState()

# Create a queue to collect frames as they are created in each thread.
worker_queue = queue.Queue()
#
# The queue needs to be bounded, because if the user code processes the fetched
# result pages too slowly while new pages are rapidly being fetched from the
# server, the queue can grow to the point where the process runs out of memory.
worker_queue = queue.Queue(maxsize=max_queue_size)

with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
try:
@@ -716,7 +724,12 @@


def download_arrow_bqstorage(
project_id, table, bqstorage_client, preserve_order=False, selected_fields=None
project_id,
table,
bqstorage_client,
preserve_order=False,
selected_fields=None,
max_queue_size=_DEFAULT_MAX_QUEUE_SIZE,
):
return _download_table_bqstorage(
project_id,
@@ -725,6 +738,7 @@ def download_arrow_bqstorage(
preserve_order=preserve_order,
selected_fields=selected_fields,
page_to_item=_bqstorage_page_to_arrow,
max_queue_size=max_queue_size,
)


@@ -736,6 +750,7 @@ def download_dataframe_bqstorage(
dtypes,
preserve_order=False,
selected_fields=None,
max_queue_size=_DEFAULT_MAX_QUEUE_SIZE,
):
page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes)
return _download_table_bqstorage(
@@ -745,6 +760,7 @@ def download_dataframe_bqstorage(
preserve_order=preserve_order,
selected_fields=selected_fields,
page_to_item=page_to_item,
max_queue_size=max_queue_size,
)


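The bounded worker_queue is what creates backpressure: once max_queue_size pages are buffered, download threads block in put() until the consumer drains a page. A standalone sketch of that mechanism (illustrative only, not the library code):

import concurrent.futures
import queue
import time

page_queue = queue.Queue(maxsize=2)  # plays the role of worker_queue above

def fetch_page(page_number):
    # Blocks once two pages are waiting, so memory stays proportional to the
    # queue bound rather than to the total number of result pages.
    page_queue.put(f"page-{page_number}")

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(8):
        pool.submit(fetch_page, i)

    for _ in range(8):  # deliberately slow consumer
        print(page_queue.get())
        time.sleep(0.1)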
19 changes: 19 additions & 0 deletions google/cloud/bigquery/job/query.py
@@ -28,6 +28,7 @@
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery import _helpers
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.query import _query_param_from_api_repr
from google.cloud.bigquery.query import ArrayQueryParameter
from google.cloud.bigquery.query import ScalarQueryParameter
@@ -1199,6 +1200,7 @@ def to_arrow(
progress_bar_type=None,
bqstorage_client=None,
create_bqstorage_client=True,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""[Beta] Create a class:`pyarrow.Table` by loading all pages of a
table or query.
@@ -1242,6 +1244,13 @@
..versionadded:: 1.24.0
max_queue_size (Optional[int]):
The maximum number of result pages to hold in the internal queue when
streaming query results over the BigQuery Storage API. Ignored if
Storage API is not used.
..versionadded:: 2.14.0
Returns:
pyarrow.Table
A :class:`pyarrow.Table` populated with row data and column
@@ -1259,6 +1268,7 @@
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
max_queue_size=max_queue_size,
)

# If changing the signature of this method, make sure to apply the same
@@ -1270,6 +1280,7 @@ def to_dataframe(
progress_bar_type=None,
create_bqstorage_client=True,
date_as_object=True,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""Return a pandas DataFrame from a QueryJob
@@ -1316,6 +1327,13 @@
..versionadded:: 1.26.0
max_queue_size (Optional[int]):
The maximum number of result pages to hold in the internal queue when
streaming query results over the BigQuery Storage API. Ignored if
Storage API is not used.
..versionadded:: 2.14.0
Returns:
A :class:`~pandas.DataFrame` populated with row data and column
headers from the query results. The column headers are derived
@@ -1331,6 +1349,7 @@
progress_bar_type=progress_bar_type,
create_bqstorage_client=create_bqstorage_client,
date_as_object=date_as_object,
max_queue_size=max_queue_size,
)

def __iter__(self):
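QueryJob.to_arrow and QueryJob.to_dataframe only forward the new keyword to the underlying RowIterator, so it can be set directly on a finished job. Sketch (the query text and value 4 are illustrative):

from google.cloud import bigquery

client = bigquery.Client()
job = client.query(
    "SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare`"
)

# Passed through QueryJob.to_arrow -> RowIterator.to_arrow -> the BQ Storage download.
arrow_table = job.to_arrow(max_queue_size=4)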
45 changes: 42 additions & 3 deletions google/cloud/bigquery/table.py
@@ -1498,7 +1498,11 @@ def _to_page_iterable(
for item in tabledata_list_download():
yield item

def _to_arrow_iterable(self, bqstorage_client=None):
def _to_arrow_iterable(
self,
bqstorage_client=None,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""Create an iterable of arrow RecordBatches, to process the table as a stream."""
bqstorage_download = functools.partial(
_pandas_helpers.download_arrow_bqstorage,
@@ -1507,6 +1511,7 @@ def _to_arrow_iterable(self, bqstorage_client=None):
bqstorage_client,
preserve_order=self._preserve_order,
selected_fields=self._selected_fields,
max_queue_size=max_queue_size,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
@@ -1524,6 +1529,7 @@ def to_arrow(
progress_bar_type=None,
bqstorage_client=None,
create_bqstorage_client=True,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""[Beta] Create a class:`pyarrow.Table` by loading all pages of a
table or query.
@@ -1567,6 +1573,13 @@ def to_arrow(
..versionadded:: 1.24.0
max_queue_size (Optional[int]):
The maximum number of result pages to hold in the internal queue when
streaming query results over the BigQuery Storage API. Ignored if
Storage API is not used.
..versionadded:: 2.14.0
Returns:
pyarrow.Table
A :class:`pyarrow.Table` populated with row data and column
@@ -1597,7 +1610,7 @@ def to_arrow(

record_batches = []
for record_batch in self._to_arrow_iterable(
bqstorage_client=bqstorage_client
bqstorage_client=bqstorage_client, max_queue_size=max_queue_size,
):
record_batches.append(record_batch)

@@ -1622,7 +1635,12 @@ def to_arrow(
arrow_schema = _pandas_helpers.bq_to_arrow_schema(self._schema)
return pyarrow.Table.from_batches(record_batches, schema=arrow_schema)

def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
def to_dataframe_iterable(
self,
bqstorage_client=None,
dtypes=None,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""Create an iterable of pandas DataFrames, to process the table as a stream.
Args:
@@ -1642,6 +1660,13 @@ def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
``dtype`` is used when constructing the series for the column
specified. Otherwise, the default pandas behavior is used.
max_queue_size (Optional[int]):
The maximum number of result pages to hold in the internal queue when
streaming query results over the BigQuery Storage API. Ignored if
Storage API is not used.
..versionadded:: 2.14.0
Returns:
pandas.DataFrame:
A generator of :class:`~pandas.DataFrame`.
@@ -1665,6 +1690,7 @@ def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
dtypes,
preserve_order=self._preserve_order,
selected_fields=self._selected_fields,
max_queue_size=max_queue_size,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_dataframe_row_iterator,
@@ -1687,6 +1713,7 @@ def to_dataframe(
progress_bar_type=None,
create_bqstorage_client=True,
date_as_object=True,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""Create a pandas DataFrame by loading all pages of a query.
@@ -1742,6 +1769,13 @@
..versionadded:: 1.26.0
max_queue_size (Optional[int]):
The maximum number of result pages to hold in the internal queue when
streaming query results over the BigQuery Storage API. Ignored if
Storage API is not used.
..versionadded:: 2.14.0
Returns:
pandas.DataFrame:
A :class:`~pandas.DataFrame` populated with row data and column
@@ -1768,6 +1802,7 @@
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
max_queue_size=max_queue_size,
)

# When converting timestamp values to nanosecond precision, the result
@@ -1815,13 +1850,15 @@ def to_arrow(
progress_bar_type=None,
bqstorage_client=None,
create_bqstorage_client=True,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""[Beta] Create an empty class:`pyarrow.Table`.
Args:
progress_bar_type (str): Ignored. Added for compatibility with RowIterator.
bqstorage_client (Any): Ignored. Added for compatibility with RowIterator.
create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator.
max_queue_size (int): Ignored. Added for compatibility with RowIterator.
Returns:
pyarrow.Table: An empty :class:`pyarrow.Table`.
@@ -1837,6 +1874,7 @@ def to_dataframe(
progress_bar_type=None,
create_bqstorage_client=True,
date_as_object=True,
max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE,
):
"""Create an empty dataframe.
@@ -1846,6 +1884,7 @@
progress_bar_type (Any): Ignored. Added for compatibility with RowIterator.
create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator.
date_as_object (bool): Ignored. Added for compatibility with RowIterator.
max_queue_size (int): Ignored. Added for compatibility with RowIterator.
Returns:
pandas.DataFrame: An empty :class:`~pandas.DataFrame`.
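The streaming path is where the bound matters most: to_dataframe_iterable yields one DataFrame per result page, so a slow consumer could otherwise let the internal queue grow unchecked. Sketch (the table name and queue size are placeholders):

from google.cloud import bigquery
from google.cloud import bigquery_storage

client = bigquery.Client()
bqstorage_client = bigquery_storage.BigQueryReadClient()

rows = client.list_rows(client.get_table("my-project.my_dataset.my_table"))
for frame in rows.to_dataframe_iterable(
    bqstorage_client=bqstorage_client, max_queue_size=5
):
    print(len(frame))  # stand-in for real per-page processing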
40 changes: 40 additions & 0 deletions tests/unit/test__pandas_helpers.py
@@ -17,6 +17,7 @@
import decimal
import functools
import operator
import queue
import warnings

import mock
@@ -1265,6 +1266,45 @@ def test_dataframe_to_parquet_dict_sequence_schema(module_under_test):
assert schema_arg == expected_schema_arg


def test__download_table_bqstorage_bounded_queue_size(module_under_test):
from google.cloud.bigquery import dataset
from google.cloud.bigquery import table

table_ref = table.TableReference(
dataset.DatasetReference("project-x", "dataset-y"), "table-z",
)

bigquery_storage = pytest.importorskip(
"google.cloud.bigquery_storage",
reason="Requires BigQuery Storage dependency."
)
bqstorage_client = mock.create_autospec(
bigquery_storage.BigQueryReadClient, instance=True
)
fake_session = mock.Mock(streams=["stream/s1", "stream/s2", "stream/s3"])
bqstorage_client.create_read_session.return_value = fake_session

def fake_download_stream(
download_state, bqstorage_client, session, stream, worker_queue, page_to_item
):
worker_queue.put_nowait("result")

download_stream = mock.Mock(side_effect=fake_download_stream)

with pytest.raises(queue.Full), mock.patch.object(
module_under_test, "_download_table_bqstorage_stream", new=download_stream
):
result_gen = module_under_test._download_table_bqstorage(
"some-project", table_ref, bqstorage_client, max_queue_size=2,
)
list(result_gen)

# Timing-safe, as the method under test should block until the pool shutdown is
# complete, at which point all download stream workers have already been submitted
# to the thread pool.
assert download_stream.call_count == 3


@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
def test_download_arrow_row_iterator_unknown_field_type(module_under_test):
fake_page = api_core.page_iterator.Page(
