deps!: BigQuery Storage and pyarrow are required dependencies (#776)
* process: make BQ Storage and pyarrow required

* Make pyarrow required in _pandas_helpers.py

* Make pyarrow required in client.py

* Make pyarrow required in table.py

* Make pyarrow required in job/query.py

* Make pyarrow required in DBAPI tests

* Make pyarrow required in snippets tests

* Make BQ storage required in client.py

* Make BQ storage required in table.py

* Make BQ storage required in DB API tests

* Make BQ storage required in magics.py

* Make BQ storage required in test__helpers.py

* Make BQ storage required in test__pandas_helpers.py

* Make BQ storage required in test_query_pandas.py

* Make method signatures compatible again

The annotations caused a mismatch

* Remove checks for minimum BQ Storage version

Now that BQ Storage is a required dependency, pip can no longer install it as an
optional extra while ignoring its minimum version pin, so the runtime
minimum-version checks are unnecessary.

* Remove LegacyBigQueryStorageError

Since this change ships in a major version bump, it can be a breaking change,
i.e. the class is removed without a deprecation period.

* Bump minimum pyarrow version to 3.0.0

* Remove unneeded pytest.importorskip for BQ Storage

* Remove pyarrow version checks in pandas helpers tests

* Conditionally skip pandas tests where needed

* Remove unneeded conditional pyarrow version paths

* Cover schema autodetect failed code path in test

* fix bad merge

Co-authored-by: Tim Swast <swast@google.com>
plamut and tswast committed Jul 27, 2021
1 parent c293e3c commit 9cd7554
Showing 22 changed files with 196 additions and 998 deletions.
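
The headline change, per the commit title and message, is that google-cloud-bigquery-storage and pyarrow move from optional extras to hard requirements. A minimal sketch of what the corresponding setup.py pins might look like, using only the versions mentioned in this commit (>= 2.0.0 for BQ Storage, >= 3.0.0 for pyarrow); the actual pins and upper bounds in the repository may differ:

```python
# Hypothetical sketch of the dependency declaration after this commit;
# not the repository's actual setup.py.
from setuptools import setup

setup(
    name="google-cloud-bigquery",
    install_requires=[
        # Previously behind optional extras, now always installed:
        "google-cloud-bigquery-storage >= 2.0.0",
        "pyarrow >= 3.0.0",
    ],
)
```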
4 changes: 0 additions & 4 deletions docs/snippets.py
@@ -30,10 +30,6 @@
import pandas
except (ImportError, AttributeError):
pandas = None
try:
import pyarrow
except (ImportError, AttributeError):
pyarrow = None

from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import ServiceUnavailable
3 changes: 0 additions & 3 deletions google/cloud/bigquery/__init__.py
@@ -42,7 +42,6 @@
from google.cloud.bigquery.enums import KeyResultStatementKind
from google.cloud.bigquery.enums import SqlTypeNames
from google.cloud.bigquery.enums import StandardSqlDataTypes
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.external_config import BigtableOptions
from google.cloud.bigquery.external_config import BigtableColumnFamily
@@ -171,8 +170,6 @@
"WriteDisposition",
# EncryptionConfiguration
"EncryptionConfiguration",
# Custom exceptions
"LegacyBigQueryStorageError",
]


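
Because LegacyBigQueryStorageError disappears from the public surface without a deprecation period, downstream code that imports or catches it breaks on upgrade. A hedged sketch of a transitional shim (hypothetical caller code, not part of this diff):

```python
# Hypothetical downstream compatibility shim; not from this repository.
try:
    from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
except ImportError:
    # Removed in this major release: with google-cloud-bigquery-storage now
    # required at >= 2.0.0, the "outdated extra" condition it signalled can
    # no longer occur, so handlers for it can simply be deleted.
    LegacyBigQueryStorageError = None
```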
26 changes: 0 additions & 26 deletions google/cloud/bigquery/_helpers.py
@@ -28,8 +28,6 @@
from google.cloud._helpers import _to_bytes
import packaging.version

from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError


_RFC3339_MICROS_NO_ZULU = "%Y-%m-%dT%H:%M:%S.%f"
_TIMEONLY_WO_MICROS = "%H:%M:%S"
@@ -41,7 +39,6 @@
re.VERBOSE,
)

_MIN_BQ_STORAGE_VERSION = packaging.version.Version("2.0.0")
_BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION = packaging.version.Version("2.6.0")


@@ -75,29 +72,6 @@ def is_read_session_optional(self) -> bool:
"""
return self.installed_version >= _BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION

def verify_version(self):
"""Verify that a recent enough version of BigQuery Storage extra is
installed.
The function assumes that google-cloud-bigquery-storage extra is
installed, and should thus be used in places where this assumption
holds.
Because `pip` can install an outdated version of this extra despite the
constraints in `setup.py`, the calling code can use this helper to
verify the version compatibility at runtime.
Raises:
LegacyBigQueryStorageError:
If the google-cloud-bigquery-storage package is outdated.
"""
if self.installed_version < _MIN_BQ_STORAGE_VERSION:
msg = (
"Dependency google-cloud-bigquery-storage is outdated, please upgrade "
f"it to version >= 2.0.0 (version found: {self.installed_version})."
)
raise LegacyBigQueryStorageError(msg)


BQ_STORAGE_VERSIONS = BQStorageVersions()

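
The removed verify_version() guard enforced a >= 2.0.0 minimum at runtime, while the feature check for optional read sessions (>= 2.6.0) stays. A minimal sketch of the distinction, assuming bigquery_storage keeps exposing __version__:

```python
# Sketch only: feature detection stays, the minimum-version guard goes away.
import packaging.version
from google.cloud import bigquery_storage

installed = packaging.version.parse(bigquery_storage.__version__)

# Still useful: behaviour differs between BQ Storage releases.
read_session_optional = installed >= packaging.version.Version("2.6.0")

# No longer needed: pip now installs bigquery_storage >= 2.0.0 together with
# google-cloud-bigquery, so a runtime LegacyBigQueryStorageError check is moot.
```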
122 changes: 48 additions & 74 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -20,18 +20,13 @@
import queue
import warnings

from packaging import version

try:
import pandas
except ImportError: # pragma: NO COVER
pandas = None

try:
import pyarrow
import pyarrow.parquet
except ImportError: # pragma: NO COVER
pyarrow = None
import pyarrow
import pyarrow.parquet

try:
from google.cloud.bigquery_storage import ArrowSerializationOptions
@@ -106,63 +101,52 @@ def pyarrow_timestamp():
return pyarrow.timestamp("us", tz="UTC")


if pyarrow:
# This dictionary is duplicated in bigquery_storage/test/unite/test_reader.py
# When modifying it be sure to update it there as well.
BQ_TO_ARROW_SCALARS = {
"BOOL": pyarrow.bool_,
"BOOLEAN": pyarrow.bool_,
"BYTES": pyarrow.binary,
"DATE": pyarrow.date32,
"DATETIME": pyarrow_datetime,
"FLOAT": pyarrow.float64,
"FLOAT64": pyarrow.float64,
"GEOGRAPHY": pyarrow.string,
"INT64": pyarrow.int64,
"INTEGER": pyarrow.int64,
"NUMERIC": pyarrow_numeric,
"STRING": pyarrow.string,
"TIME": pyarrow_time,
"TIMESTAMP": pyarrow_timestamp,
}
ARROW_SCALAR_IDS_TO_BQ = {
# https://arrow.apache.org/docs/python/api/datatypes.html#type-classes
pyarrow.bool_().id: "BOOL",
pyarrow.int8().id: "INT64",
pyarrow.int16().id: "INT64",
pyarrow.int32().id: "INT64",
pyarrow.int64().id: "INT64",
pyarrow.uint8().id: "INT64",
pyarrow.uint16().id: "INT64",
pyarrow.uint32().id: "INT64",
pyarrow.uint64().id: "INT64",
pyarrow.float16().id: "FLOAT64",
pyarrow.float32().id: "FLOAT64",
pyarrow.float64().id: "FLOAT64",
pyarrow.time32("ms").id: "TIME",
pyarrow.time64("ns").id: "TIME",
pyarrow.timestamp("ns").id: "TIMESTAMP",
pyarrow.date32().id: "DATE",
pyarrow.date64().id: "DATETIME", # because millisecond resolution
pyarrow.binary().id: "BYTES",
pyarrow.string().id: "STRING", # also alias for pyarrow.utf8()
# The exact scale and precision don't matter, see below.
pyarrow.decimal128(38, scale=9).id: "NUMERIC",
}

if version.parse(pyarrow.__version__) >= version.parse("3.0.0"):
BQ_TO_ARROW_SCALARS["BIGNUMERIC"] = pyarrow_bignumeric
# The exact decimal's scale and precision are not important, as only
# the type ID matters, and it's the same for all decimal256 instances.
ARROW_SCALAR_IDS_TO_BQ[pyarrow.decimal256(76, scale=38).id] = "BIGNUMERIC"
_BIGNUMERIC_SUPPORT = True
else:
_BIGNUMERIC_SUPPORT = False

else: # pragma: NO COVER
BQ_TO_ARROW_SCALARS = {} # pragma: NO COVER
ARROW_SCALAR_IDS_TO_BQ = {} # pragma: NO_COVER
_BIGNUMERIC_SUPPORT = False # pragma: NO COVER
# This dictionary is duplicated in bigquery_storage/test/unite/test_reader.py
# When modifying it be sure to update it there as well.
BQ_TO_ARROW_SCALARS = {
"BIGNUMERIC": pyarrow_bignumeric,
"BOOL": pyarrow.bool_,
"BOOLEAN": pyarrow.bool_,
"BYTES": pyarrow.binary,
"DATE": pyarrow.date32,
"DATETIME": pyarrow_datetime,
"FLOAT": pyarrow.float64,
"FLOAT64": pyarrow.float64,
"GEOGRAPHY": pyarrow.string,
"INT64": pyarrow.int64,
"INTEGER": pyarrow.int64,
"NUMERIC": pyarrow_numeric,
"STRING": pyarrow.string,
"TIME": pyarrow_time,
"TIMESTAMP": pyarrow_timestamp,
}
ARROW_SCALAR_IDS_TO_BQ = {
# https://arrow.apache.org/docs/python/api/datatypes.html#type-classes
pyarrow.bool_().id: "BOOL",
pyarrow.int8().id: "INT64",
pyarrow.int16().id: "INT64",
pyarrow.int32().id: "INT64",
pyarrow.int64().id: "INT64",
pyarrow.uint8().id: "INT64",
pyarrow.uint16().id: "INT64",
pyarrow.uint32().id: "INT64",
pyarrow.uint64().id: "INT64",
pyarrow.float16().id: "FLOAT64",
pyarrow.float32().id: "FLOAT64",
pyarrow.float64().id: "FLOAT64",
pyarrow.time32("ms").id: "TIME",
pyarrow.time64("ns").id: "TIME",
pyarrow.timestamp("ns").id: "TIMESTAMP",
pyarrow.date32().id: "DATE",
pyarrow.date64().id: "DATETIME", # because millisecond resolution
pyarrow.binary().id: "BYTES",
pyarrow.string().id: "STRING", # also alias for pyarrow.utf8()
# The exact scale and precision don't matter, see below.
pyarrow.decimal128(38, scale=9).id: "NUMERIC",
# The exact decimal's scale and precision are not important, as only
# the type ID matters, and it's the same for all decimal256 instances.
pyarrow.decimal256(76, scale=38).id: "BIGNUMERIC",
}


def bq_to_arrow_struct_data_type(field):
@@ -346,13 +330,6 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
# If schema detection was not successful for all columns, also try with
# pyarrow, if available.
if unknown_type_fields:
if not pyarrow:
msg = u"Could not determine the type of columns: {}".format(
", ".join(field.name for field in unknown_type_fields)
)
warnings.warn(msg)
return None # We cannot detect the schema in full.

# The augment_schema() helper itself will also issue unknown type
# warnings if detection still fails for any of the fields.
bq_schema_out = augment_schema(dataframe, bq_schema_out)
@@ -494,9 +471,6 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
serializing method. Defaults to "SNAPPY".
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
"""
if pyarrow is None:
raise ValueError("pyarrow is required for BigQuery schema conversion.")

bq_schema = schema._to_schema_fields(bq_schema)
arrow_table = dataframe_to_arrow(dataframe, bq_schema)
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)
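
With pyarrow 3.0.0 as the floor, the BQ_TO_ARROW_SCALARS and ARROW_SCALAR_IDS_TO_BQ tables are built unconditionally and BIGNUMERIC is always present. An illustrative lookup (these are private helpers, so the exact module path is an implementation detail):

```python
# Illustrative only; _pandas_helpers is private API and may move.
from google.cloud.bigquery import _pandas_helpers

arrow_type = _pandas_helpers.BQ_TO_ARROW_SCALARS["BIGNUMERIC"]()

# Only the Arrow type ID matters for the reverse mapping, so any
# decimal256 precision/scale round-trips back to BIGNUMERIC.
assert _pandas_helpers.ARROW_SCALAR_IDS_TO_BQ[arrow_type.id] == "BIGNUMERIC"
```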
67 changes: 7 additions & 60 deletions google/cloud/bigquery/client.py
@@ -27,19 +27,11 @@
import json
import math
import os
import packaging.version
import tempfile
from typing import Any, BinaryIO, Dict, Iterable, Optional, Sequence, Tuple, Union
import uuid
import warnings

try:
import pyarrow

_PYARROW_VERSION = packaging.version.parse(pyarrow.__version__)
except ImportError: # pragma: NO COVER
pyarrow = None

from google import resumable_media # type: ignore
from google.resumable_media.requests import MultipartUpload
from google.resumable_media.requests import ResumableUpload
@@ -53,26 +45,21 @@
from google.cloud import exceptions # pytype: disable=import-error
from google.cloud.client import ClientWithProject # pytype: disable=import-error

try:
from google.cloud.bigquery_storage_v1.services.big_query_read.client import (
DEFAULT_CLIENT_INFO as DEFAULT_BQSTORAGE_CLIENT_INFO,
)
except ImportError:
DEFAULT_BQSTORAGE_CLIENT_INFO = None
from google.cloud.bigquery_storage_v1.services.big_query_read.client import (
DEFAULT_CLIENT_INFO as DEFAULT_BQSTORAGE_CLIENT_INFO,
)

from google.cloud.bigquery._helpers import _del_sub_prop
from google.cloud.bigquery._helpers import _get_sub_prop
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._helpers import BQ_STORAGE_VERSIONS
from google.cloud.bigquery._helpers import _verify_job_config_type
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.enums import AutoRowIDs
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.opentelemetry_tracing import create_span
from google.cloud.bigquery import job
from google.cloud.bigquery.job import (
@@ -121,9 +108,6 @@
# https://github.com/googleapis/python-bigquery/issues/438
_MIN_GET_QUERY_RESULTS_TIMEOUT = 120

# https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414
_PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])


class Project(object):
"""Wrapper for resource describing a BigQuery project.
@@ -483,17 +467,10 @@ def _ensure_bqstorage_client(
) -> Optional["google.cloud.bigquery_storage.BigQueryReadClient"]:
"""Create a BigQuery Storage API client using this client's credentials.
If a client cannot be created due to a missing or outdated dependency
`google-cloud-bigquery-storage`, raise a warning and return ``None``.
If the `bqstorage_client` argument is not ``None``, still perform the version
check and return the argument back to the caller if the check passes. If it
fails, raise a warning and return ``None``.
Args:
bqstorage_client:
An existing BigQuery Storage client instance to check for version
compatibility. If ``None``, a new instance is created and returned.
An existing BigQuery Storage client instance. If ``None``, a new
instance is created and returned.
client_options:
Custom options used with a new BigQuery Storage client instance if one
is created.
@@ -504,20 +481,7 @@
Returns:
A BigQuery Storage API client.
"""
try:
from google.cloud import bigquery_storage
except ImportError:
warnings.warn(
"Cannot create BigQuery Storage client, the dependency "
"google-cloud-bigquery-storage is not installed."
)
return None

try:
BQ_STORAGE_VERSIONS.verify_version()
except LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
return None
from google.cloud import bigquery_storage

if bqstorage_client is None:
bqstorage_client = bigquery_storage.BigQueryReadClient(
@@ -2496,7 +2460,7 @@ def load_table_from_dataframe(
:attr:`~google.cloud.bigquery.job.LoadJobConfig.schema` with
column names matching those of the dataframe. The BigQuery
schema is used to determine the correct data type conversion.
Indexes are not loaded. Requires the :mod:`pyarrow` library.
Indexes are not loaded.
By default, this method uses the parquet source format. To
override this, supply a value for
@@ -2526,9 +2490,6 @@
google.cloud.bigquery.job.LoadJob: A new load job.
Raises:
ValueError:
If a usable parquet engine cannot be found. This method
requires :mod:`pyarrow` to be installed.
TypeError:
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
class.
@@ -2556,10 +2517,6 @@ def load_table_from_dataframe(
)
)

if pyarrow is None and job_config.source_format == job.SourceFormat.PARQUET:
# pyarrow is now the only supported parquet engine.
raise ValueError("This method requires pyarrow to be installed")

if location is None:
location = self.location

@@ -2615,16 +2572,6 @@ def load_table_from_dataframe(
try:

if job_config.source_format == job.SourceFormat.PARQUET:
if _PYARROW_VERSION in _PYARROW_BAD_VERSIONS:
msg = (
"Loading dataframe data in PARQUET format with pyarrow "
f"{_PYARROW_VERSION} can result in data corruption. It is "
"therefore *strongly* advised to use a different pyarrow "
"version or a different source format. "
"See: https://github.com/googleapis/python-bigquery/issues/781"
)
warnings.warn(msg, category=RuntimeWarning)

if job_config.schema:
if parquet_compression == "snappy": # adjust the default value
parquet_compression = parquet_compression.upper()
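
After the client.py changes above, the pyarrow presence/version checks and the BQ Storage import fallbacks are gone from the load and read paths. A short usage sketch under that assumption (project, dataset, and table names are placeholders):

```python
# Usage sketch; the destination ID is a placeholder, not from this commit.
import pandas
from google.cloud import bigquery

client = bigquery.Client()
dataframe = pandas.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})

# PARQUET is the default source format; pyarrow is now guaranteed to be
# importable, so no "pyarrow is required" ValueError path exists anymore.
load_job = client.load_table_from_dataframe(
    dataframe, "my-project.my_dataset.my_table"
)
load_job.result()  # wait for the load job to finish
```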
