Skip to content

Commit

Permalink
feat: use pyarrow stream compression, if available (#593)
Browse files Browse the repository at this point in the history
* feat: use pyarrow stream compression, if available

* Remove unnecessary pyarrow version check

Arrow stream compression requires pyarrow>=1.0.0, but that's already
guaranteed by a version pin in setup.py if bqstorage extra is
installed.

* Remvoe unused pyarrow version parsing in tests

* Only use arrow compression in tests if available
  • Loading branch information
plamut committed Apr 12, 2021
1 parent c8b5581 commit dde9dc5
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 14 deletions.
13 changes: 13 additions & 0 deletions google/cloud/bigquery/_pandas_helpers.py
Expand Up @@ -33,6 +33,14 @@
except ImportError: # pragma: NO COVER
pyarrow = None

try:
from google.cloud.bigquery_storage import ArrowSerializationOptions
except ImportError:
_ARROW_COMPRESSION_SUPPORT = False
else:
# Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
_ARROW_COMPRESSION_SUPPORT = True

from google.cloud.bigquery import schema


Expand Down Expand Up @@ -631,6 +639,11 @@ def _download_table_bqstorage(
for field in selected_fields:
requested_session.read_options.selected_fields.append(field.name)

if _ARROW_COMPRESSION_SUPPORT:
requested_session.read_options.arrow_serialization_options.buffer_compression = (
ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
)

session = bqstorage_client.create_read_session(
parent="projects/{}".format(project_id),
read_session=requested_session,
Expand Down
14 changes: 14 additions & 0 deletions google/cloud/bigquery/dbapi/cursor.py
Expand Up @@ -19,6 +19,14 @@
import copy
import logging

try:
from google.cloud.bigquery_storage import ArrowSerializationOptions
except ImportError:
_ARROW_COMPRESSION_SUPPORT = False
else:
# Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
_ARROW_COMPRESSION_SUPPORT = True

from google.cloud.bigquery import job
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions
Expand Down Expand Up @@ -255,6 +263,12 @@ def _bqstorage_fetch(self, bqstorage_client):
table=table_reference.to_bqstorage(),
data_format=bigquery_storage.types.DataFormat.ARROW,
)

if _ARROW_COMPRESSION_SUPPORT:
requested_session.read_options.arrow_serialization_options.buffer_compression = (
ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
)

read_session = bqstorage_client.create_read_session(
parent="projects/{}".format(table_reference.project),
read_session=requested_session,
Expand Down
8 changes: 0 additions & 8 deletions tests/system/test_client.py
Expand Up @@ -28,7 +28,6 @@

import psutil
import pytest
import pkg_resources

from google.cloud.bigquery._pandas_helpers import _BIGNUMERIC_SUPPORT
from . import helpers
Expand Down Expand Up @@ -116,13 +115,6 @@
(TooManyRequests, InternalServerError, ServiceUnavailable)
)

PYARROW_MINIMUM_VERSION = pkg_resources.parse_version("0.17.0")

if pyarrow:
PYARROW_INSTALLED_VERSION = pkg_resources.get_distribution("pyarrow").parsed_version
else:
PYARROW_INSTALLED_VERSION = None

MTLS_TESTING = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE") == "true"


Expand Down
78 changes: 72 additions & 6 deletions tests/unit/job/test_query_pandas.py
Expand Up @@ -41,6 +41,22 @@
from .helpers import _make_job_resource


@pytest.fixture
def table_read_options_kwarg():
# Create a BigQuery Storage table read options object with pyarrow compression
# enabled if a recent-enough version of google-cloud-bigquery-storage dependency is
# installed to support the compression.
if not hasattr(bigquery_storage, "ArrowSerializationOptions"):
return {}

read_options = bigquery_storage.ReadSession.TableReadOptions(
arrow_serialization_options=bigquery_storage.ArrowSerializationOptions(
buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
)
)
return {"read_options": read_options}


@pytest.mark.parametrize(
"query,expected",
(
Expand Down Expand Up @@ -82,7 +98,7 @@ def test__contains_order_by(query, expected):
"SelecT name, age froM table OrdeR \n\t BY other_column;",
),
)
def test_to_dataframe_bqstorage_preserve_order(query):
def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg):
from google.cloud.bigquery.job import QueryJob as target_class

job_resource = _make_job_resource(
Expand Down Expand Up @@ -123,8 +139,10 @@ def test_to_dataframe_bqstorage_preserve_order(query):
destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
**job_resource["configuration"]["query"]["destinationTable"]
)
expected_session = bigquery_storage.types.ReadSession(
table=destination_table, data_format=bigquery_storage.types.DataFormat.ARROW,
expected_session = bigquery_storage.ReadSession(
table=destination_table,
data_format=bigquery_storage.DataFormat.ARROW,
**table_read_options_kwarg,
)
bqstorage_client.create_read_session.assert_called_once_with(
parent="projects/test-project",
Expand Down Expand Up @@ -431,7 +449,7 @@ def test_to_dataframe_ddl_query():
@pytest.mark.skipif(
bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_bqstorage():
def test_to_dataframe_bqstorage(table_read_options_kwarg):
from google.cloud.bigquery.job import QueryJob as target_class

resource = _make_job_resource(job_type="query", ended=True)
Expand Down Expand Up @@ -468,8 +486,10 @@ def test_to_dataframe_bqstorage():
destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
**resource["configuration"]["query"]["destinationTable"]
)
expected_session = bigquery_storage.types.ReadSession(
table=destination_table, data_format=bigquery_storage.types.DataFormat.ARROW,
expected_session = bigquery_storage.ReadSession(
table=destination_table,
data_format=bigquery_storage.DataFormat.ARROW,
**table_read_options_kwarg,
)
bqstorage_client.create_read_session.assert_called_once_with(
parent=f"projects/{client.project}",
Expand All @@ -478,6 +498,52 @@ def test_to_dataframe_bqstorage():
)


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@pytest.mark.skipif(
bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_bqstorage_no_pyarrow_compression():
from google.cloud.bigquery.job import QueryJob as target_class

resource = _make_job_resource(job_type="query", ended=True)
query_resource = {
"jobComplete": True,
"jobReference": resource["jobReference"],
"totalRows": "4",
"schema": {"fields": [{"name": "name", "type": "STRING", "mode": "NULLABLE"}]},
}
connection = _make_connection(query_resource)
client = _make_client(connection=connection)
job = target_class.from_api_repr(resource, client)
bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
session = bigquery_storage.types.ReadSession()
session.avro_schema.schema = json.dumps(
{
"type": "record",
"name": "__root__",
"fields": [{"name": "name", "type": ["null", "string"]}],
}
)
bqstorage_client.create_read_session.return_value = session

with mock.patch(
"google.cloud.bigquery._pandas_helpers._ARROW_COMPRESSION_SUPPORT", new=False
):
job.to_dataframe(bqstorage_client=bqstorage_client)

destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
**resource["configuration"]["query"]["destinationTable"]
)
expected_session = bigquery_storage.ReadSession(
table=destination_table, data_format=bigquery_storage.DataFormat.ARROW,
)
bqstorage_client.create_read_session.assert_called_once_with(
parent=f"projects/{client.project}",
read_session=expected_session,
max_stream_count=0,
)


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_to_dataframe_column_dtypes():
from google.cloud.bigquery.job import QueryJob as target_class
Expand Down
47 changes: 47 additions & 0 deletions tests/unit/test_dbapi_cursor.py
Expand Up @@ -123,6 +123,7 @@ def _mock_job(
schema=schema,
num_dml_affected_rows=num_dml_affected_rows,
)
mock_job.destination.project = "P"
mock_job.destination.to_bqstorage.return_value = (
"projects/P/datasets/DS/tables/T"
)
Expand Down Expand Up @@ -380,6 +381,52 @@ def test_fetchall_w_bqstorage_client_fetch_error_no_fallback(self):
# the default client was not used
mock_client.list_rows.assert_not_called()

@unittest.skipIf(
bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
)
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_fetchall_w_bqstorage_client_no_arrow_compression(self):
from google.cloud.bigquery import dbapi
from google.cloud.bigquery import table

# Use unordered data to also test any non-determenistic key order in dicts.
row_data = [table.Row([1.2, 1.1], {"bar": 1, "foo": 0})]
bqstorage_streamed_rows = [{"bar": _to_pyarrow(1.2), "foo": _to_pyarrow(1.1)}]

mock_client = self._mock_client(rows=row_data)
mock_bqstorage_client = self._mock_bqstorage_client(
stream_count=1, rows=bqstorage_streamed_rows,
)

connection = dbapi.connect(
client=mock_client, bqstorage_client=mock_bqstorage_client,
)
cursor = connection.cursor()
cursor.execute("SELECT foo, bar FROM some_table")

with mock.patch(
"google.cloud.bigquery.dbapi.cursor._ARROW_COMPRESSION_SUPPORT", new=False
):
rows = cursor.fetchall()

mock_client.list_rows.assert_not_called() # The default client was not used.

# Check the BQ Storage session config.
expected_session = bigquery_storage.ReadSession(
table="projects/P/datasets/DS/tables/T",
data_format=bigquery_storage.DataFormat.ARROW,
)
mock_bqstorage_client.create_read_session.assert_called_once_with(
parent="projects/P", read_session=expected_session, max_stream_count=1
)

# Check the data returned.
field_value = op.itemgetter(1)
sorted_row_data = [sorted(row.items(), key=field_value) for row in rows]
expected_row_data = [[("foo", 1.1), ("bar", 1.2)]]

self.assertEqual(sorted_row_data, expected_row_data)

def test_execute_custom_job_id(self):
from google.cloud.bigquery.dbapi import connect

Expand Down

0 comments on commit dde9dc5

Please sign in to comment.