feat: use pyarrow stream compression, if available #593

Merged · 4 commits · Apr 12, 2021
Changes from 1 commit
17 changes: 17 additions & 0 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -33,6 +33,18 @@
except ImportError:  # pragma: NO COVER
    pyarrow = None

try:
    from google.cloud.bigquery_storage import ArrowSerializationOptions
except ImportError:
    _ARROW_COMPRESSION_SUPPORT = False
else:
    import pkg_resources

    # Having BQ Storage available implies that pyarrow is available, too.
    _ARROW_COMPRESSION_SUPPORT = pkg_resources.get_distribution(
        "pyarrow"
    ).parsed_version >= pkg_resources.parse_version("1.0.0")

Contributor Author:
Do we actually need the pyarrow version check?

The bqstorage extra already pins the minimum pyarrow version to 1.0.0, so if somebody somehow installs an older version, I suppose that can be considered an error on the user's side?
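
For reference, a minimal sketch of the simplified guard under discussion, assuming the explicit version comparison were dropped and only import availability were tested (this is not the code merged in this PR):

try:
    from google.cloud.bigquery_storage import ArrowSerializationOptions
except ImportError:
    _ARROW_COMPRESSION_SUPPORT = False
else:
    # Having BQ Storage installed via the bqstorage extra already implies
    # pyarrow >= 1.0.0, so no explicit version check is performed here.
    _ARROW_COMPRESSION_SUPPORT = True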

Contributor:
Sounds good to me (removing this check). Since all of our extras require > 1.0.0, I'm okay with failing.

The counterpoint, though, is: what error will they get when pyarrow is too old? I wonder if we should file an FR to check for minimum versions and throw nicer errors? It might help with some of the issues like #556. This would mean keeping minimum versions in sync across three locations, though: setup.py, constraints.txt, and version_check.py.
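
A rough sketch of the kind of helper described here, assuming a hypothetical version_check.py module; the function name and error message below are illustrative, not part of this PR:

import pkg_resources

_MIN_PYARROW_VERSION = pkg_resources.parse_version("1.0.0")


def verify_pyarrow_version():
    """Raise a descriptive error if the installed pyarrow is too old."""
    installed = pkg_resources.get_distribution("pyarrow").parsed_version
    if installed < _MIN_PYARROW_VERSION:
        raise ImportError(
            "pyarrow {} is installed, but at least 1.0.0 is required for "
            "BigQuery Storage features.".format(installed)
        )

The synchronization concern remains: the 1.0.0 floor would then live here as well as in setup.py and constraints.txt.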

Contributor Author:

The new pip dependency resolver (announcement) is significantly stricter, so I expect this will become a non-issue once its use is widespread enough and/or it becomes the default. It shouldn't even be possible to install an incompatible pyarrow version with it.

Yes, we can open an FR to discuss whether covering this presumably corner-case scenario during the transitional period is worth doing, given the problem of keeping the minimum versions in sync.


from google.cloud.bigquery import schema


@@ -631,6 +643,11 @@ def _download_table_bqstorage(
    for field in selected_fields:
        requested_session.read_options.selected_fields.append(field.name)

    if _ARROW_COMPRESSION_SUPPORT:
        requested_session.read_options.arrow_serialization_options.buffer_compression = (
            ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
        )

    session = bqstorage_client.create_read_session(
        parent="projects/{}".format(project_id),
        read_session=requested_session,
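
For context, a hedged sketch of how the change above surfaces to users of to_dataframe(); the project ID and the public table queried below are placeholders for illustration:

from google.cloud import bigquery
from google.cloud import bigquery_storage

client = bigquery.Client(project="my-project")  # placeholder project
bqstorage_client = bigquery_storage.BigQueryReadClient()

job = client.query(
    "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 100"
)

# With google-cloud-bigquery-storage and pyarrow >= 1.0.0 installed,
# _download_table_bqstorage() sets LZ4_FRAME buffer compression on the
# read session; otherwise the download proceeds uncompressed.
df = job.to_dataframe(bqstorage_client=bqstorage_client)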
18 changes: 18 additions & 0 deletions google/cloud/bigquery/dbapi/cursor.py
@@ -19,6 +19,18 @@
import copy
import logging

try:
    from google.cloud.bigquery_storage import ArrowSerializationOptions
except ImportError:
    _ARROW_COMPRESSION_SUPPORT = False
else:
    import pkg_resources

    # Having BQ Storage available implies that pyarrow is available, too.
    _ARROW_COMPRESSION_SUPPORT = pkg_resources.get_distribution(
        "pyarrow"
    ).parsed_version >= pkg_resources.parse_version("1.0.0")

from google.cloud.bigquery import job
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions
@@ -255,6 +267,12 @@ def _bqstorage_fetch(self, bqstorage_client):
            table=table_reference.to_bqstorage(),
            data_format=bigquery_storage.types.DataFormat.ARROW,
        )

        if _ARROW_COMPRESSION_SUPPORT:
            requested_session.read_options.arrow_serialization_options.buffer_compression = (
                ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
            )

        read_session = bqstorage_client.create_read_session(
            parent="projects/{}".format(table_reference.project),
            read_session=requested_session,
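
Likewise, a hedged sketch of the DB-API path touched here; the connection setup and the query below are illustrative placeholders:

from google.cloud import bigquery
from google.cloud import bigquery_storage
from google.cloud.bigquery import dbapi

client = bigquery.Client(project="my-project")  # placeholder project
bqstorage_client = bigquery_storage.BigQueryReadClient()

connection = dbapi.connect(client=client, bqstorage_client=bqstorage_client)
cursor = connection.cursor()
cursor.execute(
    "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 100"
)

# fetchall() goes through _bqstorage_fetch(), which sets LZ4_FRAME
# compression on the read session when _ARROW_COMPRESSION_SUPPORT is True.
rows = cursor.fetchall()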
70 changes: 66 additions & 4 deletions tests/unit/job/test_query_pandas.py
@@ -123,8 +123,16 @@ def test_to_dataframe_bqstorage_preserve_order(query):
    destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
        **job_resource["configuration"]["query"]["destinationTable"]
    )
-    expected_session = bigquery_storage.types.ReadSession(
-        table=destination_table, data_format=bigquery_storage.types.DataFormat.ARROW,

    read_options = bigquery_storage.ReadSession.TableReadOptions(
        arrow_serialization_options=bigquery_storage.ArrowSerializationOptions(
            buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
        )
    )
    expected_session = bigquery_storage.ReadSession(
        table=destination_table,
        data_format=bigquery_storage.DataFormat.ARROW,
        read_options=read_options,
    )
    bqstorage_client.create_read_session.assert_called_once_with(
        parent="projects/test-project",
@@ -468,8 +476,16 @@ def test_to_dataframe_bqstorage():
    destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
        **resource["configuration"]["query"]["destinationTable"]
    )
-    expected_session = bigquery_storage.types.ReadSession(
-        table=destination_table, data_format=bigquery_storage.types.DataFormat.ARROW,

    read_options = bigquery_storage.ReadSession.TableReadOptions(
        arrow_serialization_options=bigquery_storage.ArrowSerializationOptions(
            buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
        )
    )
    expected_session = bigquery_storage.ReadSession(
        table=destination_table,
        data_format=bigquery_storage.DataFormat.ARROW,
        read_options=read_options,
    )
    bqstorage_client.create_read_session.assert_called_once_with(
        parent=f"projects/{client.project}",
@@ -478,6 +494,52 @@
    )


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@pytest.mark.skipif(
    bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_bqstorage_no_pyarrow_compression():
    from google.cloud.bigquery.job import QueryJob as target_class

    resource = _make_job_resource(job_type="query", ended=True)
    query_resource = {
        "jobComplete": True,
        "jobReference": resource["jobReference"],
        "totalRows": "4",
        "schema": {"fields": [{"name": "name", "type": "STRING", "mode": "NULLABLE"}]},
    }
    connection = _make_connection(query_resource)
    client = _make_client(connection=connection)
    job = target_class.from_api_repr(resource, client)
    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
    session = bigquery_storage.types.ReadSession()
    session.avro_schema.schema = json.dumps(
        {
            "type": "record",
            "name": "__root__",
            "fields": [{"name": "name", "type": ["null", "string"]}],
        }
    )
    bqstorage_client.create_read_session.return_value = session

    with mock.patch(
        "google.cloud.bigquery._pandas_helpers._ARROW_COMPRESSION_SUPPORT", new=False
    ):
        job.to_dataframe(bqstorage_client=bqstorage_client)

    destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format(
        **resource["configuration"]["query"]["destinationTable"]
    )
    expected_session = bigquery_storage.ReadSession(
        table=destination_table, data_format=bigquery_storage.DataFormat.ARROW,
    )
    bqstorage_client.create_read_session.assert_called_once_with(
        parent=f"projects/{client.project}",
        read_session=expected_session,
        max_stream_count=0,
    )


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_to_dataframe_column_dtypes():
    from google.cloud.bigquery.job import QueryJob as target_class
47 changes: 47 additions & 0 deletions tests/unit/test_dbapi_cursor.py
@@ -123,6 +123,7 @@ def _mock_job(
            schema=schema,
            num_dml_affected_rows=num_dml_affected_rows,
        )
        mock_job.destination.project = "P"
        mock_job.destination.to_bqstorage.return_value = (
            "projects/P/datasets/DS/tables/T"
        )
@@ -380,6 +381,52 @@ def test_fetchall_w_bqstorage_client_fetch_error_no_fallback(self):
        # the default client was not used
        mock_client.list_rows.assert_not_called()

    @unittest.skipIf(
        bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
    )
    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
    def test_fetchall_w_bqstorage_client_no_arrow_compression(self):
        from google.cloud.bigquery import dbapi
        from google.cloud.bigquery import table

        # Use unordered data to also test any non-deterministic key order in dicts.
        row_data = [table.Row([1.2, 1.1], {"bar": 1, "foo": 0})]
        bqstorage_streamed_rows = [{"bar": _to_pyarrow(1.2), "foo": _to_pyarrow(1.1)}]

        mock_client = self._mock_client(rows=row_data)
        mock_bqstorage_client = self._mock_bqstorage_client(
            stream_count=1, rows=bqstorage_streamed_rows,
        )

        connection = dbapi.connect(
            client=mock_client, bqstorage_client=mock_bqstorage_client,
        )
        cursor = connection.cursor()
        cursor.execute("SELECT foo, bar FROM some_table")

        with mock.patch(
            "google.cloud.bigquery.dbapi.cursor._ARROW_COMPRESSION_SUPPORT", new=False
        ):
            rows = cursor.fetchall()

        mock_client.list_rows.assert_not_called()  # The default client was not used.

        # Check the BQ Storage session config.
        expected_session = bigquery_storage.ReadSession(
            table="projects/P/datasets/DS/tables/T",
            data_format=bigquery_storage.DataFormat.ARROW,
        )
        mock_bqstorage_client.create_read_session.assert_called_once_with(
            parent="projects/P", read_session=expected_session, max_stream_count=1
        )

        # Check the data returned.
        field_value = op.itemgetter(1)
        sorted_row_data = [sorted(row.items(), key=field_value) for row in rows]
        expected_row_data = [[("foo", 1.1), ("bar", 1.2)]]

        self.assertEqual(sorted_row_data, expected_row_data)

    def test_execute_custom_job_id(self):
        from google.cloud.bigquery.dbapi import connect
