Skip to content

Commit

Permalink
refactor: omit read_session with latest google-cloud-bigquery-stora…
Browse files Browse the repository at this point in the history
…ge (#748)

* refactor: omit `read_session` with latest google-cloud-bigquery-storage

`read_session` is unnecessary as of `google-cloud-bigquery-storage>=2.6.0`.
This will allow us to more loudly deprecate the use of `rows(read_session)`.

Rather than require 2.6.0, version switches will allow us to keep our
requirements range wider. Will want to give this version some time to bake
before making it required.

* optimize _verify_bq_storage_version

* fix failing tests due to optimization

* fix unit tests

* create BQStorageVersions class for version comparisons

* add type annotations

Also, use packaging directly, since that's all pkg_resources does
https://github.com/pypa/setuptools/blob/a4dbe3457d89cf67ee3aa571fdb149e6eb544e88/pkg_resources/__init__.py\#L112

* allow legacy versions

* fix coverage

* fix coverage

* add tests for version helpers
  • Loading branch information
tswast committed Jul 16, 2021
1 parent 36fe86f commit 4ff8bed
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 32 deletions.
74 changes: 54 additions & 20 deletions google/cloud/bigquery/_helpers.py
Expand Up @@ -26,7 +26,7 @@
from google.cloud._helpers import _RFC3339_MICROS
from google.cloud._helpers import _RFC3339_NO_FRACTION
from google.cloud._helpers import _to_bytes
import pkg_resources
import packaging.version

from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

Expand All @@ -41,31 +41,65 @@
re.VERBOSE,
)

_MIN_BQ_STORAGE_VERSION = pkg_resources.parse_version("2.0.0")
_MIN_BQ_STORAGE_VERSION = packaging.version.Version("2.0.0")
_BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION = packaging.version.Version("2.6.0")


def _verify_bq_storage_version():
"""Verify that a recent enough version of BigQuery Storage extra is installed.
class BQStorageVersions:
"""Version comparisons for google-cloud-bigqueyr-storage package."""

The function assumes that google-cloud-bigquery-storage extra is installed, and
should thus be used in places where this assumption holds.
def __init__(self):
self._installed_version = None

Because `pip` can install an outdated version of this extra despite the constraints
in setup.py, the the calling code can use this helper to verify the version
compatibility at runtime.
"""
from google.cloud import bigquery_storage
@property
def installed_version(self) -> packaging.version.Version:
"""Return the parsed version of google-cloud-bigquery-storage."""
if self._installed_version is None:
from google.cloud import bigquery_storage

installed_version = pkg_resources.parse_version(
getattr(bigquery_storage, "__version__", "legacy")
)
self._installed_version = packaging.version.parse(
# Use 0.0.0, since it is earlier than any released version.
# Legacy versions also have the same property, but
# creating a LegacyVersion has been deprecated.
# https://github.com/pypa/packaging/issues/321
getattr(bigquery_storage, "__version__", "0.0.0")
)

if installed_version < _MIN_BQ_STORAGE_VERSION:
msg = (
"Dependency google-cloud-bigquery-storage is outdated, please upgrade "
f"it to version >= 2.0.0 (version found: {installed_version})."
)
raise LegacyBigQueryStorageError(msg)
return self._installed_version

@property
def is_read_session_optional(self) -> bool:
"""True if read_session is optional to rows().
See: https://github.com/googleapis/python-bigquery-storage/pull/228
"""
return self.installed_version >= _BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION

def verify_version(self):
"""Verify that a recent enough version of BigQuery Storage extra is
installed.
The function assumes that google-cloud-bigquery-storage extra is
installed, and should thus be used in places where this assumption
holds.
Because `pip` can install an outdated version of this extra despite the
constraints in `setup.py`, the calling code can use this helper to
verify the version compatibility at runtime.
Raises:
LegacyBigQueryStorageError:
If the google-cloud-bigquery-storage package is outdated.
"""
if self.installed_version < _MIN_BQ_STORAGE_VERSION:
msg = (
"Dependency google-cloud-bigquery-storage is outdated, please upgrade "
f"it to version >= 2.0.0 (version found: {self.installed_version})."
)
raise LegacyBigQueryStorageError(msg)


BQ_STORAGE_VERSIONS = BQStorageVersions()


def _not_null(value, field):
Expand Down
10 changes: 9 additions & 1 deletion google/cloud/bigquery/_pandas_helpers.py
Expand Up @@ -41,6 +41,7 @@
# Having BQ Storage available implies that pyarrow >=1.0.0 is available, too.
_ARROW_COMPRESSION_SUPPORT = True

