Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: detect obsolete BQ Storage extra at runtime #666

Merged
merged 8 commits into from May 20, 2021
3 changes: 3 additions & 0 deletions google/cloud/bigquery/__init__.py
Expand Up @@ -39,6 +39,7 @@
from google.cloud.bigquery import enums
from google.cloud.bigquery.enums import SqlTypeNames
from google.cloud.bigquery.enums import StandardSqlDataTypes
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.external_config import BigtableOptions
from google.cloud.bigquery.external_config import BigtableColumnFamily
Expand Down Expand Up @@ -152,6 +153,8 @@
"WriteDisposition",
# EncryptionConfiguration
"EncryptionConfiguration",
# Custom exceptions
"LegacyBigQueryStorageError",
]


Expand Down
30 changes: 30 additions & 0 deletions google/cloud/bigquery/_helpers.py
Expand Up @@ -25,6 +25,10 @@
from google.cloud._helpers import _RFC3339_MICROS
from google.cloud._helpers import _RFC3339_NO_FRACTION
from google.cloud._helpers import _to_bytes
import pkg_resources

from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError


_RFC3339_MICROS_NO_ZULU = "%Y-%m-%dT%H:%M:%S.%f"
_TIMEONLY_WO_MICROS = "%H:%M:%S"
Expand All @@ -36,6 +40,32 @@
re.VERBOSE,
)

_MIN_BQ_STORAGE_VERSION = pkg_resources.parse_version("2.0.0")


def _verify_bq_storage_version():
    """Verify that a recent enough version of BigQuery Storage extra is installed.

    The function assumes that google-cloud-bigquery-storage extra is installed, and
    should thus be used in places where this assumption holds.

    Because `pip` can install an outdated version of this extra despite the constraints
    in setup.py, the calling code can use this helper to verify the version
    compatibility at runtime.

    Raises:
        LegacyBigQueryStorageError:
            If the installed ``google-cloud-bigquery-storage`` version is older
            than the minimum supported version.
    """
    from google.cloud import bigquery_storage

    # Pre-2.0.0 releases did not define ``__version__``; treat a missing
    # attribute as a legacy (too old) installation.
    installed_version = pkg_resources.parse_version(
        getattr(bigquery_storage, "__version__", "legacy")
    )

    if installed_version < _MIN_BQ_STORAGE_VERSION:
        # Derive the required version from the module constant so the error
        # message cannot drift out of sync with the actual check.
        msg = (
            "Dependency google-cloud-bigquery-storage is outdated, please upgrade "
            f"it to version >= {_MIN_BQ_STORAGE_VERSION} "
            f"(version found: {installed_version})."
        )
        raise LegacyBigQueryStorageError(msg)


def _not_null(value, field):
"""Check whether 'value' should be coerced to 'field' type."""
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/bigquery/_pandas_helpers.py
Expand Up @@ -42,6 +42,7 @@
_ARROW_COMPRESSION_SUPPORT = True

from google.cloud.bigquery import schema
from google.cloud.bigquery._helpers import _verify_bq_storage_version


