diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py
index 7ad416e08..412f32754 100644
--- a/google/cloud/bigquery/_pandas_helpers.py
+++ b/google/cloud/bigquery/_pandas_helpers.py
@@ -33,6 +33,14 @@
 except ImportError:  # pragma: NO COVER
     pyarrow = None
 
+try:
+    from google.cloud.bigquery_storage import ArrowSerializationOptions
+except ImportError:
+    _ARROW_COMPRESSION_SUPPORT = False
+else:
+    # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
+    _ARROW_COMPRESSION_SUPPORT = True
+
 from google.cloud.bigquery import schema
 
 
@@ -631,6 +639,11 @@ def _download_table_bqstorage(
         for field in selected_fields:
             requested_session.read_options.selected_fields.append(field.name)
 
+    if _ARROW_COMPRESSION_SUPPORT:
+        requested_session.read_options.arrow_serialization_options.buffer_compression = (
+            ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
+        )
+
     session = bqstorage_client.create_read_session(
         parent="projects/{}".format(project_id),
         read_session=requested_session,
diff --git a/google/cloud/bigquery/dbapi/cursor.py b/google/cloud/bigquery/dbapi/cursor.py
index e90bcc2c0..ee09158d8 100644
--- a/google/cloud/bigquery/dbapi/cursor.py
+++ b/google/cloud/bigquery/dbapi/cursor.py
@@ -19,6 +19,14 @@
 import copy
 import logging
 
+try:
+    from google.cloud.bigquery_storage import ArrowSerializationOptions
+except ImportError:
+    _ARROW_COMPRESSION_SUPPORT = False
+else:
+    # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
+    _ARROW_COMPRESSION_SUPPORT = True
+
 from google.cloud.bigquery import job
 from google.cloud.bigquery.dbapi import _helpers
 from google.cloud.bigquery.dbapi import exceptions
@@ -255,6 +263,12 @@ def _bqstorage_fetch(self, bqstorage_client):
             table=table_reference.to_bqstorage(),
             data_format=bigquery_storage.types.DataFormat.ARROW,
         )
+
+        if _ARROW_COMPRESSION_SUPPORT:
+            requested_session.read_options.arrow_serialization_options.buffer_compression = (
+                ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
+            )
+
         read_session = bqstorage_client.create_read_session(
             parent="projects/{}".format(table_reference.project),
             read_session=requested_session,
diff --git a/tests/system/test_client.py b/tests/system/test_client.py
index 133f609a6..024441012 100644
--- a/tests/system/test_client.py
+++ b/tests/system/test_client.py
@@ -28,7 +28,6 @@
 import psutil
 import pytest
-import pkg_resources
 
 from google.cloud.bigquery._pandas_helpers import _BIGNUMERIC_SUPPORT
 from . import helpers
 
@@ -116,13 +115,6 @@
     (TooManyRequests, InternalServerError, ServiceUnavailable)
 )
 
-PYARROW_MINIMUM_VERSION = pkg_resources.parse_version("0.17.0")
-
-if pyarrow:
-    PYARROW_INSTALLED_VERSION = pkg_resources.get_distribution("pyarrow").parsed_version
-else:
-    PYARROW_INSTALLED_VERSION = None
-
 MTLS_TESTING = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE") == "true"
 
 
diff --git a/tests/unit/job/test_query_pandas.py b/tests/unit/job/test_query_pandas.py
index d1600ad43..0f9623203 100644
--- a/tests/unit/job/test_query_pandas.py
+++ b/tests/unit/job/test_query_pandas.py
@@ -41,6 +41,22 @@
 from .helpers import _make_job_resource
 
 
+@pytest.fixture
+def table_read_options_kwarg():
+    # Create a BigQuery Storage table read options object with pyarrow compression
+    # enabled if a recent-enough version of google-cloud-bigquery-storage dependency is
+    # installed to support the compression.
+    if not hasattr(bigquery_storage, "ArrowSerializationOptions"):
+        return {}
+
+    read_options = bigquery_storage.ReadSession.TableReadOptions(
+        arrow_serialization_options=bigquery_storage.ArrowSerializationOptions(
+            buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
+        )
+    )
+    return {"read_options": read_options}
+
+
 @pytest.mark.parametrize(
     "query,expected",
     (
@@ -82,7 +98,7 @@ def test__contains_order_by(query, expected):
         "SelecT name, age froM table OrdeR \n\t BY other_column;",
     ),
 )
-def test_to_dataframe_bqstorage_preserve_order(query):
+def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg):
     from google.cloud.bigquery.job import QueryJob as target_class
 
     job_resource = _make_job_resource(
@@ -123,8 +139,10 @@
     destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
         **job_resource["configuration"]["query"]["destinationTable"]
     )
-    expected_session = bigquery_storage.types.ReadSession(
-        table=destination_table, data_format=bigquery_storage.types.DataFormat.ARROW,
+    expected_session = bigquery_storage.ReadSession(
+        table=destination_table,
+        data_format=bigquery_storage.DataFormat.ARROW,
+        **table_read_options_kwarg,
     )
     bqstorage_client.create_read_session.assert_called_once_with(
         parent="projects/test-project",
@@ -431,7 +449,7 @@ def test_to_dataframe_ddl_query():
 @pytest.mark.skipif(
     bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
 )
-def test_to_dataframe_bqstorage():
+def test_to_dataframe_bqstorage(table_read_options_kwarg):
     from google.cloud.bigquery.job import QueryJob as target_class
 
     resource = _make_job_resource(job_type="query", ended=True)
@@ -468,8 +486,10 @@
     destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
         **resource["configuration"]["query"]["destinationTable"]
     )
-    expected_session = bigquery_storage.types.ReadSession(
-        table=destination_table, data_format=bigquery_storage.types.DataFormat.ARROW,
+    expected_session = bigquery_storage.ReadSession(
+        table=destination_table,
+        data_format=bigquery_storage.DataFormat.ARROW,
+        **table_read_options_kwarg,
     )
     bqstorage_client.create_read_session.assert_called_once_with(
         parent=f"projects/{client.project}",
         read_session=expected_session,
     )
@@ -478,6 +498,52 @@
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(
+    bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
+)
+def test_to_dataframe_bqstorage_no_pyarrow_compression():
+    from google.cloud.bigquery.job import QueryJob as target_class
+
+    resource = _make_job_resource(job_type="query", ended=True)
+    query_resource = {
+        "jobComplete": True,
+        "jobReference": resource["jobReference"],
+        "totalRows": "4",
+        "schema": {"fields": [{"name": "name", "type": "STRING", "mode": "NULLABLE"}]},
+    }
+    connection = _make_connection(query_resource)
+    client = _make_client(connection=connection)
+    job = target_class.from_api_repr(resource, client)
+    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
+    session = bigquery_storage.types.ReadSession()
+    session.avro_schema.schema = json.dumps(
+        {
+            "type": "record",
+            "name": "__root__",
+            "fields": [{"name": "name", "type": ["null", "string"]}],
+        }
+    )
+    bqstorage_client.create_read_session.return_value = session
+
+    with mock.patch(
+        "google.cloud.bigquery._pandas_helpers._ARROW_COMPRESSION_SUPPORT", new=False
+    ):
+        job.to_dataframe(bqstorage_client=bqstorage_client)
+
+    destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
+        **resource["configuration"]["query"]["destinationTable"]
+    )
+    expected_session = bigquery_storage.ReadSession(
+        table=destination_table, data_format=bigquery_storage.DataFormat.ARROW,
+    )
+    bqstorage_client.create_read_session.assert_called_once_with(
+        parent=f"projects/{client.project}",
+        read_session=expected_session,
+        max_stream_count=0,
+    )
+
+
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
 def test_to_dataframe_column_dtypes():
     from google.cloud.bigquery.job import QueryJob as target_class
diff --git a/tests/unit/test_dbapi_cursor.py b/tests/unit/test_dbapi_cursor.py
index cbd6f6909..0f44e3895 100644
--- a/tests/unit/test_dbapi_cursor.py
+++ b/tests/unit/test_dbapi_cursor.py
@@ -123,6 +123,7 @@ def _mock_job(
             schema=schema,
             num_dml_affected_rows=num_dml_affected_rows,
         )
+        mock_job.destination.project = "P"
         mock_job.destination.to_bqstorage.return_value = (
             "projects/P/datasets/DS/tables/T"
         )
@@ -380,6 +381,52 @@ def test_fetchall_w_bqstorage_client_fetch_error_no_fallback(self):
         # the default client was not used
         mock_client.list_rows.assert_not_called()
 
+    @unittest.skipIf(
+        bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_fetchall_w_bqstorage_client_no_arrow_compression(self):
+        from google.cloud.bigquery import dbapi
+        from google.cloud.bigquery import table
+
+        # Use unordered data to also test any non-deterministic key order in dicts.
+        row_data = [table.Row([1.2, 1.1], {"bar": 1, "foo": 0})]
+        bqstorage_streamed_rows = [{"bar": _to_pyarrow(1.2), "foo": _to_pyarrow(1.1)}]
+
+        mock_client = self._mock_client(rows=row_data)
+        mock_bqstorage_client = self._mock_bqstorage_client(
+            stream_count=1, rows=bqstorage_streamed_rows,
+        )
+
+        connection = dbapi.connect(
+            client=mock_client, bqstorage_client=mock_bqstorage_client,
+        )
+        cursor = connection.cursor()
+        cursor.execute("SELECT foo, bar FROM some_table")
+
+        with mock.patch(
+            "google.cloud.bigquery.dbapi.cursor._ARROW_COMPRESSION_SUPPORT", new=False
+        ):
+            rows = cursor.fetchall()
+
+        mock_client.list_rows.assert_not_called()  # The default client was not used.
+
+        # Check the BQ Storage session config.
+        expected_session = bigquery_storage.ReadSession(
+            table="projects/P/datasets/DS/tables/T",
+            data_format=bigquery_storage.DataFormat.ARROW,
+        )
+        mock_bqstorage_client.create_read_session.assert_called_once_with(
+            parent="projects/P", read_session=expected_session, max_stream_count=1
+        )
+
+        # Check the data returned.
+        field_value = op.itemgetter(1)
+        sorted_row_data = [sorted(row.items(), key=field_value) for row in rows]
+        expected_row_data = [[("foo", 1.1), ("bar", 1.2)]]
+
+        self.assertEqual(sorted_row_data, expected_row_data)
+
     def test_execute_custom_job_id(self):
         from google.cloud.bigquery.dbapi import connect
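
The change above boils down to one idea: whenever the installed google-cloud-bigquery-storage package exposes ArrowSerializationOptions, the client asks the BigQuery Storage API for LZ4_FRAME-compressed Arrow record batches, and it skips the option otherwise. The following standalone sketch shows the same read-session configuration outside the library; it is illustrative only, assumes a google-cloud-bigquery-storage 2.x install, and the project/table IDs are placeholders.

# Illustrative sketch, not part of the patch above.
# Assumes google-cloud-bigquery-storage >= 2.x; project and table IDs are placeholders.
from google.cloud import bigquery_storage

requested_session = bigquery_storage.types.ReadSession(
    table="projects/my-project/datasets/my_dataset/tables/my_table",
    data_format=bigquery_storage.types.DataFormat.ARROW,
)

# Same guard the patch adds: only request LZ4_FRAME compression when the installed
# bigquery_storage version exposes ArrowSerializationOptions.
if hasattr(bigquery_storage, "ArrowSerializationOptions"):
    requested_session.read_options.arrow_serialization_options.buffer_compression = (
        bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
    )

client = bigquery_storage.BigQueryReadClient()
session = client.create_read_session(
    parent="projects/my-project",
    read_session=requested_session,
    max_stream_count=1,
)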