from google.cloud.bigquery import _helpers
from google.cloud.bigquery import schema


Expand Down Expand Up @@ -590,7 +591,14 @@ def _bqstorage_page_to_dataframe(column_names, dtypes, page):
def _download_table_bqstorage_stream(
download_state, bqstorage_client, session, stream, worker_queue, page_to_item
):
rowstream = bqstorage_client.read_rows(stream.name).rows(session)
reader = bqstorage_client.read_rows(stream.name)

# Avoid deprecation warnings for passing in unnecessary read session.
# https://github.com/googleapis/python-bigquery-storage/issues/229
if _helpers.BQ_STORAGE_VERSIONS.is_read_session_optional:
rowstream = reader.rows()
else:
rowstream = reader.rows(session)

for page in rowstream.pages:
if download_state.done:
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/bigquery/client.py
Expand Up @@ -61,7 +61,7 @@
from google.cloud.bigquery._helpers import _get_sub_prop
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._helpers import _verify_bq_storage_version
from google.cloud.bigquery._helpers import BQ_STORAGE_VERSIONS
from google.cloud.bigquery._helpers import _verify_job_config_type
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import _pandas_helpers
Expand Down Expand Up @@ -508,7 +508,7 @@ def _ensure_bqstorage_client(
return None

try:
_verify_bq_storage_version()
BQ_STORAGE_VERSIONS.verify_version()
except LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
return None
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/bigquery/table.py
Expand Up @@ -1565,7 +1565,7 @@ def _validate_bqstorage(self, bqstorage_client, create_bqstorage_client):
return False

try:
_helpers._verify_bq_storage_version()
_helpers.BQ_STORAGE_VERSIONS.verify_version()
except LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
return False
Expand Down
39 changes: 35 additions & 4 deletions tests/unit/test__helpers.py
Expand Up @@ -26,11 +26,17 @@


@unittest.skipIf(bigquery_storage is None, "Requires `google-cloud-bigquery-storage`")
class Test_verify_bq_storage_version(unittest.TestCase):
class TestBQStorageVersions(unittest.TestCase):
def _object_under_test(self):
from google.cloud.bigquery import _helpers

return _helpers.BQStorageVersions()

def _call_fut(self):
from google.cloud.bigquery._helpers import _verify_bq_storage_version
from google.cloud.bigquery import _helpers

return _verify_bq_storage_version()
_helpers.BQ_STORAGE_VERSIONS._installed_version = None
return _helpers.BQ_STORAGE_VERSIONS.verify_version()

def test_raises_no_error_w_recent_bqstorage(self):
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
Expand All @@ -53,10 +59,35 @@ def test_raises_error_w_unknown_bqstorage_version(self):

with mock.patch("google.cloud.bigquery_storage", autospec=True) as fake_module:
del fake_module.__version__
error_pattern = r"version found: legacy"
error_pattern = r"version found: 0.0.0"
with self.assertRaisesRegex(LegacyBigQueryStorageError, error_pattern):
self._call_fut()

def test_installed_version_returns_cached(self):
versions = self._object_under_test()
versions._installed_version = object()
assert versions.installed_version is versions._installed_version

def test_installed_version_returns_parsed_version(self):
versions = self._object_under_test()

with mock.patch("google.cloud.bigquery_storage.__version__", new="1.2.3"):
version = versions.installed_version

assert version.major == 1
assert version.minor == 2
assert version.micro == 3

def test_is_read_session_optional_true(self):
versions = self._object_under_test()
with mock.patch("google.cloud.bigquery_storage.__version__", new="2.6.0"):
assert versions.is_read_session_optional

def test_is_read_session_optional_false(self):
versions = self._object_under_test()
with mock.patch("google.cloud.bigquery_storage.__version__", new="2.5.0"):
assert not versions.is_read_session_optional


class Test_not_null(unittest.TestCase):
def _call_fut(self, value, field):
Expand Down
69 changes: 69 additions & 0 deletions tests/unit/test__pandas_helpers.py
Expand Up @@ -40,11 +40,14 @@
import pytz

from google import api_core
from google.cloud.bigquery import _helpers
from google.cloud.bigquery import schema
from google.cloud.bigquery._pandas_helpers import _BIGNUMERIC_SUPPORT

try:
from google.cloud import bigquery_storage

_helpers.BQ_STORAGE_VERSIONS.verify_version()
except ImportError: # pragma: NO COVER
bigquery_storage = None

Expand Down Expand Up @@ -1311,6 +1314,72 @@ def test_dataframe_to_parquet_dict_sequence_schema(module_under_test):
assert schema_arg == expected_schema_arg


@pytest.mark.skipif(
bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test__download_table_bqstorage_stream_includes_read_session(
monkeypatch, module_under_test
):
import google.cloud.bigquery_storage_v1.reader
import google.cloud.bigquery_storage_v1.types

monkeypatch.setattr(_helpers.BQ_STORAGE_VERSIONS, "_installed_version", None)
monkeypatch.setattr(bigquery_storage, "__version__", "2.5.0")
bqstorage_client = mock.create_autospec(
bigquery_storage.BigQueryReadClient, instance=True
)
reader = mock.create_autospec(
google.cloud.bigquery_storage_v1.reader.ReadRowsStream, instance=True
)
bqstorage_client.read_rows.return_value = reader
session = google.cloud.bigquery_storage_v1.types.ReadSession()

module_under_test._download_table_bqstorage_stream(
module_under_test._DownloadState(),
bqstorage_client,
session,
google.cloud.bigquery_storage_v1.types.ReadStream(name="test"),
queue.Queue(),
mock.Mock(),
)

reader.rows.assert_called_once_with(session)


@pytest.mark.skipif(
bigquery_storage is None
or not _helpers.BQ_STORAGE_VERSIONS.is_read_session_optional,
reason="Requires `google-cloud-bigquery-storage` >= 2.6.0",
)
def test__download_table_bqstorage_stream_omits_read_session(
monkeypatch, module_under_test
):
import google.cloud.bigquery_storage_v1.reader
import google.cloud.bigquery_storage_v1.types

monkeypatch.setattr(_helpers.BQ_STORAGE_VERSIONS, "_installed_version", None)
monkeypatch.setattr(bigquery_storage, "__version__", "2.6.0")
bqstorage_client = mock.create_autospec(
bigquery_storage.BigQueryReadClient, instance=True
)
reader = mock.create_autospec(
google.cloud.bigquery_storage_v1.reader.ReadRowsStream, instance=True
)
bqstorage_client.read_rows.return_value = reader
session = google.cloud.bigquery_storage_v1.types.ReadSession()

module_under_test._download_table_bqstorage_stream(
module_under_test._DownloadState(),
bqstorage_client,
session,
google.cloud.bigquery_storage_v1.types.ReadStream(name="test"),
queue.Queue(),
mock.Mock(),
)

reader.rows.assert_called_once_with()


@pytest.mark.parametrize(
"stream_count,maxsize_kwarg,expected_call_count,expected_maxsize",
[
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_client.py
Expand Up @@ -663,7 +663,7 @@ def test_ensure_bqstorage_client_obsolete_dependency(self):
client = self._make_one(project=self.PROJECT, credentials=creds)

patcher = mock.patch(
"google.cloud.bigquery.client._verify_bq_storage_version",
"google.cloud.bigquery.client.BQ_STORAGE_VERSIONS.verify_version",
side_effect=LegacyBigQueryStorageError("BQ Storage too old"),
)
with patcher, warnings.catch_warnings(record=True) as warned:
Expand Down Expand Up @@ -700,7 +700,7 @@ def test_ensure_bqstorage_client_existing_client_check_fails(self):
mock_storage_client = mock.sentinel.mock_storage_client

patcher = mock.patch(
"google.cloud.bigquery.client._verify_bq_storage_version",
"google.cloud.bigquery.client.BQ_STORAGE_VERSIONS.verify_version",
side_effect=LegacyBigQueryStorageError("BQ Storage too old"),
)
with patcher, warnings.catch_warnings(record=True) as warned:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_magics.py
Expand Up @@ -368,7 +368,7 @@ def test__make_bqstorage_client_true_obsolete_dependency():
)

patcher = mock.patch(
"google.cloud.bigquery.client._verify_bq_storage_version",
"google.cloud.bigquery.client.BQ_STORAGE_VERSIONS.verify_version",
side_effect=LegacyBigQueryStorageError("BQ Storage too old"),
)
with patcher, warnings.catch_warnings(record=True) as warned:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_table.py
Expand Up @@ -1889,7 +1889,7 @@ def test__validate_bqstorage_returns_false_w_warning_if_obsolete_version(self):
iterator = self._make_one(first_page_response=None) # not cached

patcher = mock.patch(
"google.cloud.bigquery.table._helpers._verify_bq_storage_version",
"google.cloud.bigquery.table._helpers.BQ_STORAGE_VERSIONS.verify_version",
side_effect=LegacyBigQueryStorageError("BQ Storage too old"),
)
with patcher, warnings.catch_warnings(record=True) as warned:
Expand Down

0 comments on commit 4ff8bed

Please sign in to comment.