From 05cd336effa1f473408f184cbc6924cff106980a Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 24 Mar 2021 16:44:13 +0100 Subject: [PATCH 1/5] feat: add max_queue_size option for BQ Storage API The new parameter allows configuring the maximum size of the internal queue used to hold result pages when query data is streamed over the BigQuery Storage API. --- google/cloud/bigquery/_pandas_helpers.py | 20 +++++++++-- google/cloud/bigquery/job/query.py | 19 ++++++++++ google/cloud/bigquery/table.py | 45 ++++++++++++++++++++++-- tests/unit/test__pandas_helpers.py | 40 +++++++++++++++++++++ 4 files changed, 119 insertions(+), 5 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 7ad416e08..8da42dc83 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -45,6 +45,8 @@ _PROGRESS_INTERVAL = 0.2 # Maximum time between download status checks, in seconds. +_DEFAULT_MAX_QUEUE_SIZE = 1 # max queue size for BQ Storage downloads + _PANDAS_DTYPE_TO_BQ = { "bool": "BOOLEAN", "datetime64[ns, UTC]": "TIMESTAMP", @@ -608,6 +610,7 @@ def _download_table_bqstorage( preserve_order=False, selected_fields=None, page_to_item=None, + max_queue_size=_DEFAULT_MAX_QUEUE_SIZE, ): """Use (faster, but billable) BQ Storage API to construct DataFrame.""" @@ -654,7 +657,12 @@ def _download_table_bqstorage( download_state = _DownloadState() # Create a queue to collect frames as they are created in each thread. - worker_queue = queue.Queue() + # + # The queue needs to be bounded, because if the user code processes the fetched + # result pages to slowly, but at the same time new pages are rapidly being fetched + # from the server, the queue can grow to the point where the process runs + # out of memory. 
+ worker_queue = queue.Queue(maxsize=max_queue_size) with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool: try: @@ -716,7 +724,12 @@ def _download_table_bqstorage( def download_arrow_bqstorage( - project_id, table, bqstorage_client, preserve_order=False, selected_fields=None + project_id, + table, + bqstorage_client, + preserve_order=False, + selected_fields=None, + max_queue_size=_DEFAULT_MAX_QUEUE_SIZE, ): return _download_table_bqstorage( project_id, @@ -725,6 +738,7 @@ def download_arrow_bqstorage( preserve_order=preserve_order, selected_fields=selected_fields, page_to_item=_bqstorage_page_to_arrow, + max_queue_size=max_queue_size, ) @@ -736,6 +750,7 @@ def download_dataframe_bqstorage( dtypes, preserve_order=False, selected_fields=None, + max_queue_size=_DEFAULT_MAX_QUEUE_SIZE, ): page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) return _download_table_bqstorage( @@ -745,6 +760,7 @@ def download_dataframe_bqstorage( preserve_order=preserve_order, selected_fields=selected_fields, page_to_item=page_to_item, + max_queue_size=max_queue_size, ) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 491983f8e..690ec9f47 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -28,6 +28,7 @@ from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration from google.cloud.bigquery.external_config import ExternalConfig from google.cloud.bigquery import _helpers +from google.cloud.bigquery import _pandas_helpers from google.cloud.bigquery.query import _query_param_from_api_repr from google.cloud.bigquery.query import ArrayQueryParameter from google.cloud.bigquery.query import ScalarQueryParameter @@ -1199,6 +1200,7 @@ def to_arrow( progress_bar_type=None, bqstorage_client=None, create_bqstorage_client=True, + max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, ): """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -1242,6 +1244,13 @@ def to_arrow( ..versionadded:: 1.24.0 + max_queue_size (Optional[int]): + The maximum number of result pages to hold in the internal queue when + streaming query results over the BigQuery Storage API. Ignored if + Storage API is not used. + + ..versionadded:: 2.14.0 + Returns: pyarrow.Table A :class:`pyarrow.Table` populated with row data and column @@ -1259,6 +1268,7 @@ def to_arrow( progress_bar_type=progress_bar_type, bqstorage_client=bqstorage_client, create_bqstorage_client=create_bqstorage_client, + max_queue_size=max_queue_size, ) # If changing the signature of this method, make sure to apply the same @@ -1270,6 +1280,7 @@ def to_dataframe( progress_bar_type=None, create_bqstorage_client=True, date_as_object=True, + max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, ): """Return a pandas DataFrame from a QueryJob @@ -1316,6 +1327,13 @@ def to_dataframe( ..versionadded:: 1.26.0 + max_queue_size (Optional[int]): + The maximum number of result pages to hold in the internal queue when + streaming query results over the BigQuery Storage API. Ignored if + Storage API is not used. + + ..versionadded:: 2.14.0 + Returns: A :class:`~pandas.DataFrame` populated with row data and column headers from the query results. 
The column headers are derived @@ -1331,6 +1349,7 @@ def to_dataframe( progress_bar_type=progress_bar_type, create_bqstorage_client=create_bqstorage_client, date_as_object=date_as_object, + max_queue_size=max_queue_size, ) def __iter__(self): diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index a2366b806..64c7ee8a4 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -1498,7 +1498,11 @@ def _to_page_iterable( for item in tabledata_list_download(): yield item - def _to_arrow_iterable(self, bqstorage_client=None): + def _to_arrow_iterable( + self, + bqstorage_client=None, + max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, + ): """Create an iterable of arrow RecordBatches, to process the table as a stream.""" bqstorage_download = functools.partial( _pandas_helpers.download_arrow_bqstorage, @@ -1507,6 +1511,7 @@ def _to_arrow_iterable(self, bqstorage_client=None): bqstorage_client, preserve_order=self._preserve_order, selected_fields=self._selected_fields, + max_queue_size=max_queue_size, ) tabledata_list_download = functools.partial( _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema @@ -1524,6 +1529,7 @@ def to_arrow( progress_bar_type=None, bqstorage_client=None, create_bqstorage_client=True, + max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, ): """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -1567,6 +1573,13 @@ def to_arrow( ..versionadded:: 1.24.0 + max_queue_size (Optional[int]): + The maximum number of result pages to hold in the internal queue when + streaming query results over the BigQuery Storage API. Ignored if + Storage API is not used. + + ..versionadded:: 2.14.0 + Returns: pyarrow.Table A :class:`pyarrow.Table` populated with row data and column @@ -1597,7 +1610,7 @@ def to_arrow( record_batches = [] for record_batch in self._to_arrow_iterable( - bqstorage_client=bqstorage_client + bqstorage_client=bqstorage_client, max_queue_size=max_queue_size, ): record_batches.append(record_batch) @@ -1622,7 +1635,12 @@ def to_arrow( arrow_schema = _pandas_helpers.bq_to_arrow_schema(self._schema) return pyarrow.Table.from_batches(record_batches, schema=arrow_schema) - def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None): + def to_dataframe_iterable( + self, + bqstorage_client=None, + dtypes=None, + max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, + ): """Create an iterable of pandas DataFrames, to process the table as a stream. Args: @@ -1642,6 +1660,13 @@ def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None): ``dtype`` is used when constructing the series for the column specified. Otherwise, the default pandas behavior is used. + max_queue_size (Optional[int]): + The maximum number of result pages to hold in the internal queue when + streaming query results over the BigQuery Storage API. Ignored if + Storage API is not used. + + ..versionadded:: 2.14.0 + Returns: pandas.DataFrame: A generator of :class:`~pandas.DataFrame`. 
@@ -1665,6 +1690,7 @@ def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None): dtypes, preserve_order=self._preserve_order, selected_fields=self._selected_fields, + max_queue_size=max_queue_size, ) tabledata_list_download = functools.partial( _pandas_helpers.download_dataframe_row_iterator, @@ -1687,6 +1713,7 @@ def to_dataframe( progress_bar_type=None, create_bqstorage_client=True, date_as_object=True, + max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, ): """Create a pandas DataFrame by loading all pages of a query. @@ -1742,6 +1769,13 @@ def to_dataframe( ..versionadded:: 1.26.0 + max_queue_size (Optional[int]): + The maximum number of result pages to hold in the internal queue when + streaming query results over the BigQuery Storage API. Ignored if + Storage API is not used. + + ..versionadded:: 2.14.0 + Returns: pandas.DataFrame: A :class:`~pandas.DataFrame` populated with row data and column @@ -1768,6 +1802,7 @@ def to_dataframe( progress_bar_type=progress_bar_type, bqstorage_client=bqstorage_client, create_bqstorage_client=create_bqstorage_client, + max_queue_size=max_queue_size, ) # When converting timestamp values to nanosecond precision, the result @@ -1815,6 +1850,7 @@ def to_arrow( progress_bar_type=None, bqstorage_client=None, create_bqstorage_client=True, + max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, ): """[Beta] Create an empty class:`pyarrow.Table`. @@ -1822,6 +1858,7 @@ def to_arrow( progress_bar_type (str): Ignored. Added for compatibility with RowIterator. bqstorage_client (Any): Ignored. Added for compatibility with RowIterator. create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator. + max_queue_size (int): Ignored. Added for compatibility with RowIterator. Returns: pyarrow.Table: An empty :class:`pyarrow.Table`. @@ -1837,6 +1874,7 @@ def to_dataframe( progress_bar_type=None, create_bqstorage_client=True, date_as_object=True, + max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, ): """Create an empty dataframe. @@ -1846,6 +1884,7 @@ def to_dataframe( progress_bar_type (Any): Ignored. Added for compatibility with RowIterator. create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator. date_as_object (bool): Ignored. Added for compatibility with RowIterator. + max_queue_size (int): Ignored. Added for compatibility with RowIterator. Returns: pandas.DataFrame: An empty :class:`~pandas.DataFrame`. diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index abd725820..bf965d132 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -17,6 +17,7 @@ import decimal import functools import operator +import queue import warnings import mock @@ -1265,6 +1266,45 @@ def test_dataframe_to_parquet_dict_sequence_schema(module_under_test): assert schema_arg == expected_schema_arg +def test__download_table_bqstorage_bounded_queue_size(module_under_test): + from google.cloud.bigquery import dataset + from google.cloud.bigquery import table + + table_ref = table.TableReference( + dataset.DatasetReference("project-x", "dataset-y"), "table-z", + ) + + bigquery_storage = pytest.importorskip( + "google.cloud.bigquery_storage", + reason="Requires BigQuery Storage dependency." 
+ ) + bqstorage_client = mock.create_autospec( + bigquery_storage.BigQueryReadClient, instance=True + ) + fake_session = mock.Mock(streams=["stream/s1", "stream/s2", "stream/s3"]) + bqstorage_client.create_read_session.return_value = fake_session + + def fake_download_stream( + download_state, bqstorage_client, session, stream, worker_queue, page_to_item + ): + worker_queue.put_nowait("result") + + download_stream = mock.Mock(side_effect=fake_download_stream) + + with pytest.raises(queue.Full), mock.patch.object( + module_under_test, "_download_table_bqstorage_stream", new=download_stream + ): + result_gen = module_under_test._download_table_bqstorage( + "some-project", table_ref, bqstorage_client, max_queue_size=2, + ) + list(result_gen) + + # Timing-safe, as the method under test should block until the pool shutdown is + # complete, at which point all download stream workers have already been submitted + # to the thread pool. + assert download_stream.call_count == 3 + + @pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") def test_download_arrow_row_iterator_unknown_field_type(module_under_test): fake_page = api_core.page_iterator.Page( From 70553bc4314a78774cafda796e13267d1ffe8ca4 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 24 Mar 2021 17:34:58 +0100 Subject: [PATCH 2/5] Slightly simplify bits of page streaming logic --- google/cloud/bigquery/_pandas_helpers.py | 9 +++------ google/cloud/bigquery/table.py | 13 ++++++------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 8da42dc83..1920110de 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -703,15 +703,12 @@ def _download_table_bqstorage( continue # Return any remaining values after the workers finished. - while not worker_queue.empty(): # pragma: NO COVER + while True: # pragma: NO COVER try: - # Include a timeout because even though the queue is - # non-empty, it doesn't guarantee that a subsequent call to - # get() will not block. 
- frame = worker_queue.get(timeout=_PROGRESS_INTERVAL) + frame = worker_queue.get_nowait() yield frame except queue.Empty: # pragma: NO COVER - continue + break finally: # No need for a lock because reading/replacing a variable is # defined to be an atomic operation in the Python language diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index 64c7ee8a4..fa5b11f37 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -1490,13 +1490,12 @@ def _to_page_iterable( if not self._validate_bqstorage(bqstorage_client, False): bqstorage_client = None - if bqstorage_client is not None: - for item in bqstorage_download(): - yield item - return - - for item in tabledata_list_download(): - yield item + result_pages = ( + bqstorage_download() + if bqstorage_client is not None + else tabledata_list_download() + ) + yield from result_pages def _to_arrow_iterable( self, From 19215dc522ef1011e6223f6b595c46a10f92d6bd Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 31 Mar 2021 20:28:11 +0200 Subject: [PATCH 3/5] Only retain max_queue_size where most relevant --- google/cloud/bigquery/_pandas_helpers.py | 8 +------- google/cloud/bigquery/job/query.py | 19 ------------------- google/cloud/bigquery/table.py | 23 ++--------------------- 3 files changed, 3 insertions(+), 47 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 1920110de..81b33c7fa 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -721,12 +721,7 @@ def _download_table_bqstorage( def download_arrow_bqstorage( - project_id, - table, - bqstorage_client, - preserve_order=False, - selected_fields=None, - max_queue_size=_DEFAULT_MAX_QUEUE_SIZE, + project_id, table, bqstorage_client, preserve_order=False, selected_fields=None, ): return _download_table_bqstorage( project_id, @@ -735,7 +730,6 @@ def download_arrow_bqstorage( preserve_order=preserve_order, selected_fields=selected_fields, page_to_item=_bqstorage_page_to_arrow, - max_queue_size=max_queue_size, ) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 690ec9f47..491983f8e 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -28,7 +28,6 @@ from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration from google.cloud.bigquery.external_config import ExternalConfig from google.cloud.bigquery import _helpers -from google.cloud.bigquery import _pandas_helpers from google.cloud.bigquery.query import _query_param_from_api_repr from google.cloud.bigquery.query import ArrayQueryParameter from google.cloud.bigquery.query import ScalarQueryParameter @@ -1200,7 +1199,6 @@ def to_arrow( progress_bar_type=None, bqstorage_client=None, create_bqstorage_client=True, - max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, ): """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -1244,13 +1242,6 @@ def to_arrow( ..versionadded:: 1.24.0 - max_queue_size (Optional[int]): - The maximum number of result pages to hold in the internal queue when - streaming query results over the BigQuery Storage API. Ignored if - Storage API is not used. 
- - ..versionadded:: 2.14.0 - Returns: pyarrow.Table A :class:`pyarrow.Table` populated with row data and column @@ -1268,7 +1259,6 @@ def to_arrow( progress_bar_type=progress_bar_type, bqstorage_client=bqstorage_client, create_bqstorage_client=create_bqstorage_client, - max_queue_size=max_queue_size, ) # If changing the signature of this method, make sure to apply the same @@ -1280,7 +1270,6 @@ def to_dataframe( progress_bar_type=None, create_bqstorage_client=True, date_as_object=True, - max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, ): """Return a pandas DataFrame from a QueryJob @@ -1327,13 +1316,6 @@ def to_dataframe( ..versionadded:: 1.26.0 - max_queue_size (Optional[int]): - The maximum number of result pages to hold in the internal queue when - streaming query results over the BigQuery Storage API. Ignored if - Storage API is not used. - - ..versionadded:: 2.14.0 - Returns: A :class:`~pandas.DataFrame` populated with row data and column headers from the query results. The column headers are derived @@ -1349,7 +1331,6 @@ def to_dataframe( progress_bar_type=progress_bar_type, create_bqstorage_client=create_bqstorage_client, date_as_object=date_as_object, - max_queue_size=max_queue_size, ) def __iter__(self): diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index fa5b11f37..e8d3721b7 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -1497,11 +1497,7 @@ def _to_page_iterable( ) yield from result_pages - def _to_arrow_iterable( - self, - bqstorage_client=None, - max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, - ): + def _to_arrow_iterable(self, bqstorage_client=None): """Create an iterable of arrow RecordBatches, to process the table as a stream.""" bqstorage_download = functools.partial( _pandas_helpers.download_arrow_bqstorage, @@ -1510,7 +1506,6 @@ def _to_arrow_iterable( bqstorage_client, preserve_order=self._preserve_order, selected_fields=self._selected_fields, - max_queue_size=max_queue_size, ) tabledata_list_download = functools.partial( _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema @@ -1528,7 +1523,6 @@ def to_arrow( progress_bar_type=None, bqstorage_client=None, create_bqstorage_client=True, - max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, ): """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -1572,13 +1566,6 @@ def to_arrow( ..versionadded:: 1.24.0 - max_queue_size (Optional[int]): - The maximum number of result pages to hold in the internal queue when - streaming query results over the BigQuery Storage API. Ignored if - Storage API is not used. - - ..versionadded:: 2.14.0 - Returns: pyarrow.Table A :class:`pyarrow.Table` populated with row data and column @@ -1609,7 +1596,7 @@ def to_arrow( record_batches = [] for record_batch in self._to_arrow_iterable( - bqstorage_client=bqstorage_client, max_queue_size=max_queue_size, + bqstorage_client=bqstorage_client ): record_batches.append(record_batch) @@ -1712,7 +1699,6 @@ def to_dataframe( progress_bar_type=None, create_bqstorage_client=True, date_as_object=True, - max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, ): """Create a pandas DataFrame by loading all pages of a query. 
@@ -1801,7 +1787,6 @@ def to_dataframe( progress_bar_type=progress_bar_type, bqstorage_client=bqstorage_client, create_bqstorage_client=create_bqstorage_client, - max_queue_size=max_queue_size, ) # When converting timestamp values to nanosecond precision, the result @@ -1849,7 +1834,6 @@ def to_arrow( progress_bar_type=None, bqstorage_client=None, create_bqstorage_client=True, - max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, ): """[Beta] Create an empty class:`pyarrow.Table`. @@ -1857,7 +1841,6 @@ def to_arrow( progress_bar_type (str): Ignored. Added for compatibility with RowIterator. bqstorage_client (Any): Ignored. Added for compatibility with RowIterator. create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator. - max_queue_size (int): Ignored. Added for compatibility with RowIterator. Returns: pyarrow.Table: An empty :class:`pyarrow.Table`. @@ -1873,7 +1856,6 @@ def to_dataframe( progress_bar_type=None, create_bqstorage_client=True, date_as_object=True, - max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, ): """Create an empty dataframe. @@ -1883,7 +1865,6 @@ def to_dataframe( progress_bar_type (Any): Ignored. Added for compatibility with RowIterator. create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator. date_as_object (bool): Ignored. Added for compatibility with RowIterator. - max_queue_size (int): Ignored. Added for compatibility with RowIterator. Returns: pandas.DataFrame: An empty :class:`~pandas.DataFrame`. From 85929ed32599ceb586d189a84faf545affc9f63f Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 31 Mar 2021 22:43:42 +0200 Subject: [PATCH 4/5] Adjust tests, add support for infinite queue size --- google/cloud/bigquery/_pandas_helpers.py | 17 +++++--- google/cloud/bigquery/table.py | 6 ++- tests/unit/test__pandas_helpers.py | 52 ++++++++++++++++++------ 3 files changed, 55 insertions(+), 20 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 81b33c7fa..e3567743a 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -45,7 +45,7 @@ _PROGRESS_INTERVAL = 0.2 # Maximum time between download status checks, in seconds. -_DEFAULT_MAX_QUEUE_SIZE = 1 # max queue size for BQ Storage downloads +_MAX_QUEUE_SIZE_DEFAULT = object() # max queue size sentinel for BQ Storage downloads _PANDAS_DTYPE_TO_BQ = { "bool": "BOOLEAN", @@ -610,7 +610,7 @@ def _download_table_bqstorage( preserve_order=False, selected_fields=None, page_to_item=None, - max_queue_size=_DEFAULT_MAX_QUEUE_SIZE, + max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, ): """Use (faster, but billable) BQ Storage API to construct DataFrame.""" @@ -658,10 +658,15 @@ def _download_table_bqstorage( # Create a queue to collect frames as they are created in each thread. # - # The queue needs to be bounded, because if the user code processes the fetched - # result pages to slowly, but at the same time new pages are rapidly being fetched - # from the server, the queue can grow to the point where the process runs + # The queue needs to be bounded by default, because if the user code processes the + # fetched result pages too slowly, while at the same time new pages are rapidly being + # fetched from the server, the queue can grow to the point where the process runs # out of memory. 
+ if max_queue_size is _MAX_QUEUE_SIZE_DEFAULT: + max_queue_size = total_streams + elif max_queue_size is None: + max_queue_size = 0 # unbounded + worker_queue = queue.Queue(maxsize=max_queue_size) with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool: @@ -741,7 +746,7 @@ def download_dataframe_bqstorage( dtypes, preserve_order=False, selected_fields=None, - max_queue_size=_DEFAULT_MAX_QUEUE_SIZE, + max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, ): page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) return _download_table_bqstorage( diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index e8d3721b7..e2b572c6f 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -1625,7 +1625,7 @@ def to_dataframe_iterable( self, bqstorage_client=None, dtypes=None, - max_queue_size=_pandas_helpers._DEFAULT_MAX_QUEUE_SIZE, + max_queue_size=_pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, ): """Create an iterable of pandas DataFrames, to process the table as a stream. @@ -1651,6 +1651,10 @@ def to_dataframe_iterable( streaming query results over the BigQuery Storage API. Ignored if Storage API is not used. + By default, the max queue size is set to the number of BQ Storage streams + created by the server. If ``max_queue_size`` is :data:`None`, the queue + size is infinite. + ..versionadded:: 2.14.0 Returns: diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index bf965d132..43692f4af 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -42,6 +42,11 @@ from google.cloud.bigquery import schema from google.cloud.bigquery._pandas_helpers import _BIGNUMERIC_SUPPORT +try: + from google.cloud import bigquery_storage +except ImportError: # pragma: NO COVER + bigquery_storage = None + skip_if_no_bignumeric = pytest.mark.skipif( not _BIGNUMERIC_SUPPORT, reason="BIGNUMERIC support requires pyarrow>=3.0.0", @@ -1266,43 +1271,64 @@ def test_dataframe_to_parquet_dict_sequence_schema(module_under_test): assert schema_arg == expected_schema_arg -def test__download_table_bqstorage_bounded_queue_size(module_under_test): +@pytest.mark.parametrize( + "stream_count,maxsize_kwarg,expected_call_count,expected_maxsize", + [ + (3, {"max_queue_size": 2}, 3, 2), # custom queue size + (4, {}, 4, 4), # default queue size + (7, {"max_queue_size": None}, 7, 0), # infinite queue size + ], +) +@pytest.mark.skipif( + bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`" +) +def test__download_table_bqstorage( + module_under_test, + stream_count, + maxsize_kwarg, + expected_call_count, + expected_maxsize, +): from google.cloud.bigquery import dataset from google.cloud.bigquery import table - table_ref = table.TableReference( - dataset.DatasetReference("project-x", "dataset-y"), "table-z", - ) + queue_used = None # A reference to the queue used by code under test. - bigquery_storage = pytest.importorskip( - "google.cloud.bigquery_storage", - reason="Requires BigQuery Storage dependency." 
- ) bqstorage_client = mock.create_autospec( bigquery_storage.BigQueryReadClient, instance=True ) - fake_session = mock.Mock(streams=["stream/s1", "stream/s2", "stream/s3"]) + fake_session = mock.Mock(streams=["stream/s{i}" for i in range(stream_count)]) bqstorage_client.create_read_session.return_value = fake_session + table_ref = table.TableReference( + dataset.DatasetReference("project-x", "dataset-y"), "table-z", + ) + def fake_download_stream( download_state, bqstorage_client, session, stream, worker_queue, page_to_item ): - worker_queue.put_nowait("result") + nonlocal queue_used + queue_used = worker_queue + try: + worker_queue.put_nowait("result_page") + except queue.Full: # pragma: NO COVER + pass download_stream = mock.Mock(side_effect=fake_download_stream) - with pytest.raises(queue.Full), mock.patch.object( + with mock.patch.object( module_under_test, "_download_table_bqstorage_stream", new=download_stream ): result_gen = module_under_test._download_table_bqstorage( - "some-project", table_ref, bqstorage_client, max_queue_size=2, + "some-project", table_ref, bqstorage_client, **maxsize_kwarg ) list(result_gen) # Timing-safe, as the method under test should block until the pool shutdown is # complete, at which point all download stream workers have already been submitted # to the thread pool. - assert download_stream.call_count == 3 + assert download_stream.call_count == stream_count # once for each stream + assert queue_used.maxsize == expected_maxsize @pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") From 489fa101824071c5a7a78ca52a211b95b89594b0 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 31 Mar 2021 22:57:29 +0200 Subject: [PATCH 5/5] Remove deleted param's description --- google/cloud/bigquery/table.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index e2b572c6f..bd5bca30f 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -1758,13 +1758,6 @@ def to_dataframe( ..versionadded:: 1.26.0 - max_queue_size (Optional[int]): - The maximum number of result pages to hold in the internal queue when - streaming query results over the BigQuery Storage API. Ignored if - Storage API is not used. - - ..versionadded:: 2.14.0 - Returns: pandas.DataFrame: A :class:`~pandas.DataFrame` populated with row data and column
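For context, a minimal usage sketch of the feature as it stands at the end of this series (after PATCH 3/5 only RowIterator.to_dataframe_iterable() keeps the max_queue_size argument). This snippet is illustrative and not part of the patches themselves; it assumes google-cloud-bigquery >= 2.14.0 (where the parameter lands per the versionadded notes) with google-cloud-bigquery-storage installed, the table name mirrors the fake one used in the unit tests, and process() is a hypothetical stand-in for the caller's own handling:

    # Illustrative only -- not part of the patch series.
    from google.cloud import bigquery
    from google.cloud import bigquery_storage

    client = bigquery.Client()
    bqstorage_client = bigquery_storage.BigQueryReadClient()

    # Any RowIterator works here; a query result is used as an example.
    rows = client.query("SELECT * FROM `project-x.dataset-y.table-z`").result()

    # Bound the internal page queue to 10 result pages. Per PATCH 4/5, the
    # default bound equals the number of read streams opened by the server,
    # and passing max_queue_size=None makes the queue unbounded.
    for frame in rows.to_dataframe_iterable(
        bqstorage_client=bqstorage_client, max_queue_size=10
    ):
        process(frame)  # hypothetical placeholder for per-DataFrame user code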