feat: use pyarrow stream compression, if available #593

Merged · 4 commits · Apr 12, 2021
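For context, a minimal usage sketch of the behavior this change enables (not part of the diff): when the installed google-cloud-bigquery-storage release exposes ArrowSerializationOptions, DataFrame downloads over the BigQuery Storage API request LZ4-frame-compressed Arrow buffers automatically. The project, dataset, and table names below are placeholders.

# Sketch only; assumes google-cloud-bigquery-storage >= 2.x and pyarrow are
# installed. "my-project" and the table name are placeholders.
from google.cloud import bigquery
from google.cloud import bigquery_storage

bq_client = bigquery.Client(project="my-project")
bqstorage_client = bigquery_storage.BigQueryReadClient()

query_job = bq_client.query(
    "SELECT name, age FROM `my-project.my_dataset.my_table`"
)

# With this change, the read session created under the hood asks for LZ4_FRAME
# buffer compression whenever ArrowSerializationOptions can be imported;
# callers do not pass any extra flags.
df = query_job.to_dataframe(bqstorage_client=bqstorage_client)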
13 changes: 13 additions & 0 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -33,6 +33,14 @@
except ImportError:  # pragma: NO COVER
    pyarrow = None

try:
    from google.cloud.bigquery_storage import ArrowSerializationOptions
except ImportError:
    _ARROW_COMPRESSION_SUPPORT = False
else:
    # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
    _ARROW_COMPRESSION_SUPPORT = True

from google.cloud.bigquery import schema


@@ -631,6 +639,11 @@ def _download_table_bqstorage(
        for field in selected_fields:
            requested_session.read_options.selected_fields.append(field.name)

    if _ARROW_COMPRESSION_SUPPORT:
        requested_session.read_options.arrow_serialization_options.buffer_compression = (
            ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
        )

    session = bqstorage_client.create_read_session(
        parent="projects/{}".format(project_id),
        read_session=requested_session,
14 changes: 14 additions & 0 deletions google/cloud/bigquery/dbapi/cursor.py
@@ -19,6 +19,14 @@
import copy
import logging

try:
    from google.cloud.bigquery_storage import ArrowSerializationOptions
except ImportError:
    _ARROW_COMPRESSION_SUPPORT = False
else:
    # Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
    _ARROW_COMPRESSION_SUPPORT = True

from google.cloud.bigquery import job
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions
@@ -255,6 +263,12 @@ def _bqstorage_fetch(self, bqstorage_client):
            table=table_reference.to_bqstorage(),
            data_format=bigquery_storage.types.DataFormat.ARROW,
        )

        if _ARROW_COMPRESSION_SUPPORT:
            requested_session.read_options.arrow_serialization_options.buffer_compression = (
                ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
            )

        read_session = bqstorage_client.create_read_session(
            parent="projects/{}".format(table_reference.project),
            read_session=requested_session,
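The DB-API path gains the same behavior. A minimal sketch (again not part of the diff), with connection arguments mirroring the unit tests below; the query and table names are placeholders.

# Sketch only; same dependency assumptions as above.
from google.cloud import bigquery
from google.cloud import bigquery_storage
from google.cloud.bigquery import dbapi

connection = dbapi.connect(
    client=bigquery.Client(project="my-project"),
    bqstorage_client=bigquery_storage.BigQueryReadClient(),
)
cursor = connection.cursor()
cursor.execute("SELECT name FROM `my-project.my_dataset.my_table`")

# The cursor's BQ Storage read session now also requests LZ4_FRAME-compressed
# Arrow buffers when ArrowSerializationOptions is importable.
rows = cursor.fetchall()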
8 changes: 0 additions & 8 deletions tests/system/test_client.py
@@ -28,7 +28,6 @@

import psutil
import pytest
import pkg_resources

from google.cloud.bigquery._pandas_helpers import _BIGNUMERIC_SUPPORT
from . import helpers
@@ -116,13 +115,6 @@
    (TooManyRequests, InternalServerError, ServiceUnavailable)
)

PYARROW_MINIMUM_VERSION = pkg_resources.parse_version("0.17.0")

if pyarrow:
    PYARROW_INSTALLED_VERSION = pkg_resources.get_distribution("pyarrow").parsed_version
else:
    PYARROW_INSTALLED_VERSION = None

MTLS_TESTING = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE") == "true"


78 changes: 72 additions & 6 deletions tests/unit/job/test_query_pandas.py
@@ -41,6 +41,22 @@
from .helpers import _make_job_resource


@pytest.fixture
def table_read_options_kwarg():
    # Create BigQuery Storage table read options with Arrow stream compression
    # enabled, but only if the installed google-cloud-bigquery-storage version
    # is recent enough to support it.
    if not hasattr(bigquery_storage, "ArrowSerializationOptions"):
        return {}

    read_options = bigquery_storage.ReadSession.TableReadOptions(
        arrow_serialization_options=bigquery_storage.ArrowSerializationOptions(
            buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
        )
    )
    return {"read_options": read_options}


@pytest.mark.parametrize(
    "query,expected",
    (
@@ -82,7 +98,7 @@ def test__contains_order_by(query, expected):
"SelecT name, age froM table OrdeR \n\t BY other_column;",
),
)
def test_to_dataframe_bqstorage_preserve_order(query):
def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg):
    from google.cloud.bigquery.job import QueryJob as target_class

    job_resource = _make_job_resource(
@@ -123,8 +139,10 @@ def test_to_dataframe_bqstorage_preserve_order(query):
destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
**job_resource["configuration"]["query"]["destinationTable"]
)
expected_session = bigquery_storage.types.ReadSession(
table=destination_table, data_format=bigquery_storage.types.DataFormat.ARROW,
expected_session = bigquery_storage.ReadSession(
table=destination_table,
data_format=bigquery_storage.DataFormat.ARROW,
**table_read_options_kwarg,
)
bqstorage_client.create_read_session.assert_called_once_with(
parent="projects/test-project",
@@ -431,7 +449,7 @@ def test_to_dataframe_ddl_query():
@pytest.mark.skipif(
    bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_bqstorage():
def test_to_dataframe_bqstorage(table_read_options_kwarg):
    from google.cloud.bigquery.job import QueryJob as target_class

    resource = _make_job_resource(job_type="query", ended=True)
@@ -468,8 +486,10 @@ def test_to_dataframe_bqstorage():
destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
**resource["configuration"]["query"]["destinationTable"]
)
expected_session = bigquery_storage.types.ReadSession(
table=destination_table, data_format=bigquery_storage.types.DataFormat.ARROW,
expected_session = bigquery_storage.ReadSession(
table=destination_table,
data_format=bigquery_storage.DataFormat.ARROW,
**table_read_options_kwarg,
)
bqstorage_client.create_read_session.assert_called_once_with(
parent=f"projects/{client.project}",
@@ -478,6 +498,52 @@
    )


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@pytest.mark.skipif(
    bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_bqstorage_no_pyarrow_compression():
    from google.cloud.bigquery.job import QueryJob as target_class

    resource = _make_job_resource(job_type="query", ended=True)
    query_resource = {
        "jobComplete": True,
        "jobReference": resource["jobReference"],
        "totalRows": "4",
        "schema": {"fields": [{"name": "name", "type": "STRING", "mode": "NULLABLE"}]},
    }
    connection = _make_connection(query_resource)
    client = _make_client(connection=connection)
    job = target_class.from_api_repr(resource, client)
    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
    session = bigquery_storage.types.ReadSession()
    session.avro_schema.schema = json.dumps(
        {
            "type": "record",
            "name": "__root__",
            "fields": [{"name": "name", "type": ["null", "string"]}],
        }
    )
    bqstorage_client.create_read_session.return_value = session

    with mock.patch(
        "google.cloud.bigquery._pandas_helpers._ARROW_COMPRESSION_SUPPORT", new=False
    ):
        job.to_dataframe(bqstorage_client=bqstorage_client)

    destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
        **resource["configuration"]["query"]["destinationTable"]
    )
    expected_session = bigquery_storage.ReadSession(
        table=destination_table, data_format=bigquery_storage.DataFormat.ARROW,
    )
    bqstorage_client.create_read_session.assert_called_once_with(
        parent=f"projects/{client.project}",
        read_session=expected_session,
        max_stream_count=0,
    )


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_to_dataframe_column_dtypes():
    from google.cloud.bigquery.job import QueryJob as target_class
47 changes: 47 additions & 0 deletions tests/unit/test_dbapi_cursor.py
@@ -123,6 +123,7 @@ def _mock_job(
            schema=schema,
            num_dml_affected_rows=num_dml_affected_rows,
        )
        mock_job.destination.project = "P"
        mock_job.destination.to_bqstorage.return_value = (
            "projects/P/datasets/DS/tables/T"
        )
@@ -380,6 +381,52 @@ def test_fetchall_w_bqstorage_client_fetch_error_no_fallback(self):
        # the default client was not used
        mock_client.list_rows.assert_not_called()

    @unittest.skipIf(
        bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
    )
    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
    def test_fetchall_w_bqstorage_client_no_arrow_compression(self):
        from google.cloud.bigquery import dbapi
        from google.cloud.bigquery import table

        # Use unordered data to also test any non-deterministic key order in dicts.
        row_data = [table.Row([1.2, 1.1], {"bar": 1, "foo": 0})]
        bqstorage_streamed_rows = [{"bar": _to_pyarrow(1.2), "foo": _to_pyarrow(1.1)}]

        mock_client = self._mock_client(rows=row_data)
        mock_bqstorage_client = self._mock_bqstorage_client(
            stream_count=1, rows=bqstorage_streamed_rows,
        )

        connection = dbapi.connect(
            client=mock_client, bqstorage_client=mock_bqstorage_client,
        )
        cursor = connection.cursor()
        cursor.execute("SELECT foo, bar FROM some_table")

        with mock.patch(
            "google.cloud.bigquery.dbapi.cursor._ARROW_COMPRESSION_SUPPORT", new=False
        ):
            rows = cursor.fetchall()

        mock_client.list_rows.assert_not_called()  # The default client was not used.

        # Check the BQ Storage session config.
        expected_session = bigquery_storage.ReadSession(
            table="projects/P/datasets/DS/tables/T",
            data_format=bigquery_storage.DataFormat.ARROW,
        )
        mock_bqstorage_client.create_read_session.assert_called_once_with(
            parent="projects/P", read_session=expected_session, max_stream_count=1
        )

        # Check the data returned.
        field_value = op.itemgetter(1)
        sorted_row_data = [sorted(row.items(), key=field_value) for row in rows]
        expected_row_data = [[("foo", 1.1), ("bar", 1.2)]]

        self.assertEqual(sorted_row_data, expected_row_data)

    def test_execute_custom_job_id(self):
        from google.cloud.bigquery.dbapi import connect