_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -628,6 +629,8 @@ def _download_table_bqstorage(
# is available and can be imported.
from google.cloud import bigquery_storage

_verify_bq_storage_version()
plamut marked this conversation as resolved.
Show resolved Hide resolved

if "$" in table.table_id:
raise ValueError(
"Reading from a specific partition is not currently supported."
Expand Down
8 changes: 8 additions & 0 deletions google/cloud/bigquery/client.py
Expand Up @@ -54,12 +54,14 @@
from google.cloud.bigquery._helpers import _get_sub_prop
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._helpers import _verify_bq_storage_version
from google.cloud.bigquery._helpers import _verify_job_config_type
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.opentelemetry_tracing import create_span
from google.cloud.bigquery import job
from google.cloud.bigquery.job import (
Expand Down Expand Up @@ -464,6 +466,12 @@ def _create_bqstorage_client(self):
)
return None

try:
_verify_bq_storage_version()
plamut marked this conversation as resolved.
Show resolved Hide resolved
except LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
return None

return bigquery_storage.BigQueryReadClient(credentials=self._credentials)

def _dataset_from_arg(self, dataset):
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/bigquery/dbapi/cursor.py
Expand Up @@ -29,6 +29,7 @@
_ARROW_COMPRESSION_SUPPORT = True

from google.cloud.bigquery import job
from google.cloud.bigquery._helpers import _verify_bq_storage_version
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions
import google.cloud.exceptions
Expand Down Expand Up @@ -280,6 +281,8 @@ def _bqstorage_fetch(self, bqstorage_client):
# bigquery_storage can indeed be imported here without errors.
from google.cloud import bigquery_storage

_verify_bq_storage_version()
plamut marked this conversation as resolved.
Show resolved Hide resolved

table_reference = self._query_job.destination

requested_session = bigquery_storage.types.ReadSession(
Expand Down
21 changes: 21 additions & 0 deletions google/cloud/bigquery/exceptions.py
@@ -0,0 +1,21 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class BigQueryError(Exception):
    """Root of the exception hierarchy raised by the BigQuery client itself."""


class LegacyBigQueryStorageError(BigQueryError):
    """Raised when too old a version of BigQuery Storage extra is detected at runtime."""
6 changes: 6 additions & 0 deletions google/cloud/bigquery/magics/magics.py
Expand Up @@ -777,6 +777,12 @@ def _make_bqstorage_client(use_bqstorage_api, credentials, client_options):
)
raise customized_error from err

try:
bigquery._helpers._verify_bq_storage_version()
except bigquery.LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
return None

plamut marked this conversation as resolved.
Show resolved Hide resolved
try:
from google.api_core.gapic_v1 import client_info as gapic_client_info
except ImportError as err:
Expand Down
12 changes: 12 additions & 0 deletions google/cloud/bigquery/table.py
Expand Up @@ -41,6 +41,7 @@
import google.cloud._helpers
from google.cloud.bigquery import _helpers
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.schema import _build_schema_resource
from google.cloud.bigquery.schema import _parse_schema_resource
from google.cloud.bigquery.schema import _to_schema_fields
Expand Down Expand Up @@ -1519,6 +1520,17 @@ def _validate_bqstorage(self, bqstorage_client, create_bqstorage_client):
)
return False

try:
from google.cloud import bigquery_storage # noqa: F401
except ImportError:
return False

try:
_helpers._verify_bq_storage_version()
except LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
return False

return True

def _get_next_page_response(self):
Expand Down
38 changes: 38 additions & 0 deletions tests/unit/test__helpers.py
Expand Up @@ -19,6 +19,44 @@

import mock

try:
from google.cloud import bigquery_storage
except ImportError: # pragma: NO COVER
bigquery_storage = None


@unittest.skipIf(bigquery_storage is None, "Requires `google-cloud-bigquery-storage`")
class Test_verify_bq_storage_version(unittest.TestCase):
    """Tests for the runtime BQ Storage extra version check."""

    def _call_fut(self):
        from google.cloud.bigquery._helpers import _verify_bq_storage_version

        return _verify_bq_storage_version()

    def test_raises_no_error_w_recent_bqstorage(self):
        from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

        version_patch = mock.patch(
            "google.cloud.bigquery_storage.__version__", new="2.0.0"
        )
        with version_patch:
            try:
                self._call_fut()
            except LegacyBigQueryStorageError:  # pragma: NO COVER
                self.fail("Legacy error raised with a non-legacy dependency version.")

    def test_raises_error_w_legacy_bqstorage(self):
        from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

        version_patch = mock.patch(
            "google.cloud.bigquery_storage.__version__", new="1.9.9"
        )
        with version_patch, self.assertRaises(LegacyBigQueryStorageError):
            self._call_fut()

    def test_raises_error_w_unknown_bqstorage_version(self):
        from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

        module_patch = mock.patch("google.cloud.bigquery_storage", autospec=True)
        with module_patch as mocked_module:
            # Simulate a pre-2.0.0 release that did not define __version__.
            del mocked_module.__version__
            with self.assertRaisesRegex(
                LegacyBigQueryStorageError, r"version found: legacy"
            ):
                self._call_fut()


class Test_not_null(unittest.TestCase):
def _call_fut(self, value, field):
Expand Down
27 changes: 27 additions & 0 deletions tests/unit/test__pandas_helpers.py
Expand Up @@ -1331,6 +1331,33 @@ def fake_download_stream(
assert queue_used.maxsize == expected_maxsize


@pytest.mark.skipif(
    bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test__download_table_bqstorage_obsolete_version_error(module_under_test):
    """An outdated BQ Storage extra must abort the download with a legacy error."""
    from google.cloud.bigquery import dataset
    from google.cloud.bigquery import table
    from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

    fake_bqstorage_client = mock.create_autospec(
        bigquery_storage.BigQueryReadClient, instance=True
    )
    table_ref = table.TableReference(
        dataset.DatasetReference("project-x", "dataset-y"), "table-z",
    )

    version_check_patch = mock.patch.object(
        module_under_test,
        "_verify_bq_storage_version",
        side_effect=LegacyBigQueryStorageError,
    )
    with version_check_patch, pytest.raises(LegacyBigQueryStorageError):
        # The helper is a generator, so the error only surfaces once iteration starts.
        rows_iterable = module_under_test._download_table_bqstorage(
            "some-project", table_ref, fake_bqstorage_client
        )
        next(rows_iterable)


@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
def test_download_arrow_row_iterator_unknown_field_type(module_under_test):
fake_page = api_core.page_iterator.Page(
Expand Down
22 changes: 22 additions & 0 deletions tests/unit/test_client.py
Expand Up @@ -861,6 +861,28 @@ def fail_bqstorage_import(name, globals, locals, fromlist, level):
]
assert matching_warnings, "Missing dependency warning not raised."

@unittest.skipIf(
    bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
)
def test_create_bqstorage_client_obsolete_dependency(self):
    """An outdated BQ Storage extra should produce a warning and no client."""
    from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

    credentials = _make_credentials()
    client = self._make_one(project=self.PROJECT, credentials=credentials)

    version_check_patch = mock.patch(
        "google.cloud.bigquery.client._verify_bq_storage_version",
        side_effect=LegacyBigQueryStorageError("BQ Storage too old"),
    )
    with version_check_patch, warnings.catch_warnings(record=True) as warned:
        result = client._create_bqstorage_client()

    self.assertIsNone(result)
    matching_warnings = [
        warning for warning in warned if "BQ Storage too old" in str(warning)
    ]
    assert matching_warnings, "Obsolete dependency warning not raised."

def test_create_routine_w_minimal_resource(self):
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
Expand Down
33 changes: 33 additions & 0 deletions tests/unit/test_dbapi_cursor.py
Expand Up @@ -430,6 +430,39 @@ def test_fetchall_w_bqstorage_client_no_arrow_compression(self):

self.assertEqual(sorted_row_data, expected_row_data)

@unittest.skipIf(
    bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
)
def test_fetchall_w_obsolete_bqstorage_client_error_no_fallback(self):
    """A legacy BQ Storage version must error out instead of silently falling back."""
    from google.cloud.bigquery import dbapi
    from google.cloud.bigquery import table
    from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

    row_data = [table.Row([1.1, 1.2], {"foo": 0, "bar": 1})]
    mock_client = self._mock_client(rows=[])
    mock_bqstorage_client = self._mock_bqstorage_client(
        stream_count=1, rows=row_data,
    )

    connection = dbapi.connect(
        client=mock_client, bqstorage_client=mock_bqstorage_client,
    )
    cursor = connection.cursor()
    cursor.execute("SELECT foo, bar FROM some_table")

    version_check_patch = mock.patch(
        "google.cloud.bigquery.dbapi.cursor._verify_bq_storage_version",
        side_effect=LegacyBigQueryStorageError("BQ Storage too old"),
    )
    with version_check_patch:
        with self.assertRaisesRegex(LegacyBigQueryStorageError, "BQ Storage too old"):
            cursor.fetchall()

    # The error must not be swallowed by a fallback to the default REST client.
    mock_client.list_rows.assert_not_called()

def test_execute_custom_job_id(self):
from google.cloud.bigquery.dbapi import connect

Expand Down
25 changes: 25 additions & 0 deletions tests/unit/test_magics.py
Expand Up @@ -345,6 +345,31 @@ def test__make_bqstorage_client_true_raises_import_error(missing_bq_storage):
assert "pyarrow" in error_msg


@pytest.mark.skipif(
    bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test__make_bqstorage_client_true_obsolete_dependency():
    """With an outdated extra, the magics helper warns and returns no client."""
    from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

    fake_credentials = mock.create_autospec(
        google.auth.credentials.Credentials, instance=True
    )

    version_check_patch = mock.patch(
        "google.cloud.bigquery._helpers._verify_bq_storage_version",
        side_effect=LegacyBigQueryStorageError("BQ Storage too old"),
    )
    with version_check_patch, warnings.catch_warnings(record=True) as warned:
        got = magics._make_bqstorage_client(True, fake_credentials, {})

    assert got is None

    matching_warnings = [
        warning for warning in warned if "BQ Storage too old" in str(warning)
    ]
    assert matching_warnings, "Obsolete dependency warning not raised."


@pytest.mark.skipif(
bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
)
Expand Down
24 changes: 24 additions & 0 deletions tests/unit/test_table.py
Expand Up @@ -1768,6 +1768,30 @@ def test__validate_bqstorage_returns_false_when_completely_cached(self):
)
)

@unittest.skipIf(
    bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
)
def test__validate_bqstorage_returns_false_w_warning_if_obsolete_version(self):
    """Validation should warn and reject BQ Storage when the extra is outdated."""
    from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError

    row_iterator = self._make_one(first_page_response=None)  # not cached

    version_check_patch = mock.patch(
        "google.cloud.bigquery.table._helpers._verify_bq_storage_version",
        side_effect=LegacyBigQueryStorageError("BQ Storage too old"),
    )
    with version_check_patch, warnings.catch_warnings(record=True) as warned:
        result = row_iterator._validate_bqstorage(
            bqstorage_client=None, create_bqstorage_client=True
        )

    self.assertFalse(result)

    matching_warnings = [
        warning for warning in warned if "BQ Storage too old" in str(warning)
    ]
    assert matching_warnings, "Obsolete dependency warning not raised."

@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_to_arrow(self):
from google.cloud.bigquery.schema import SchemaField
Expand Down