diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 645478ff6..f5f9d4a99 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -22,9 +22,9 @@ from six.moves import queue try: - from google.cloud import bigquery_storage_v1beta1 + from google.cloud import bigquery_storage_v1 except ImportError: # pragma: NO COVER - bigquery_storage_v1beta1 = None + bigquery_storage_v1 = None try: import pandas @@ -577,8 +577,19 @@ def _bqstorage_page_to_dataframe(column_names, dtypes, page): def _download_table_bqstorage_stream( download_state, bqstorage_client, session, stream, worker_queue, page_to_item ): - position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) - rowstream = bqstorage_client.read_rows(position).rows(session) + # Passing a BQ Storage client in implies that the BigQuery Storage library + # is available and can be imported. + from google.cloud import bigquery_storage_v1beta1 + + # We want to preserve compatibility with the v1beta1 BQ Storage clients, + # thus adjust constructing the rowstream if needed. + # The assumption is that the caller provides a BQ Storage `session` that is + # compatible with the version of the BQ Storage client passed in. + if isinstance(bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient): + position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) + rowstream = bqstorage_client.read_rows(position).rows(session) + else: + rowstream = bqstorage_client.read_rows(stream.name).rows(session) for page in rowstream.pages: if download_state.done: @@ -610,6 +621,12 @@ def _download_table_bqstorage( page_to_item=None, ): """Use (faster, but billable) BQ Storage API to construct DataFrame.""" + + # Passing a BQ Storage client in implies that the BigQuery Storage library + # is available and can be imported. + from google.cloud import bigquery_storage_v1 + from google.cloud import bigquery_storage_v1beta1 + if "$" in table.table_id: raise ValueError( "Reading from a specific partition is not currently supported." @@ -617,22 +634,44 @@ if "@" in table.table_id: raise ValueError("Reading from a specific snapshot is not currently supported.") - read_options = bigquery_storage_v1beta1.types.TableReadOptions() - if selected_fields is not None: - for field in selected_fields: - read_options.selected_fields.append(field.name) - - requested_streams = 0 - if preserve_order: - requested_streams = 1 - - session = bqstorage_client.create_read_session( - table.to_bqstorage(), - "projects/{}".format(project_id), - format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, - read_options=read_options, - requested_streams=requested_streams, - ) + requested_streams = 1 if preserve_order else 0 + + # We want to preserve compatibility with the v1beta1 BQ Storage clients, + # thus adjust the session creation if needed.
+ if isinstance(bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient): + warnings.warn( + "Support for BigQuery Storage v1beta1 clients is deprecated, please " + "consider upgrading the client to BigQuery Storage v1 stable version.", + category=DeprecationWarning, + ) + read_options = bigquery_storage_v1beta1.types.TableReadOptions() + + if selected_fields is not None: + for field in selected_fields: + read_options.selected_fields.append(field.name) + + session = bqstorage_client.create_read_session( + table.to_bqstorage(v1beta1=True), + "projects/{}".format(project_id), + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, + read_options=read_options, + requested_streams=requested_streams, + ) + else: + requested_session = bigquery_storage_v1.types.ReadSession( + table=table.to_bqstorage(), + data_format=bigquery_storage_v1.enums.DataFormat.ARROW, + ) + if selected_fields is not None: + for field in selected_fields: + requested_session.read_options.selected_fields.append(field.name) + + session = bqstorage_client.create_read_session( + parent="projects/{}".format(project_id), + read_session=requested_session, + max_stream_count=requested_streams, + ) + _LOGGER.debug( "Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format( table.project, table.dataset_id, table.table_id, session.name diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index da5b30a35..8e265d971 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -397,15 +397,23 @@ def dataset(self, dataset_id, project=None): def _create_bqstorage_client(self): """Create a BigQuery Storage API client using this client's credentials. + If a client cannot be created due to missing dependencies, raise a + warning and return ``None``. + Returns: - google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient: + Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]: A BigQuery Storage API client. """ - from google.cloud import bigquery_storage_v1beta1 + try: + from google.cloud import bigquery_storage_v1 + except ImportError: + warnings.warn( + "Cannot create BigQuery Storage client, the dependency " + "google-cloud-bigquery-storage is not installed." + ) + return None - return bigquery_storage_v1beta1.BigQueryStorageClient( - credentials=self._credentials - ) + return bigquery_storage_v1.BigQueryReadClient(credentials=self._credentials) def create_dataset( self, dataset, exists_ok=False, retry=DEFAULT_RETRY, timeout=None diff --git a/google/cloud/bigquery/dbapi/_helpers.py b/google/cloud/bigquery/dbapi/_helpers.py index 6558177d7..1bcf45f31 100644 --- a/google/cloud/bigquery/dbapi/_helpers.py +++ b/google/cloud/bigquery/dbapi/_helpers.py @@ -19,6 +19,7 @@ import datetime import decimal +import functools import numbers import six @@ -233,8 +234,52 @@ def to_bq_table_rows(rows_iterable): """ def to_table_row(row): - values = tuple(row.values()) + # NOTE: We fetch ARROW values, thus we need to convert them to Python + # objects with as_py(). 
+ values = tuple(value.as_py() for value in row.values()) keys_to_index = {key: i for i, key in enumerate(row.keys())} return table.Row(values, keys_to_index) return (to_table_row(row_data) for row_data in rows_iterable) + + +def raise_on_closed( + exc_msg, exc_class=exceptions.ProgrammingError, closed_attr_name="_closed" +): + """Make public instance methods raise an error if the instance is closed.""" + + def _raise_on_closed(method): + """Make a non-static method raise an error if its containing instance is closed. + """ + + def with_closed_check(self, *args, **kwargs): + if getattr(self, closed_attr_name): + raise exc_class(exc_msg) + return method(self, *args, **kwargs) + + functools.update_wrapper(with_closed_check, method) + return with_closed_check + + def decorate_public_methods(klass): + """Apply ``_raise_on_closed()`` decorator to public instance methods. + """ + for name in dir(klass): + if name.startswith("_"): + continue + + member = getattr(klass, name) + if not callable(member): + continue + + # We need to check for class/static methods directly in the instance + # __dict__, not via the retrieved attribute (`member`), as the + # latter is already a callable *produced* by one of these descriptors. + if isinstance(klass.__dict__[name], (staticmethod, classmethod)): + continue + + member = _raise_on_closed(member) + setattr(klass, name, member) + + return klass + + return decorate_public_methods diff --git a/google/cloud/bigquery/dbapi/connection.py b/google/cloud/bigquery/dbapi/connection.py index b8eaf2f9b..23e966486 100644 --- a/google/cloud/bigquery/dbapi/connection.py +++ b/google/cloud/bigquery/dbapi/connection.py @@ -14,22 +14,30 @@ """Connection for the Google BigQuery DB-API.""" +import weakref + from google.cloud import bigquery from google.cloud.bigquery.dbapi import cursor +from google.cloud.bigquery.dbapi import _helpers +@_helpers.raise_on_closed("Operating on a closed connection.") class Connection(object): """DB-API Connection to Google BigQuery. Args: - client (google.cloud.bigquery.Client): A client used to connect to BigQuery. + client (Optional[google.cloud.bigquery.Client]): + A REST API client used to connect to BigQuery. If not passed, a + client is created using default options inferred from the environment. bqstorage_client(\ - Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \ + Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient] \ ): - [Beta] An alternative client that uses the faster BigQuery Storage - API to fetch rows from BigQuery. If both clients are given, - ``bqstorage_client`` is used first to fetch query results, - with a fallback on ``client``, if necessary. + A client that uses the faster BigQuery Storage API to fetch rows from + BigQuery. If not passed, it is created using the same credentials + as ``client``. + + When fetching query results, ``bqstorage_client`` is used first, with + a fallback on ``client``, if necessary. .. note:: There is a known issue with the BigQuery Storage API with small @@ -38,15 +46,46 @@ class Connection(object): https://github.com/googleapis/python-bigquery-storage/issues/2 """ - def __init__(self, client, bqstorage_client=None): + def __init__(self, client=None, bqstorage_client=None): + if client is None: + client = bigquery.Client() + self._owns_client = True + else: + self._owns_client = False + + if bqstorage_client is None: + # A warning is already raised by the factory if instantiation fails. 
+ bqstorage_client = client._create_bqstorage_client() + self._owns_bqstorage_client = bqstorage_client is not None + else: + self._owns_bqstorage_client = False + self._client = client self._bqstorage_client = bqstorage_client + self._closed = False + self._cursors_created = weakref.WeakSet() + def close(self): - """No-op.""" + """Close the connection and any cursors created from it. + + Any BigQuery clients explicitly passed to the constructor are *not* + closed, only those created by the connection instance itself. + """ + self._closed = True + + if self._owns_client: + self._client.close() + + if self._owns_bqstorage_client: + # There is no close() on the BQ Storage client itself. + self._bqstorage_client.transport.channel.close() + + for cursor_ in self._cursors_created: + cursor_.close() def commit(self): - """No-op.""" + """No-op, but for consistency raise an error if connection is closed.""" def cursor(self): """Return a new cursor object. @@ -54,7 +93,9 @@ def cursor(self): Returns: google.cloud.bigquery.dbapi.Cursor: A DB-API cursor that uses this connection. """ - return cursor.Cursor(self) + new_cursor = cursor.Cursor(self) + self._cursors_created.add(new_cursor) + return new_cursor def connect(client=None, bqstorage_client=None): @@ -62,15 +103,17 @@ def connect(client=None, bqstorage_client=None): Args: client (Optional[google.cloud.bigquery.Client]): - A client used to connect to BigQuery. If not passed, a client is - created using default options inferred from the environment. + A REST API client used to connect to BigQuery. If not passed, a + client is created using default options inferred from the environment. bqstorage_client(\ - Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \ + Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient] \ ): - [Beta] An alternative client that uses the faster BigQuery Storage - API to fetch rows from BigQuery. If both clients are given, - ``bqstorage_client`` is used first to fetch query results, - with a fallback on ``client``, if necessary. + A client that uses the faster BigQuery Storage API to fetch rows from + BigQuery. If not passed, it is created using the same credentials + as ``client``. + + When fetching query results, ``bqstorage_client`` is used first, with + a fallback on ``client``, if necessary. .. note:: There is a known issue with the BigQuery Storage API with small @@ -81,6 +124,4 @@ def connect(client=None, bqstorage_client=None): Returns: google.cloud.bigquery.dbapi.Connection: A new DB-API connection to BigQuery. """ - if client is None: - client = bigquery.Client() return Connection(client, bqstorage_client) diff --git a/google/cloud/bigquery/dbapi/cursor.py b/google/cloud/bigquery/dbapi/cursor.py index eb73b3d56..c72116d07 100644 --- a/google/cloud/bigquery/dbapi/cursor.py +++ b/google/cloud/bigquery/dbapi/cursor.py @@ -15,6 +15,7 @@ """Cursor for the Google BigQuery DB-API.""" import collections +import warnings try: from collections import abc as collections_abc @@ -51,6 +52,7 @@ ) +@_helpers.raise_on_closed("Operating on a closed cursor.") class Cursor(object): """DB-API Cursor to Google BigQuery. @@ -73,9 +75,11 @@ def __init__(self, connection): self.arraysize = None self._query_data = None self._query_job = None + self._closed = False def close(self): - """No-op.""" + """Mark the cursor as closed, preventing its further use.""" + self._closed = True def _set_description(self, schema): """Set description from schema. 
@@ -256,7 +260,7 @@ def _bqstorage_fetch(self, bqstorage_client): Args: bqstorage_client(\ - google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \ + google.cloud.bigquery_storage_v1.BigQueryReadClient \ ): A client tha know how to talk to the BigQuery Storage API. @@ -264,26 +268,56 @@ def _bqstorage_fetch(self, bqstorage_client): Iterable[Mapping]: A sequence of rows, represented as dictionaries. """ - # NOTE: Given that BQ storage client instance is passed in, it means - # that bigquery_storage_v1beta1 library is available (no ImportError). + # Hitting this code path with a BQ Storage client instance implies that + # bigquery_storage_v1* can indeed be imported here without errors. + from google.cloud import bigquery_storage_v1 from google.cloud import bigquery_storage_v1beta1 table_reference = self._query_job.destination - read_session = bqstorage_client.create_read_session( - table_reference.to_bqstorage(), - "projects/{}".format(table_reference.project), - # a single stream only, as DB API is not well-suited for multithreading - requested_streams=1, + is_v1beta1_client = isinstance( + bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient ) + # We want to preserve compatibility with the v1beta1 BQ Storage clients, + # thus adjust the session creation if needed. + if is_v1beta1_client: + warnings.warn( + "Support for BigQuery Storage v1beta1 clients is deprecated, please " + "consider upgrading the client to BigQuery Storage v1 stable version.", + category=DeprecationWarning, + ) + read_session = bqstorage_client.create_read_session( + table_reference.to_bqstorage(v1beta1=True), + "projects/{}".format(table_reference.project), + # a single stream only, as DB API is not well-suited for multithreading + requested_streams=1, + format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, + ) + else: + requested_session = bigquery_storage_v1.types.ReadSession( + table=table_reference.to_bqstorage(), + data_format=bigquery_storage_v1.enums.DataFormat.ARROW, + ) + read_session = bqstorage_client.create_read_session( + parent="projects/{}".format(table_reference.project), + read_session=requested_session, + # a single stream only, as DB API is not well-suited for multithreading + max_stream_count=1, + ) + if not read_session.streams: return iter([]) # empty table, nothing to read - read_position = bigquery_storage_v1beta1.types.StreamPosition( - stream=read_session.streams[0], - ) - read_rows_stream = bqstorage_client.read_rows(read_position) + if is_v1beta1_client: + read_position = bigquery_storage_v1beta1.types.StreamPosition( + stream=read_session.streams[0], + ) + read_rows_stream = bqstorage_client.read_rows(read_position) + else: + stream_name = read_session.streams[0].name + read_rows_stream = bqstorage_client.read_rows(stream_name) + rows_iterable = read_rows_stream.rows(read_session) return rows_iterable @@ -353,10 +387,10 @@ def fetchall(self): return list(self._query_data) def setinputsizes(self, sizes): - """No-op.""" + """No-op, but for consistency raise an error if cursor is closed.""" def setoutputsize(self, size, column=None): - """No-op.""" + """No-op, but for consistency raise an error if cursor is closed.""" def _format_operation_list(operation, parameters): diff --git a/google/cloud/bigquery/job.py b/google/cloud/bigquery/job.py index 7a1178a8c..0040d585a 100644 --- a/google/cloud/bigquery/job.py +++ b/google/cloud/bigquery/job.py @@ -3250,7 +3250,7 @@ def to_arrow( self, progress_bar_type=None, bqstorage_client=None, - create_bqstorage_client=False, + 
create_bqstorage_client=True, ): """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -3274,10 +3274,10 @@ def to_arrow( ``'tqdm_gui'`` Use the :func:`tqdm.tqdm_gui` function to display a progress bar as a graphical dialog box. - bqstorage_client (google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient): - **Beta Feature** Optional. A BigQuery Storage API client. If - supplied, use the faster BigQuery Storage API to fetch rows - from BigQuery. This API is a billable API. + bqstorage_client (google.cloud.bigquery_storage_v1.BigQueryReadClient): + Optional. A BigQuery Storage API client. If supplied, use the + faster BigQuery Storage API to fetch rows from BigQuery. + This API is a billable API. This method requires the ``pyarrow`` and ``google-cloud-bigquery-storage`` libraries. @@ -3285,11 +3285,10 @@ def to_arrow( Reading from a specific partition or snapshot is not currently supported by this method. create_bqstorage_client (bool): - **Beta Feature** Optional. If ``True``, create a BigQuery - Storage API client using the default API settings. The - BigQuery Storage API is a faster way to fetch rows from - BigQuery. See the ``bqstorage_client`` parameter for more - information. + Optional. If ``True`` (default), create a BigQuery Storage API + client using the default API settings. The BigQuery Storage API + is a faster way to fetch rows from BigQuery. See the + ``bqstorage_client`` parameter for more information. This argument does nothing if ``bqstorage_client`` is supplied. @@ -3320,15 +3319,15 @@ def to_dataframe( bqstorage_client=None, dtypes=None, progress_bar_type=None, - create_bqstorage_client=False, + create_bqstorage_client=True, ): """Return a pandas DataFrame from a QueryJob Args: - bqstorage_client (google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient): - **Alpha Feature** Optional. A BigQuery Storage API client. If - supplied, use the faster BigQuery Storage API to fetch rows - from BigQuery. This API is a billable API. + bqstorage_client (google.cloud.bigquery_storage_v1.BigQueryReadClient): + Optional. A BigQuery Storage API client. If supplied, use the + faster BigQuery Storage API to fetch rows from BigQuery. This + API is a billable API. This method requires the ``fastavro`` and ``google-cloud-bigquery-storage`` libraries. @@ -3355,11 +3354,10 @@ def to_dataframe( ..versionadded:: 1.11.0 create_bqstorage_client (bool): - **Beta Feature** Optional. If ``True``, create a BigQuery - Storage API client using the default API settings. The - BigQuery Storage API is a faster way to fetch rows from - BigQuery. See the ``bqstorage_client`` parameter for more - information. + Optional. If ``True`` (default), create a BigQuery Storage API + client using the default API settings. The BigQuery Storage API + is a faster way to fetch rows from BigQuery. See the + ``bqstorage_client`` parameter for more information. This argument does nothing if ``bqstorage_client`` is supplied. diff --git a/google/cloud/bigquery/magics.py b/google/cloud/bigquery/magics.py index 4f2a16cca..40dda3d13 100644 --- a/google/cloud/bigquery/magics.py +++ b/google/cloud/bigquery/magics.py @@ -39,10 +39,9 @@ Project to use for running the query. Defaults to the context :attr:`~google.cloud.bigquery.magics.Context.project`. * ``--use_bqstorage_api`` (optional, line argument): - Downloads the DataFrame using the BigQuery Storage API. 
To use this - option, install the ``google-cloud-bigquery-storage`` and ``fastavro`` - packages, and `enable the BigQuery Storage API - `_. + [Deprecated] Not used anymore, as BigQuery Storage API is used by default. + * ``--use_rest_api`` (optional, line argument): + Use the BigQuery REST API instead of the Storage API. * ``--use_legacy_sql`` (optional, line argument): Runs the query using Legacy SQL syntax. Defaults to Standard SQL if this argument not used. @@ -150,6 +149,7 @@ import functools import sys import time +import warnings from concurrent import futures try: @@ -182,7 +182,6 @@ def __init__(self): self._credentials = None self._project = None self._connection = None - self._use_bqstorage_api = None self._default_query_job_config = bigquery.QueryJobConfig() @property @@ -245,21 +244,6 @@ def project(self): def project(self, value): self._project = value - @property - def use_bqstorage_api(self): - """bool: [Beta] Set to True to use the BigQuery Storage API to - download query results - - To use this option, install the ``google-cloud-bigquery-storage`` and - ``fastavro`` packages, and `enable the BigQuery Storage API - `_. - """ - return self._use_bqstorage_api - - @use_bqstorage_api.setter - def use_bqstorage_api(self, value): - self._use_bqstorage_api = value - @property def default_query_job_config(self): """google.cloud.bigquery.job.QueryJobConfig: Default job @@ -434,11 +418,21 @@ def _create_dataset_if_necessary(client, dataset_id): @magic_arguments.argument( "--use_bqstorage_api", action="store_true", + default=None, + help=( + "[Deprecated] The BigQuery Storage API is already used by default to " + "download large query results, and this option has no effect. " + "If you want to switch to the classic REST API instead, use the " + "--use_rest_api option." + ), +) +@magic_arguments.argument( + "--use_rest_api", + action="store_true", default=False, help=( - "[Beta] Use the BigQuery Storage API to download large query results. " - "To use this option, install the google-cloud-bigquery-storage and " - "fastavro packages, and enable the BigQuery Storage API." + "Use the classic REST API instead of the BigQuery Storage API to " + "download query results." ), ) @magic_arguments.argument( @@ -481,6 +475,14 @@ def _cell_magic(line, query): """ args = magic_arguments.parse_argstring(_cell_magic, line) + if args.use_bqstorage_api is not None: + warnings.warn( + "Deprecated option --use_bqstorage_api, the BigQuery " + "Storage API is already used by default.", + category=DeprecationWarning, + ) + use_bqstorage_api = not args.use_rest_api + params = [] if args.params is not None: try: @@ -502,9 +504,7 @@ def _cell_magic(line, query): ) if context._connection: client._connection = context._connection - bqstorage_client = _make_bqstorage_client( - args.use_bqstorage_api or context.use_bqstorage_api, context.credentials - ) + bqstorage_client = _make_bqstorage_client(use_bqstorage_api, context.credentials) close_transports = functools.partial(_close_transports, client, bqstorage_client) @@ -603,11 +603,13 @@ def _make_bqstorage_client(use_bqstorage_api, credentials): return None try: - from google.cloud import bigquery_storage_v1beta1 + from google.cloud import bigquery_storage_v1 except ImportError as err: customized_error = ImportError( - "Install the google-cloud-bigquery-storage and pyarrow packages " - "to use the BigQuery Storage API." 
+ "The default BigQuery Storage API client cannot be used, install " + "the missing google-cloud-bigquery-storage and pyarrow packages " + "to use it. Alternatively, use the classic REST API by specifying " + "the --use_rest_api magic option." ) six.raise_from(customized_error, err) @@ -619,7 +621,7 @@ def _make_bqstorage_client(use_bqstorage_api, credentials): ) six.raise_from(customized_error, err) - return bigquery_storage_v1beta1.BigQueryStorageClient( + return bigquery_storage_v1.BigQueryReadClient( credentials=credentials, client_info=gapic_client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT), ) @@ -634,7 +636,7 @@ def _close_transports(client, bqstorage_client): Args: client (:class:`~google.cloud.bigquery.client.Client`): bqstorage_client - (Optional[:class:`~google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient`]): + (Optional[:class:`~google.cloud.bigquery_storage_v1.BigQueryReadClient`]): A client for the BigQuery Storage API. """ diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index e66d24e74..3022ddbd5 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -26,6 +26,7 @@ import six try: + # Needed for the to_bqstorage() method. from google.cloud import bigquery_storage_v1beta1 except ImportError: # pragma: NO COVER bigquery_storage_v1beta1 = None @@ -226,7 +227,7 @@ def to_api_repr(self): "tableId": self._table_id, } - def to_bqstorage(self): + def to_bqstorage(self, v1beta1=False): """Construct a BigQuery Storage API representation of this table. Install the ``google-cloud-bigquery-storage`` package to use this @@ -235,35 +236,41 @@ def to_bqstorage(self): If the ``table_id`` contains a partition identifier (e.g. ``my_table$201812``) or a snapshot identifier (e.g. ``mytable@1234567890``), it is ignored. Use - :class:`google.cloud.bigquery_storage_v1beta1.types.TableReadOptions` + :class:`google.cloud.bigquery_storage_v1.types.ReadSession.TableReadOptions` to filter rows by partition. Use - :class:`google.cloud.bigquery_storage_v1beta1.types.TableModifiers` + :class:`google.cloud.bigquery_storage_v1.types.ReadSession.TableModifiers` to select a specific snapshot to read from. + Args: + v1beta1 (Optiona[bool]): + If :data:`True`, return representation compatible with BigQuery + Storage ``v1beta1`` version. Defaults to :data:`False`. + Returns: - google.cloud.bigquery_storage_v1beta1.types.TableReference: + Union[str, google.cloud.bigquery_storage_v1beta1.types.TableReference:]: A reference to this table in the BigQuery Storage API. Raises: ValueError: - If the :mod:`google.cloud.bigquery_storage_v1beta1` module - cannot be imported. + If ``v1beta1`` compatibility is requested, but the + :mod:`google.cloud.bigquery_storage_v1beta1` module cannot be imported. 
""" - if bigquery_storage_v1beta1 is None: + if v1beta1 and bigquery_storage_v1beta1 is None: raise ValueError(_NO_BQSTORAGE_ERROR) - table_ref = bigquery_storage_v1beta1.types.TableReference() - table_ref.project_id = self._project - table_ref.dataset_id = self._dataset_id - table_id = self._table_id - - if "@" in table_id: - table_id = table_id.split("@")[0] + table_id, _, _ = self._table_id.partition("@") + table_id, _, _ = table_id.partition("$") - if "$" in table_id: - table_id = table_id.split("$")[0] - - table_ref.table_id = table_id + if v1beta1: + table_ref = bigquery_storage_v1beta1.types.TableReference( + project_id=self._project, + dataset_id=self._dataset_id, + table_id=table_id, + ) + else: + table_ref = "projects/{}/datasets/{}/tables/{}".format( + self._project, self._dataset_id, table_id, + ) return table_ref @@ -868,14 +875,19 @@ def to_api_repr(self): """ return copy.deepcopy(self._properties) - def to_bqstorage(self): + def to_bqstorage(self, v1beta1=False): """Construct a BigQuery Storage API representation of this table. + Args: + v1beta1 (Optiona[bool]): + If :data:`True`, return representation compatible with BigQuery + Storage ``v1beta1`` version. Defaults to :data:`False`. + Returns: - google.cloud.bigquery_storage_v1beta1.types.TableReference: + Union[str, google.cloud.bigquery_storage_v1beta1.types.TableReference:]: A reference to this table in the BigQuery Storage API. """ - return self.reference.to_bqstorage() + return self.reference.to_bqstorage(v1beta1=v1beta1) def _build_resource(self, filter_fields): """Generate a resource for ``update``.""" @@ -1083,14 +1095,19 @@ def from_string(cls, full_table_id): {"tableReference": TableReference.from_string(full_table_id).to_api_repr()} ) - def to_bqstorage(self): + def to_bqstorage(self, v1beta1=False): """Construct a BigQuery Storage API representation of this table. + Args: + v1beta1 (Optiona[bool]): + If :data:`True`, return representation compatible with BigQuery + Storage ``v1beta1`` version. Defaults to :data:`False`. + Returns: - google.cloud.bigquery_storage_v1beta1.types.TableReference: + Union[str, google.cloud.bigquery_storage_v1beta1.types.TableReference:]: A reference to this table in the BigQuery Storage API. """ - return self.reference.to_bqstorage() + return self.reference.to_bqstorage(v1beta1=v1beta1) def _row_from_mapping(mapping, schema): @@ -1466,7 +1483,7 @@ def to_arrow( self, progress_bar_type=None, bqstorage_client=None, - create_bqstorage_client=False, + create_bqstorage_client=True, ): """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -1490,10 +1507,10 @@ def to_arrow( ``'tqdm_gui'`` Use the :func:`tqdm.tqdm_gui` function to display a progress bar as a graphical dialog box. - bqstorage_client (google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient): - **Beta Feature** Optional. A BigQuery Storage API client. If - supplied, use the faster BigQuery Storage API to fetch rows - from BigQuery. This API is a billable API. + bqstorage_client (google.cloud.bigquery_storage_v1.BigQueryReadClient): + Optional. A BigQuery Storage API client. If supplied, use the + faster BigQuery Storage API to fetch rows from BigQuery. This + API is a billable API. This method requires the ``pyarrow`` and ``google-cloud-bigquery-storage`` libraries. @@ -1501,11 +1518,10 @@ def to_arrow( Reading from a specific partition or snapshot is not currently supported by this method. create_bqstorage_client (bool): - **Beta Feature** Optional. 
If ``True``, create a BigQuery - Storage API client using the default API settings. The - BigQuery Storage API is a faster way to fetch rows from - BigQuery. See the ``bqstorage_client`` parameter for more - information. + Optional. If ``True`` (default), create a BigQuery Storage API + client using the default API settings. The BigQuery Storage API + is a faster way to fetch rows from BigQuery. See the + ``bqstorage_client`` parameter for more information. This argument does nothing if ``bqstorage_client`` is supplied. @@ -1575,10 +1591,9 @@ def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None): """Create an iterable of pandas DataFrames, to process the table as a stream. Args: - bqstorage_client (google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient): - **Beta Feature** Optional. A BigQuery Storage API client. If - supplied, use the faster BigQuery Storage API to fetch rows - from BigQuery. + bqstorage_client (google.cloud.bigquery_storage_v1.BigQueryReadClient): + Optional. A BigQuery Storage API client. If supplied, use the + faster BigQuery Storage API to fetch rows from BigQuery. This method requires the ``pyarrow`` and ``google-cloud-bigquery-storage`` libraries. @@ -1639,15 +1654,14 @@ def to_dataframe( bqstorage_client=None, dtypes=None, progress_bar_type=None, - create_bqstorage_client=False, + create_bqstorage_client=True, ): """Create a pandas DataFrame by loading all pages of a query. Args: - bqstorage_client (google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient): - **Beta Feature** Optional. A BigQuery Storage API client. If - supplied, use the faster BigQuery Storage API to fetch rows - from BigQuery. + bqstorage_client (google.cloud.bigquery_storage_v1.BigQueryReadClient): + Optional. A BigQuery Storage API client. If supplied, use the + faster BigQuery Storage API to fetch rows from BigQuery. This method requires the ``pyarrow`` and ``google-cloud-bigquery-storage`` libraries. @@ -1685,11 +1699,10 @@ def to_dataframe( ..versionadded:: 1.11.0 create_bqstorage_client (bool): - **Beta Feature** Optional. If ``True``, create a BigQuery - Storage API client using the default API settings. The - BigQuery Storage API is a faster way to fetch rows from - BigQuery. See the ``bqstorage_client`` parameter for more - information. + Optional. If ``True`` (default), create a BigQuery Storage API + client using the default API settings. The BigQuery Storage API + is a faster way to fetch rows from BigQuery. See the + ``bqstorage_client`` parameter for more information. This argument does nothing if ``bqstorage_client`` is supplied. @@ -1704,7 +1717,7 @@ def to_dataframe( Raises: ValueError: If the :mod:`pandas` library cannot be imported, or the - :mod:`google.cloud.bigquery_storage_v1beta1` module is + :mod:`google.cloud.bigquery_storage_v1` module is required but cannot be imported. """ @@ -1789,7 +1802,7 @@ def to_arrow( self, progress_bar_type=None, bqstorage_client=None, - create_bqstorage_client=False, + create_bqstorage_client=True, ): """[Beta] Create an empty class:`pyarrow.Table`. @@ -1810,7 +1823,7 @@ def to_dataframe( bqstorage_client=None, dtypes=None, progress_bar_type=None, - create_bqstorage_client=False, + create_bqstorage_client=True, ): """Create an empty dataframe. 
diff --git a/setup.py b/setup.py index 3ec2ba0bd..422584d12 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ ] extras = { "bqstorage": [ - "google-cloud-bigquery-storage >= 0.6.0, <2.0.0dev", + "google-cloud-bigquery-storage >= 1.0.0, <2.0.0dev", # Due to an issue in pip's dependency resolver, the `grpc` extra is not # installed, even though `google-cloud-bigquery-storage` specifies it # as `google-api-core[grpc]`. We thus need to explicitly specify it here. diff --git a/tests/system.py b/tests/system.py index 66d7ee259..3b874300f 100644 --- a/tests/system.py +++ b/tests/system.py @@ -34,8 +34,10 @@ import pkg_resources try: + from google.cloud import bigquery_storage_v1 from google.cloud import bigquery_storage_v1beta1 except ImportError: # pragma: NO COVER + bigquery_storage_v1 = None bigquery_storage_v1beta1 = None try: @@ -1689,10 +1691,10 @@ def test_dbapi_fetchall(self): self.assertEqual(row_tuples, [(1, 2), (3, 4), (5, 6)]) @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_dbapi_fetch_w_bqstorage_client_small_result_set(self): - bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient( + bqstorage_client = bigquery_storage_v1.BigQueryReadClient( credentials=Config.CLIENT._credentials ) cursor = dbapi.connect(Config.CLIENT, bqstorage_client).cursor() @@ -1733,10 +1735,60 @@ def test_dbapi_fetch_w_bqstorage_client_small_result_set(self): self.assertEqual(fetched_data, expected_data) @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) - @unittest.skipIf(fastavro is None, "Requires `fastavro`") + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_dbapi_fetch_w_bqstorage_client_large_result_set(self): + bqstorage_client = bigquery_storage_v1.BigQueryReadClient( + credentials=Config.CLIENT._credentials + ) + cursor = dbapi.connect(Config.CLIENT, bqstorage_client).cursor() + + # Pick a large enough LIMIT value to assure that the fallback to the + # default client is not needed due to the result set being too small + # (a known issue that causes problems when reading such result sets with + # BQ storage client). + cursor.execute( + """ + SELECT id, `by`, time_ts + FROM `bigquery-public-data.hacker_news.comments` + ORDER BY `id` ASC + LIMIT 100000 + """ + ) + + result_rows = [cursor.fetchone(), cursor.fetchone(), cursor.fetchone()] + + field_name = operator.itemgetter(0) + fetched_data = [sorted(row.items(), key=field_name) for row in result_rows] + + # Since DB API is not thread safe, only a single result stream should be + # requested by the BQ storage client, meaning that results should arrive + # in the sorted order. 
+ expected_data = [ + [ + ("by", "sama"), + ("id", 15), + ("time_ts", datetime.datetime(2006, 10, 9, 19, 51, 1, tzinfo=UTC)), + ], + [ + ("by", "pg"), + ("id", 17), + ("time_ts", datetime.datetime(2006, 10, 9, 19, 52, 45, tzinfo=UTC)), + ], + [ + ("by", "pg"), + ("id", 22), + ("time_ts", datetime.datetime(2006, 10, 10, 2, 18, 22, tzinfo=UTC)), + ], + ] + self.assertEqual(fetched_data, expected_data) + + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_dbapi_fetch_w_bqstorage_client_v1beta1_large_result_set(self): bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient( credentials=Config.CLIENT._credentials ) @@ -1782,6 +1834,36 @@ def test_dbapi_fetch_w_bqstorage_client_large_result_set(self): ] self.assertEqual(fetched_data, expected_data) + @unittest.skipIf( + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_dbapi_connection_does_not_leak_sockets(self): + current_process = psutil.Process() + conn_count_start = len(current_process.connections()) + + # Provide no explicit clients, so that the connection will create and own them. + connection = dbapi.connect() + cursor = connection.cursor() + + # Pick a large enough LIMIT value to assure that the fallback to the + # default client is not needed due to the result set being too small + # (a known issue that causes problems when reding such result sets with + # BQ storage client). + cursor.execute( + """ + SELECT id, `by`, time_ts + FROM `bigquery-public-data.hacker_news.comments` + ORDER BY `id` ASC + LIMIT 100000 + """ + ) + rows = cursor.fetchall() + self.assertEqual(len(rows), 100000) + + connection.close() + conn_count_end = len(current_process.connections()) + self.assertEqual(conn_count_end, conn_count_start) + def _load_table_for_dml(self, rows, dataset_id, table_id): from google.cloud._testing import _NamedTemporaryFile from google.cloud.bigquery.job import CreateDisposition @@ -2187,7 +2269,7 @@ def test_query_results_to_dataframe(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_query_results_to_dataframe_w_bqstorage(self): dest_dataset = self.temp_dataset(_make_dataset_id("bqstorage_to_dataframe_")) @@ -2199,6 +2281,60 @@ def test_query_results_to_dataframe_w_bqstorage(self): LIMIT 10 """ + bqstorage_client = bigquery_storage_v1.BigQueryReadClient( + credentials=Config.CLIENT._credentials + ) + + job_configs = ( + # There is a known issue reading small anonymous query result + # tables with the BQ Storage API. Writing to a destination + # table works around this issue. + bigquery.QueryJobConfig( + destination=dest_ref, write_disposition="WRITE_TRUNCATE" + ), + # Check that the client is able to work around the issue with + # reading small anonymous query result tables by falling back to + # the tabledata.list API. 
+ None, + ) + + for job_config in job_configs: + df = ( + Config.CLIENT.query(query, job_config=job_config) + .result() + .to_dataframe(bqstorage_client) + ) + + self.assertIsInstance(df, pandas.DataFrame) + self.assertEqual(len(df), 10) # verify the number of rows + column_names = ["id", "author", "time_ts", "dead"] + self.assertEqual(list(df), column_names) + exp_datatypes = { + "id": int, + "author": six.text_type, + "time_ts": pandas.Timestamp, + "dead": bool, + } + for index, row in df.iterrows(): + for col in column_names: + # all the schema fields are nullable, so None is acceptable + if not row[col] is None: + self.assertIsInstance(row[col], exp_datatypes[col]) + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_query_results_to_dataframe_w_bqstorage_v1beta1(self): + dest_dataset = self.temp_dataset(_make_dataset_id("bqstorage_to_dataframe_")) + dest_ref = dest_dataset.table("query_results") + + query = """ + SELECT id, author, time_ts, dead + FROM `bigquery-public-data.hacker_news.comments` + LIMIT 10 + """ + bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient( credentials=Config.CLIENT._credentials ) @@ -2485,7 +2621,7 @@ def _fetch_dataframe(self, query): @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_nested_table_to_arrow(self): from google.cloud.bigquery.job import SourceFormat @@ -2521,7 +2657,7 @@ def test_nested_table_to_arrow(self): job_config.schema = schema # Load a table using a local JSON file from memory. Config.CLIENT.load_table_from_file(body, table, job_config=job_config).result() - bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient( + bqstorage_client = bigquery_storage_v1.BigQueryReadClient( credentials=Config.CLIENT._credentials ) @@ -2677,13 +2813,13 @@ def test_list_rows_page_size(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_list_rows_max_results_w_bqstorage(self): table_ref = DatasetReference("bigquery-public-data", "utility_us").table( "country_code_iso" ) - bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient( + bqstorage_client = bigquery_storage_v1.BigQueryReadClient( credentials=Config.CLIENT._credentials ) @@ -2741,7 +2877,11 @@ def test_bigquery_magic(): assert isinstance(result, pandas.DataFrame) assert len(result) == 10 # verify row count assert list(result) == ["url", "view_count"] # verify column names - assert conn_count_end == conn_count_start # system resources are released + + # NOTE: For some reason, the number of open sockets is sometimes one *less* + # than expected when running system tests on Kokoro, thus using the <= assertion. + # That's still fine, however, since the sockets are apparently not leaked. 
+ assert conn_count_end <= conn_count_start # system resources are released def _job_done(instance): diff --git a/tests/unit/helpers.py b/tests/unit/helpers.py index 5b731a763..eea345e89 100644 --- a/tests/unit/helpers.py +++ b/tests/unit/helpers.py @@ -22,3 +22,10 @@ def make_connection(*responses): mock_conn.user_agent = "testing 1.2.3" mock_conn.api_request.side_effect = list(responses) + [NotFound("miss")] return mock_conn + + +def _to_pyarrow(value): + """Convert Python value to pyarrow value.""" + import pyarrow + + return pyarrow.array([value])[0] diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index f1dc4e816..0e083d43f 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -52,9 +52,10 @@ from google.cloud.bigquery.dataset import DatasetReference try: - from google.cloud import bigquery_storage_v1beta1 + from google.cloud import bigquery_storage_v1 except (ImportError, AttributeError): # pragma: NO COVER - bigquery_storage_v1beta1 = None + bigquery_storage_v1 = None +from test_utils.imports import maybe_fail_import from tests.unit.helpers import make_connection PANDAS_MINIUM_VERSION = pkg_resources.parse_version("1.0.0") @@ -655,25 +656,46 @@ def test_get_dataset(self): self.assertEqual(dataset.dataset_id, self.DS_ID) @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_create_bqstorage_client(self): - mock_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) + mock_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) mock_client_instance = object() mock_client.return_value = mock_client_instance creds = _make_credentials() client = self._make_one(project=self.PROJECT, credentials=creds) with mock.patch( - "google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient", mock_client + "google.cloud.bigquery_storage_v1.BigQueryReadClient", mock_client ): bqstorage_client = client._create_bqstorage_client() self.assertIs(bqstorage_client, mock_client_instance) mock_client.assert_called_once_with(credentials=creds) + def test_create_bqstorage_client_missing_dependency(self): + client = self._make_one() + + def fail_bqstorage_import(name, globals, locals, fromlist, level): + # NOTE: *very* simplified, assuming a straightforward absolute import + return "bigquery_storage_v1" in name or ( + fromlist is not None and "bigquery_storage_v1" in fromlist + ) + + no_bqstorage = maybe_fail_import(predicate=fail_bqstorage_import) + + with no_bqstorage, warnings.catch_warnings(record=True) as warned: + bqstorage_client = client._create_bqstorage_client() + + self.assertIsNone(bqstorage_client) + matching_warnings = [ + warning + for warning in warned + if "not installed" in str(warning) + and "google-cloud-bigquery-storage" in str(warning) + ] + assert matching_warnings, "Missing dependency warning not raised." 
+ def test_create_dataset_minimal(self): from google.cloud.bigquery.dataset import Dataset diff --git a/tests/unit/test_dbapi__helpers.py b/tests/unit/test_dbapi__helpers.py index 8f98d0c53..08dd6dcfa 100644 --- a/tests/unit/test_dbapi__helpers.py +++ b/tests/unit/test_dbapi__helpers.py @@ -18,10 +18,18 @@ import operator as op import unittest +try: + import pyarrow +except ImportError: # pragma: NO COVER + pyarrow = None + +import six + import google.cloud._helpers from google.cloud.bigquery import table from google.cloud.bigquery.dbapi import _helpers from google.cloud.bigquery.dbapi import exceptions +from tests.unit.helpers import _to_pyarrow class TestQueryParameters(unittest.TestCase): @@ -195,10 +203,21 @@ def test_empty_iterable(self): result = _helpers.to_bq_table_rows(rows_iterable) self.assertEqual(list(result), []) + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_non_empty_iterable(self): rows_iterable = [ - dict(one=1.1, four=1.4, two=1.2, three=1.3), - dict(one=2.1, four=2.4, two=2.2, three=2.3), + dict( + one=_to_pyarrow(1.1), + four=_to_pyarrow(1.4), + two=_to_pyarrow(1.2), + three=_to_pyarrow(1.3), + ), + dict( + one=_to_pyarrow(2.1), + four=_to_pyarrow(2.4), + two=_to_pyarrow(2.2), + three=_to_pyarrow(2.3), + ), ] result = _helpers.to_bq_table_rows(rows_iterable) @@ -219,3 +238,94 @@ def test_non_empty_iterable(self): items = sorted(row_2.items(), key=field_value) expected_items = [("one", 2.1), ("two", 2.2), ("three", 2.3), ("four", 2.4)] self.assertEqual(items, expected_items) + + +class TestRaiseOnClosedDecorator(unittest.TestCase): + def _make_class(self): + class Foo(object): + + class_member = "class member" + + def __init__(self): + self._closed = False + self.instance_member = "instance member" + + def instance_method(self): + return self.instance_member + + @classmethod + def class_method(cls): # pragma: NO COVER + return cls.class_member + + @staticmethod + def static_method(): # pragma: NO COVER + return "static return value" + + def _private_method(self): + return self.instance_member + + return Foo + + def test_preserves_method_names(self): + klass = self._make_class() + decorated_class = _helpers.raise_on_closed("I'm closed!")(klass) + instance = decorated_class() + + self.assertEqual(instance.instance_method.__name__, "instance_method") + self.assertEqual(instance.class_method.__name__, "class_method") + self.assertEqual(instance.static_method.__name__, "static_method") + self.assertEqual(instance._private_method.__name__, "_private_method") + + def test_methods_on_not_closed_instance(self): + klass = self._make_class() + decorated_class = _helpers.raise_on_closed("I'm closed!")(klass) + instance = decorated_class() + instance._closed = False + + self.assertEqual(instance.instance_method(), "instance member") + self.assertEqual(instance.class_method(), "class member") + self.assertEqual(instance.static_method(), "static return value") + self.assertEqual(instance._private_method(), "instance member") + + def test_public_instance_methods_on_closed_instance(self): + klass = self._make_class() + decorated_class = _helpers.raise_on_closed("I'm closed!")(klass) + instance = decorated_class() + instance._closed = True + + with six.assertRaisesRegex(self, exceptions.ProgrammingError, "I'm closed!"): + instance.instance_method() + + def test_methods_wo_public_instance_methods_on_closed_instance(self): + klass = self._make_class() + decorated_class = _helpers.raise_on_closed("I'm closed!")(klass) + instance = decorated_class() + instance._closed = 
True + + # no errors expected + self.assertEqual(instance.class_method(), "class member") + self.assertEqual(instance.static_method(), "static return value") + self.assertEqual(instance._private_method(), "instance member") + + def test_custom_class_closed_attribute(self): + klass = self._make_class() + decorated_class = _helpers.raise_on_closed( + "I'm closed!", closed_attr_name="_really_closed" + )(klass) + instance = decorated_class() + instance._closed = False + instance._really_closed = True + + with six.assertRaisesRegex(self, exceptions.ProgrammingError, "I'm closed!"): + instance.instance_method() + + def test_custom_on_closed_error_type(self): + klass = self._make_class() + decorated_class = _helpers.raise_on_closed( + "I'm closed!", exc_class=RuntimeError + )(klass) + instance = decorated_class() + instance._closed = True + + with six.assertRaisesRegex(self, RuntimeError, "I'm closed!"): + instance.instance_method() diff --git a/tests/unit/test_dbapi_connection.py b/tests/unit/test_dbapi_connection.py index 595afd0fe..96ec41c51 100644 --- a/tests/unit/test_dbapi_connection.py +++ b/tests/unit/test_dbapi_connection.py @@ -12,14 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import gc import unittest import mock +import six try: - from google.cloud import bigquery_storage_v1beta1 + from google.cloud import bigquery_storage_v1 except ImportError: # pragma: NO COVER - bigquery_storage_v1beta1 = None + bigquery_storage_v1 = None class TestConnection(unittest.TestCase): @@ -39,22 +41,27 @@ def _mock_client(self): return mock_client def _mock_bqstorage_client(self): - from google.cloud.bigquery_storage_v1beta1 import client + from google.cloud.bigquery_storage_v1 import client - mock_client = mock.create_autospec(client.BigQueryStorageClient) + mock_client = mock.create_autospec(client.BigQueryReadClient) + mock_client.transport = mock.Mock(spec=["channel"]) + mock_client.transport.channel = mock.Mock(spec=["close"]) return mock_client def test_ctor_wo_bqstorage_client(self): from google.cloud.bigquery.dbapi import Connection mock_client = self._mock_client() + mock_bqstorage_client = self._mock_bqstorage_client() + mock_client._create_bqstorage_client.return_value = mock_bqstorage_client + connection = self._make_one(client=mock_client) self.assertIsInstance(connection, Connection) self.assertIs(connection._client, mock_client) - self.assertIsNone(connection._bqstorage_client) + self.assertIs(connection._bqstorage_client, mock_bqstorage_client) @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_ctor_w_bqstorage_client(self): from google.cloud.bigquery.dbapi import Connection @@ -76,20 +83,23 @@ def test_connect_wo_client(self, mock_client): connection = connect() self.assertIsInstance(connection, Connection) self.assertIsNotNone(connection._client) - self.assertIsNone(connection._bqstorage_client) + self.assertIsNotNone(connection._bqstorage_client) def test_connect_w_client(self): from google.cloud.bigquery.dbapi import connect from google.cloud.bigquery.dbapi import Connection mock_client = self._mock_client() + mock_bqstorage_client = self._mock_bqstorage_client() + mock_client._create_bqstorage_client.return_value = mock_bqstorage_client + connection = connect(client=mock_client) self.assertIsInstance(connection, Connection) self.assertIs(connection._client, mock_client) - 
self.assertIsNone(connection._bqstorage_client) + self.assertIs(connection._bqstorage_client, mock_bqstorage_client) @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_connect_w_both_clients(self): from google.cloud.bigquery.dbapi import connect @@ -104,11 +114,77 @@ def test_connect_w_both_clients(self): self.assertIs(connection._client, mock_client) self.assertIs(connection._bqstorage_client, mock_bqstorage_client) - def test_close(self): + def test_raises_error_if_closed(self): + from google.cloud.bigquery.dbapi.exceptions import ProgrammingError + + connection = self._make_one(client=self._mock_client()) + + connection.close() + + for method in ("close", "commit", "cursor"): + with six.assertRaisesRegex( + self, ProgrammingError, r"Operating on a closed connection\." + ): + getattr(connection, method)() + + def test_close_closes_all_created_bigquery_clients(self): + client = self._mock_client() + bqstorage_client = self._mock_bqstorage_client() + + client_patcher = mock.patch( + "google.cloud.bigquery.dbapi.connection.bigquery.Client", + return_value=client, + ) + bqstorage_client_patcher = mock.patch.object( + client, "_create_bqstorage_client", return_value=bqstorage_client, + ) + + with client_patcher, bqstorage_client_patcher: + connection = self._make_one(client=None, bqstorage_client=None) + + connection.close() + + self.assertTrue(client.close.called) + self.assertTrue(bqstorage_client.transport.channel.close.called) + + def test_close_does_not_close_bigquery_clients_passed_to_it(self): + client = self._mock_client() + bqstorage_client = self._mock_bqstorage_client() + connection = self._make_one(client=client, bqstorage_client=bqstorage_client) + + connection.close() + + self.assertFalse(client.close.called) + self.assertFalse(bqstorage_client.transport.channel.called) + + def test_close_closes_all_created_cursors(self): connection = self._make_one(client=self._mock_client()) - # close() is a no-op, there is nothing to test. + cursor_1 = connection.cursor() + cursor_2 = connection.cursor() + self.assertFalse(cursor_1._closed) + self.assertFalse(cursor_2._closed) + connection.close() + self.assertTrue(cursor_1._closed) + self.assertTrue(cursor_2._closed) + + def test_does_not_keep_cursor_instances_alive(self): + from google.cloud.bigquery.dbapi import Cursor + + connection = self._make_one(client=self._mock_client()) + cursor_1 = connection.cursor() # noqa + cursor_2 = connection.cursor() + cursor_3 = connection.cursor() # noqa + + del cursor_2 + + # Connections should not hold strong references to the Cursor instances + # they created, unnecessarily keeping them alive. + gc.collect() + cursors = [obj for obj in gc.get_objects() if isinstance(obj, Cursor)] + self.assertEqual(len(cursors), 2) + def test_commit(self): connection = self._make_one(client=self._mock_client()) # commit() is a no-op, there is nothing to test. 
diff --git a/tests/unit/test_dbapi_cursor.py b/tests/unit/test_dbapi_cursor.py index e53cc158a..caec4b1bd 100644 --- a/tests/unit/test_dbapi_cursor.py +++ b/tests/unit/test_dbapi_cursor.py @@ -14,17 +14,27 @@ import operator as op import unittest +import warnings import mock import six +try: + import pyarrow +except ImportError: # pragma: NO COVER + pyarrow = None + from google.api_core import exceptions try: + from google.cloud import bigquery_storage_v1 from google.cloud import bigquery_storage_v1beta1 except ImportError: # pragma: NO COVER + bigquery_storage_v1 = None bigquery_storage_v1beta1 = None +from tests.unit.helpers import _to_pyarrow + class TestCursor(unittest.TestCase): @staticmethod @@ -51,25 +61,40 @@ def _mock_client(self, rows=None, schema=None, num_dml_affected_rows=None): num_dml_affected_rows=num_dml_affected_rows, ) mock_client.list_rows.return_value = rows + + # Assure that the REST client gets used, not the BQ Storage client. + mock_client._create_bqstorage_client.return_value = None + return mock_client - def _mock_bqstorage_client(self, rows=None, stream_count=0): - from google.cloud.bigquery_storage_v1beta1 import client - from google.cloud.bigquery_storage_v1beta1 import types + def _mock_bqstorage_client(self, rows=None, stream_count=0, v1beta1=False): + from google.cloud.bigquery_storage_v1 import client + from google.cloud.bigquery_storage_v1 import types + from google.cloud.bigquery_storage_v1beta1 import types as types_v1beta1 if rows is None: rows = [] - mock_client = mock.create_autospec(client.BigQueryStorageClient) + if v1beta1: + mock_client = mock.create_autospec( + bigquery_storage_v1beta1.BigQueryStorageClient + ) + mock_read_session = mock.MagicMock( + streams=[ + types_v1beta1.Stream(name="streams/stream_{}".format(i)) + for i in range(stream_count) + ] + ) + else: + mock_client = mock.create_autospec(client.BigQueryReadClient) + mock_read_session = mock.MagicMock( + streams=[ + types.ReadStream(name="streams/stream_{}".format(i)) + for i in range(stream_count) + ] + ) - mock_read_session = mock.MagicMock( - streams=[ - types.Stream(name="streams/stream_{}".format(i)) - for i in range(stream_count) - ] - ) mock_client.create_read_session.return_value = mock_read_session - mock_rows_stream = mock.MagicMock() mock_rows_stream.rows.return_value = iter(rows) mock_client.read_rows.return_value = mock_rows_stream @@ -88,6 +113,9 @@ def _mock_job(self, total_rows=0, schema=None, num_dml_affected_rows=None): schema=schema, num_dml_affected_rows=num_dml_affected_rows, ) + mock_job.destination.to_bqstorage.return_value = ( + "projects/P/datasets/DS/tables/T" + ) if num_dml_affected_rows is None: mock_job.statement_type = None # API sends back None for SELECT @@ -122,6 +150,31 @@ def test_close(self): # close() is a no-op, there is nothing to test. cursor.close() + def test_raises_error_if_closed(self): + from google.cloud.bigquery.dbapi import connect + from google.cloud.bigquery.dbapi.exceptions import ProgrammingError + + connection = connect(self._mock_client()) + cursor = connection.cursor() + cursor.close() + + method_names = ( + "close", + "execute", + "executemany", + "fetchall", + "fetchmany", + "fetchone", + "setinputsizes", + "setoutputsize", + ) + + for method in method_names: + with six.assertRaisesRegex( + self, ProgrammingError, r"Operating on a closed cursor\." 
+ ): + getattr(cursor, method)() + def test_fetchone_wo_execute_raises_error(self): from google.cloud.bigquery import dbapi @@ -213,8 +266,9 @@ def test_fetchall_w_row(self): self.assertEqual(rows[0], (1,)) @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_fetchall_w_bqstorage_client_fetch_success(self): from google.cloud.bigquery import dbapi from google.cloud.bigquery import table @@ -225,8 +279,18 @@ def test_fetchall_w_bqstorage_client_fetch_success(self): table.Row([2.4, 2.1, 2.3, 2.2], {"bar": 3, "baz": 2, "foo": 1, "quux": 0}), ] bqstorage_streamed_rows = [ - {"bar": 1.2, "foo": 1.1, "quux": 1.4, "baz": 1.3}, - {"bar": 2.2, "foo": 2.1, "quux": 2.4, "baz": 2.3}, + { + "bar": _to_pyarrow(1.2), + "foo": _to_pyarrow(1.1), + "quux": _to_pyarrow(1.4), + "baz": _to_pyarrow(1.3), + }, + { + "bar": _to_pyarrow(2.2), + "foo": _to_pyarrow(2.1), + "quux": _to_pyarrow(2.4), + "baz": _to_pyarrow(2.3), + }, ] mock_client = self._mock_client(rows=row_data) @@ -258,6 +322,70 @@ def test_fetchall_w_bqstorage_client_fetch_success(self): @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" ) + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_fetchall_w_bqstorage_client_v1beta1_fetch_success(self): + from google.cloud.bigquery import dbapi + from google.cloud.bigquery import table + + # use unordered data to also test any non-deterministic key order in dicts + row_data = [ + table.Row([1.4, 1.1, 1.3, 1.2], {"bar": 3, "baz": 2, "foo": 1, "quux": 0}), + table.Row([2.4, 2.1, 2.3, 2.2], {"bar": 3, "baz": 2, "foo": 1, "quux": 0}), + ] + bqstorage_streamed_rows = [ + { + "bar": _to_pyarrow(1.2), + "foo": _to_pyarrow(1.1), + "quux": _to_pyarrow(1.4), + "baz": _to_pyarrow(1.3), + }, + { + "bar": _to_pyarrow(2.2), + "foo": _to_pyarrow(2.1), + "quux": _to_pyarrow(2.4), + "baz": _to_pyarrow(2.3), + }, + ] + + mock_client = self._mock_client(rows=row_data) + mock_bqstorage_client = self._mock_bqstorage_client( + stream_count=1, rows=bqstorage_streamed_rows, v1beta1=True + ) + + connection = dbapi.connect( + client=mock_client, bqstorage_client=mock_bqstorage_client, + ) + cursor = connection.cursor() + cursor.execute("SELECT foo, bar FROM some_table") + + with warnings.catch_warnings(record=True) as warned: + rows = cursor.fetchall() + + # a deprecation warning should have been emitted + expected_warnings = [ + warning + for warning in warned + if issubclass(warning.category, DeprecationWarning) + and "v1beta1" in str(warning) + ] + self.assertEqual(len(expected_warnings), 1, "Deprecation warning not raised.") + + # the default client was not used + mock_client.list_rows.assert_not_called() + + # check the data returned + field_value = op.itemgetter(1) + sorted_row_data = [sorted(row.items(), key=field_value) for row in rows] + expected_row_data = [ + [("foo", 1.1), ("bar", 1.2), ("baz", 1.3), ("quux", 1.4)], + [("foo", 2.1), ("bar", 2.2), ("baz", 2.3), ("quux", 2.4)], + ] + + self.assertEqual(sorted_row_data, expected_row_data) + + @unittest.skipIf( + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" + ) def test_fetchall_w_bqstorage_client_fetch_no_rows(self): from google.cloud.bigquery import dbapi @@ -279,7 +407,7 @@ def test_fetchall_w_bqstorage_client_fetch_no_rows(self): self.assertEqual(rows, []) @unittest.skipIf( - bigquery_storage_v1beta1 
is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_fetchall_w_bqstorage_client_fetch_error_no_fallback(self): from google.cloud.bigquery import dbapi @@ -307,7 +435,7 @@ def test_fetchall_w_bqstorage_client_fetch_error_no_fallback(self): mock_client.list_rows.assert_not_called() @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_fetchall_w_bqstorage_client_fetch_error_fallback_on_client(self): from google.cloud.bigquery import dbapi diff --git a/tests/unit/test_job.py b/tests/unit/test_job.py index c89cad749..23991b9ec 100644 --- a/tests/unit/test_job.py +++ b/tests/unit/test_job.py @@ -34,9 +34,9 @@ except ImportError: # pragma: NO COVER pyarrow = None try: - from google.cloud import bigquery_storage_v1beta1 + from google.cloud import bigquery_storage_v1 except (ImportError, AttributeError): # pragma: NO COVER - bigquery_storage_v1beta1 = None + bigquery_storage_v1 = None try: from tqdm import tqdm except (ImportError, AttributeError): # pragma: NO COVER @@ -5437,7 +5437,7 @@ def test_to_dataframe_ddl_query(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_to_dataframe_bqstorage(self): query_resource = { @@ -5455,10 +5455,8 @@ def test_to_dataframe_bqstorage(self): client = _make_client(self.PROJECT, connection=connection) resource = self._make_resource(ended=True) job = self._get_target_class().from_api_repr(resource, client) - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) - session = bigquery_storage_v1beta1.types.ReadSession() + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) + session = bigquery_storage_v1.types.ReadSession() session.avro_schema.schema = json.dumps( { "type": "record", @@ -5473,13 +5471,17 @@ def test_to_dataframe_bqstorage(self): job.to_dataframe(bqstorage_client=bqstorage_client) + destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format( + **resource["configuration"]["query"]["destinationTable"] + ) + expected_session = bigquery_storage_v1.types.ReadSession( + table=destination_table, + data_format=bigquery_storage_v1.enums.DataFormat.ARROW, + ) bqstorage_client.create_read_session.assert_called_once_with( - mock.ANY, - "projects/{}".format(self.PROJECT), - format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, - read_options=mock.ANY, - # Use default number of streams for best performance. - requested_streams=0, + parent="projects/{}".format(self.PROJECT), + read_session=expected_session, + max_stream_count=0, # Use default number of streams for best performance. 
) @unittest.skipIf(pandas is None, "Requires `pandas`") @@ -5949,7 +5951,7 @@ def test__contains_order_by(query, expected): @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") @pytest.mark.skipif( - bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, reason="Requires `google-cloud-bigquery-storage`" ) @pytest.mark.parametrize( "query", @@ -5985,10 +5987,8 @@ def test_to_dataframe_bqstorage_preserve_order(query): connection = _make_connection(get_query_results_resource, job_resource) client = _make_client(connection=connection) job = target_class.from_api_repr(job_resource, client) - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) - session = bigquery_storage_v1beta1.types.ReadSession() + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) + session = bigquery_storage_v1.types.ReadSession() session.avro_schema.schema = json.dumps( { "type": "record", @@ -6003,11 +6003,14 @@ def test_to_dataframe_bqstorage_preserve_order(query): job.to_dataframe(bqstorage_client=bqstorage_client) + destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format( + **job_resource["configuration"]["query"]["destinationTable"] + ) + expected_session = bigquery_storage_v1.types.ReadSession( + table=destination_table, data_format=bigquery_storage_v1.enums.DataFormat.ARROW, + ) bqstorage_client.create_read_session.assert_called_once_with( - mock.ANY, - "projects/test-project", - format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, - read_options=mock.ANY, - # Use a single stream to preserve row order. - requested_streams=1, + parent="projects/test-project", + read_session=expected_session, + max_stream_count=1, # Use a single stream to preserve row order. 
) diff --git a/tests/unit/test_magics.py b/tests/unit/test_magics.py index fd9d1d700..a42592e3c 100644 --- a/tests/unit/test_magics.py +++ b/tests/unit/test_magics.py @@ -15,6 +15,7 @@ import copy import re from concurrent import futures +import warnings import mock import pytest @@ -36,9 +37,9 @@ import google.auth.credentials try: - from google.cloud import bigquery_storage_v1beta1 + from google.cloud import bigquery_storage_v1 except ImportError: # pragma: NO COVER - bigquery_storage_v1beta1 = None + bigquery_storage_v1 = None from google.cloud import bigquery from google.cloud.bigquery import job from google.cloud.bigquery import table @@ -74,8 +75,8 @@ def missing_bq_storage(): def fail_if(name, globals, locals, fromlist, level): # NOTE: *very* simplified, assuming a straightforward absolute import - return "bigquery_storage_v1beta1" in name or ( - fromlist is not None and "bigquery_storage_v1beta1" in fromlist + return "bigquery_storage_v1" in name or ( + fromlist is not None and "bigquery_storage_v1" in fromlist ) return maybe_fail_import(predicate=fail_if) @@ -305,14 +306,14 @@ def test__make_bqstorage_client_false(): @pytest.mark.skipif( - bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, reason="Requires `google-cloud-bigquery-storage`" ) def test__make_bqstorage_client_true(): credentials_mock = mock.create_autospec( google.auth.credentials.Credentials, instance=True ) got = magics._make_bqstorage_client(True, credentials_mock) - assert isinstance(got, bigquery_storage_v1beta1.BigQueryStorageClient) + assert isinstance(got, bigquery_storage_v1.BigQueryReadClient) def test__make_bqstorage_client_true_raises_import_error(missing_bq_storage): @@ -329,7 +330,7 @@ def test__make_bqstorage_client_true_raises_import_error(missing_bq_storage): @pytest.mark.skipif( - bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, reason="Requires `google-cloud-bigquery-storage`" ) @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") def test__make_bqstorage_client_true_missing_gapic(missing_grpcio_lib): @@ -386,13 +387,31 @@ def test_extension_load(): @pytest.mark.usefixtures("ipython_interactive") @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") -def test_bigquery_magic_without_optional_arguments(missing_bq_storage): +@pytest.mark.skipif( + bigquery_storage_v1 is None, reason="Requires `google-cloud-bigquery-storage`" +) +def test_bigquery_magic_without_optional_arguments(monkeypatch): ip = IPython.get_ipython() ip.extension_manager.load_extension("google.cloud.bigquery") - magics.context.credentials = mock.create_autospec( + mock_credentials = mock.create_autospec( google.auth.credentials.Credentials, instance=True ) + # Set up the context with monkeypatch so that it's reset for subsequent + # tests. + monkeypatch.setattr(magics.context, "credentials", mock_credentials) + + # Mock out the BigQuery Storage API. 
+ bqstorage_mock = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) + bqstorage_instance_mock = mock.create_autospec( + bigquery_storage_v1.BigQueryReadClient, instance=True + ) + bqstorage_instance_mock.transport = mock.Mock() + bqstorage_mock.return_value = bqstorage_instance_mock + bqstorage_client_patch = mock.patch( + "google.cloud.bigquery_storage_v1.BigQueryReadClient", bqstorage_mock + ) + sql = "SELECT 17 AS num" result = pandas.DataFrame([17], columns=["num"]) run_query_patch = mock.patch( @@ -403,11 +422,11 @@ def test_bigquery_magic_without_optional_arguments(missing_bq_storage): ) query_job_mock.to_dataframe.return_value = result - # Shouldn't fail when BigQuery Storage client isn't installed. - with run_query_patch as run_query_mock, missing_bq_storage: + with run_query_patch as run_query_mock, bqstorage_client_patch: run_query_mock.return_value = query_job_mock return_value = ip.run_cell_magic("bigquery", "", sql) + assert bqstorage_mock.called # BQ storage client was used assert isinstance(return_value, pandas.DataFrame) assert len(return_value) == len(result) # verify row count assert list(return_value) == list(result) # verify column names @@ -530,7 +549,7 @@ def test_bigquery_magic_clears_display_in_verbose_mode(): @pytest.mark.usefixtures("ipython_interactive") @pytest.mark.skipif( - bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, reason="Requires `google-cloud-bigquery-storage`" ) def test_bigquery_magic_with_bqstorage_from_argument(monkeypatch): ip = IPython.get_ipython() @@ -542,19 +561,16 @@ def test_bigquery_magic_with_bqstorage_from_argument(monkeypatch): # Set up the context with monkeypatch so that it's reset for subsequent # tests. monkeypatch.setattr(magics.context, "credentials", mock_credentials) - monkeypatch.setattr(magics.context, "use_bqstorage_api", False) # Mock out the BigQuery Storage API. 
- bqstorage_mock = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) + bqstorage_mock = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) bqstorage_instance_mock = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient, instance=True + bigquery_storage_v1.BigQueryReadClient, instance=True ) bqstorage_instance_mock.transport = mock.Mock() bqstorage_mock.return_value = bqstorage_instance_mock bqstorage_client_patch = mock.patch( - "google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient", bqstorage_mock + "google.cloud.bigquery_storage_v1.BigQueryReadClient", bqstorage_mock ) sql = "SELECT 17 AS num" @@ -566,67 +582,20 @@ def test_bigquery_magic_with_bqstorage_from_argument(monkeypatch): google.cloud.bigquery.job.QueryJob, instance=True ) query_job_mock.to_dataframe.return_value = result - with run_query_patch as run_query_mock, bqstorage_client_patch: + with run_query_patch as run_query_mock, bqstorage_client_patch, warnings.catch_warnings( + record=True + ) as warned: run_query_mock.return_value = query_job_mock return_value = ip.run_cell_magic("bigquery", "--use_bqstorage_api", sql) - assert len(bqstorage_mock.call_args_list) == 1 - kwargs = bqstorage_mock.call_args_list[0].kwargs - assert kwargs.get("credentials") is mock_credentials - client_info = kwargs.get("client_info") - assert client_info is not None - assert client_info.user_agent == "ipython-" + IPython.__version__ - - query_job_mock.to_dataframe.assert_called_once_with( - bqstorage_client=bqstorage_instance_mock - ) - - assert isinstance(return_value, pandas.DataFrame) - - -@pytest.mark.usefixtures("ipython_interactive") -@pytest.mark.skipif( - bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`" -) -def test_bigquery_magic_with_bqstorage_from_context(monkeypatch): - ip = IPython.get_ipython() - ip.extension_manager.load_extension("google.cloud.bigquery") - mock_credentials = mock.create_autospec( - google.auth.credentials.Credentials, instance=True - ) - - # Set up the context with monkeypatch so that it's reset for subsequent - # tests. - monkeypatch.setattr(magics.context, "credentials", mock_credentials) - monkeypatch.setattr(magics.context, "use_bqstorage_api", True) + # Deprecation warning should have been issued. + def warning_match(warning): + message = str(warning).lower() + return "deprecated" in message and "use_bqstorage_api" in message - # Mock out the BigQuery Storage API. 
- bqstorage_mock = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) - bqstorage_instance_mock = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient, instance=True - ) - bqstorage_instance_mock.transport = mock.Mock() - bqstorage_mock.return_value = bqstorage_instance_mock - bqstorage_client_patch = mock.patch( - "google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient", bqstorage_mock - ) - - sql = "SELECT 17 AS num" - result = pandas.DataFrame([17], columns=["num"]) - run_query_patch = mock.patch( - "google.cloud.bigquery.magics._run_query", autospec=True - ) - query_job_mock = mock.create_autospec( - google.cloud.bigquery.job.QueryJob, instance=True - ) - query_job_mock.to_dataframe.return_value = result - with run_query_patch as run_query_mock, bqstorage_client_patch: - run_query_mock.return_value = query_job_mock - - return_value = ip.run_cell_magic("bigquery", "", sql) + expected_warnings = list(filter(warning_match, warned)) + assert len(expected_warnings) == 1 assert len(bqstorage_mock.call_args_list) == 1 kwargs = bqstorage_mock.call_args_list[0].kwargs @@ -644,9 +613,9 @@ def test_bigquery_magic_with_bqstorage_from_context(monkeypatch): @pytest.mark.usefixtures("ipython_interactive") @pytest.mark.skipif( - bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, reason="Requires `google-cloud-bigquery-storage`" ) -def test_bigquery_magic_without_bqstorage(monkeypatch): +def test_bigquery_magic_with_rest_client_requested(monkeypatch): ip = IPython.get_ipython() ip.extension_manager.load_extension("google.cloud.bigquery") mock_credentials = mock.create_autospec( @@ -658,11 +627,9 @@ def test_bigquery_magic_without_bqstorage(monkeypatch): monkeypatch.setattr(magics.context, "credentials", mock_credentials) # Mock out the BigQuery Storage API. 
- bqstorage_mock = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) + bqstorage_mock = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) bqstorage_client_patch = mock.patch( - "google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient", bqstorage_mock + "google.cloud.bigquery_storage_v1.BigQueryReadClient", bqstorage_mock ) sql = "SELECT 17 AS num" @@ -677,7 +644,7 @@ def test_bigquery_magic_without_bqstorage(monkeypatch): with run_query_patch as run_query_mock, bqstorage_client_patch: run_query_mock.return_value = query_job_mock - return_value = ip.run_cell_magic("bigquery", "", sql) + return_value = ip.run_cell_magic("bigquery", "--use_rest_api", sql) bqstorage_mock.assert_not_called() query_job_mock.to_dataframe.assert_called_once_with(bqstorage_client=None) @@ -855,7 +822,7 @@ def test_bigquery_magic_w_table_id_and_destination_var(): @pytest.mark.usefixtures("ipython_interactive") @pytest.mark.skipif( - bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, reason="Requires `google-cloud-bigquery-storage`" ) @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") def test_bigquery_magic_w_table_id_and_bqstorage_client(): @@ -878,16 +845,14 @@ def test_bigquery_magic_w_table_id_and_bqstorage_client(): "google.cloud.bigquery.magics.bigquery.Client", autospec=True ) - bqstorage_mock = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) + bqstorage_mock = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) bqstorage_instance_mock = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient, instance=True + bigquery_storage_v1.BigQueryReadClient, instance=True ) bqstorage_instance_mock.transport = mock.Mock() bqstorage_mock.return_value = bqstorage_instance_mock bqstorage_client_patch = mock.patch( - "google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient", bqstorage_mock + "google.cloud.bigquery_storage_v1.BigQueryReadClient", bqstorage_mock ) table_id = "bigquery-public-data.samples.shakespeare" @@ -895,7 +860,7 @@ def test_bigquery_magic_w_table_id_and_bqstorage_client(): with default_patch, client_patch as client_mock, bqstorage_client_patch: client_mock().list_rows.return_value = row_iterator_mock - ip.run_cell_magic("bigquery", "--use_bqstorage_api --max_results=5", table_id) + ip.run_cell_magic("bigquery", "--max_results=5", table_id) row_iterator_mock.to_dataframe.assert_called_once_with( bqstorage_client=bqstorage_instance_mock ) diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index 72275fc53..cbce25b00 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -25,13 +25,19 @@ import google.api_core.exceptions try: + from google.cloud import bigquery_storage_v1 from google.cloud import bigquery_storage_v1beta1 + from google.cloud.bigquery_storage_v1.gapic.transports import ( + big_query_read_grpc_transport, + ) from google.cloud.bigquery_storage_v1beta1.gapic.transports import ( - big_query_storage_grpc_transport, + big_query_storage_grpc_transport as big_query_storage_grpc_transport_v1beta1, ) except ImportError: # pragma: NO COVER + bigquery_storage_v1 = None bigquery_storage_v1beta1 = None - big_query_storage_grpc_transport = None + big_query_read_grpc_transport = None + big_query_storage_grpc_transport_v1beta1 = None try: import pandas @@ -1492,7 +1498,7 @@ def test_to_dataframe_error_if_pandas_is_none(self): @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe(self): 
row_iterator = self._make_one() - df = row_iterator.to_dataframe() + df = row_iterator.to_dataframe(create_bqstorage_client=False) self.assertIsInstance(df, pandas.DataFrame) self.assertEqual(len(df), 0) # verify the number of rows @@ -1687,7 +1693,7 @@ def test_to_arrow(self): api_request = mock.Mock(return_value={"rows": rows}) row_iterator = self._make_one(_mock_client(), api_request, path, schema) - tbl = row_iterator.to_arrow() + tbl = row_iterator.to_arrow(create_bqstorage_client=False) self.assertIsInstance(tbl, pyarrow.Table) self.assertEqual(tbl.num_rows, 2) @@ -1737,7 +1743,7 @@ def test_to_arrow_w_nulls(self): api_request = mock.Mock(return_value={"rows": rows}) row_iterator = self._make_one(_mock_client(), api_request, path, schema) - tbl = row_iterator.to_arrow() + tbl = row_iterator.to_arrow(create_bqstorage_client=False) self.assertIsInstance(tbl, pyarrow.Table) self.assertEqual(tbl.num_rows, 4) @@ -1772,7 +1778,7 @@ def test_to_arrow_w_unknown_type(self): api_request = mock.Mock(return_value={"rows": rows}) row_iterator = self._make_one(_mock_client(), api_request, path, schema) - tbl = row_iterator.to_arrow() + tbl = row_iterator.to_arrow(create_bqstorage_client=False) self.assertIsInstance(tbl, pyarrow.Table) self.assertEqual(tbl.num_rows, 2) @@ -1815,7 +1821,7 @@ def test_to_arrow_w_empty_table(self): api_request = mock.Mock(return_value={"rows": rows}) row_iterator = self._make_one(_mock_client(), api_request, path, schema) - tbl = row_iterator.to_arrow() + tbl = row_iterator.to_arrow(create_bqstorage_client=False) self.assertIsInstance(tbl, pyarrow.Table) self.assertEqual(tbl.num_rows, 0) @@ -1834,7 +1840,7 @@ def test_to_arrow_w_empty_table(self): @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_to_arrow_max_results_w_create_bqstorage_warning(self): from google.cloud.bigquery.schema import SchemaField @@ -1874,25 +1880,23 @@ def test_to_arrow_max_results_w_create_bqstorage_warning(self): @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_to_arrow_w_bqstorage(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut - from google.cloud.bigquery_storage_v1beta1 import reader + from google.cloud.bigquery_storage_v1 import reader - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) bqstorage_client.transport = mock.create_autospec( - big_query_storage_grpc_transport.BigQueryStorageGrpcTransport + big_query_read_grpc_transport.BigQueryReadGrpcTransport ) streams = [ # Use two streams we want to check frames are read from each stream. 
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, ] - session = bigquery_storage_v1beta1.types.ReadSession(streams=streams) + session = bigquery_storage_v1.types.ReadSession(streams=streams) arrow_schema = pyarrow.schema( [ pyarrow.field("colA", pyarrow.int64()), @@ -1957,21 +1961,19 @@ def test_to_arrow_w_bqstorage(self): @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_to_arrow_w_bqstorage_creates_client(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut mock_client = _mock_client() - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) bqstorage_client.transport = mock.create_autospec( - big_query_storage_grpc_transport.BigQueryStorageGrpcTransport + big_query_read_grpc_transport.BigQueryReadGrpcTransport ) mock_client._create_bqstorage_client.return_value = bqstorage_client - session = bigquery_storage_v1beta1.types.ReadSession() + session = bigquery_storage_v1.types.ReadSession() bqstorage_client.create_read_session.return_value = session row_iterator = mut.RowIterator( mock_client, @@ -1990,16 +1992,14 @@ def test_to_arrow_w_bqstorage_creates_client(self): @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_to_arrow_w_bqstorage_no_streams(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) - session = bigquery_storage_v1beta1.types.ReadSession() + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) + session = bigquery_storage_v1.types.ReadSession() arrow_schema = pyarrow.schema( [ pyarrow.field("colA", pyarrow.string()), @@ -2059,7 +2059,9 @@ def test_to_arrow_progress_bar(self, tqdm_mock, tqdm_notebook_mock, tqdm_gui_moc for progress_bar_type, progress_bar_mock in progress_bars: row_iterator = self._make_one(_mock_client(), api_request, path, schema) - tbl = row_iterator.to_arrow(progress_bar_type=progress_bar_type) + tbl = row_iterator.to_arrow( + progress_bar_type=progress_bar_type, create_bqstorage_client=False, + ) progress_bar_mock.assert_called() progress_bar_mock().update.assert_called() @@ -2122,13 +2124,13 @@ def test_to_dataframe_iterable(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_dataframe_iterable_w_bqstorage(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut - from google.cloud.bigquery_storage_v1beta1 import reader + from google.cloud.bigquery_storage_v1 import reader arrow_fields = [ pyarrow.field("colA", pyarrow.int64()), @@ -2138,18 +2140,16 @@ def test_to_dataframe_iterable_w_bqstorage(self): ] arrow_schema = pyarrow.schema(arrow_fields) - bqstorage_client = mock.create_autospec( - 
bigquery_storage_v1beta1.BigQueryStorageClient - ) + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) bqstorage_client.transport = mock.create_autospec( - big_query_storage_grpc_transport.BigQueryStorageGrpcTransport + big_query_read_grpc_transport.BigQueryReadGrpcTransport ) streams = [ # Use two streams we want to check frames are read from each stream. {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, ] - session = bigquery_storage_v1beta1.types.ReadSession( + session = bigquery_storage_v1.types.ReadSession( streams=streams, arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()}, ) @@ -2231,7 +2231,7 @@ def test_to_dataframe(self): api_request = mock.Mock(return_value={"rows": rows}) row_iterator = self._make_one(_mock_client(), api_request, path, schema) - df = row_iterator.to_dataframe() + df = row_iterator.to_dataframe(create_bqstorage_client=False) self.assertIsInstance(df, pandas.DataFrame) self.assertEqual(len(df), 4) # verify the number of rows @@ -2302,7 +2302,9 @@ def test_to_dataframe_progress_bar( for progress_bar_type, progress_bar_mock in progress_bars: row_iterator = self._make_one(_mock_client(), api_request, path, schema) - df = row_iterator.to_dataframe(progress_bar_type=progress_bar_type) + df = row_iterator.to_dataframe( + progress_bar_type=progress_bar_type, create_bqstorage_client=False, + ) progress_bar_mock.assert_called() progress_bar_mock().update.assert_called() @@ -2368,7 +2370,7 @@ def test_to_dataframe_no_tqdm_no_progress_bar(self): row_iterator = self._make_one(_mock_client(), api_request, path, schema) with warnings.catch_warnings(record=True) as warned: - df = row_iterator.to_dataframe() + df = row_iterator.to_dataframe(create_bqstorage_client=False) self.assertEqual(len(warned), 0) self.assertEqual(len(df), 4) @@ -2393,7 +2395,9 @@ def test_to_dataframe_no_tqdm(self): row_iterator = self._make_one(_mock_client(), api_request, path, schema) with warnings.catch_warnings(record=True) as warned: - df = row_iterator.to_dataframe(progress_bar_type="tqdm") + df = row_iterator.to_dataframe( + progress_bar_type="tqdm", create_bqstorage_client=False, + ) self.assertEqual(len(warned), 1) for warning in warned: @@ -2428,7 +2432,9 @@ def test_to_dataframe_tqdm_error(self): row_iterator = self._make_one(_mock_client(), api_request, path, schema) with warnings.catch_warnings(record=True) as warned: - df = row_iterator.to_dataframe(progress_bar_type=progress_bar_type) + df = row_iterator.to_dataframe( + progress_bar_type=progress_bar_type, create_bqstorage_client=False, + ) self.assertEqual(len(df), 4) # all should be well @@ -2448,7 +2454,7 @@ def test_to_dataframe_w_empty_results(self): api_request = mock.Mock(return_value={"rows": []}) row_iterator = self._make_one(_mock_client(), api_request, schema=schema) - df = row_iterator.to_dataframe() + df = row_iterator.to_dataframe(create_bqstorage_client=False) self.assertIsInstance(df, pandas.DataFrame) self.assertEqual(len(df), 0) # verify the number of rows @@ -2506,7 +2512,7 @@ def test_to_dataframe_logs_tabledata_list(self): ) with mock.patch("google.cloud.bigquery.table._LOGGER", mock_logger): - row_iterator.to_dataframe() + row_iterator.to_dataframe(create_bqstorage_client=False) mock_logger.debug.assert_any_call( "Started reading table 'debug-proj.debug_dset.debug_tbl' with tabledata.list." 
@@ -2536,7 +2542,7 @@ def test_to_dataframe_w_various_types_nullable(self): api_request = mock.Mock(return_value={"rows": rows}) row_iterator = self._make_one(_mock_client(), api_request, path, schema) - df = row_iterator.to_dataframe() + df = row_iterator.to_dataframe(create_bqstorage_client=False) self.assertIsInstance(df, pandas.DataFrame) self.assertEqual(len(df), 4) # verify the number of rows @@ -2576,7 +2582,9 @@ def test_to_dataframe_column_dtypes(self): api_request = mock.Mock(return_value={"rows": rows}) row_iterator = self._make_one(_mock_client(), api_request, path, schema) - df = row_iterator.to_dataframe(dtypes={"km": "float16"}) + df = row_iterator.to_dataframe( + dtypes={"km": "float16"}, create_bqstorage_client=False, + ) self.assertIsInstance(df, pandas.DataFrame) self.assertEqual(len(df), 3) # verify the number of rows @@ -2685,21 +2693,19 @@ def test_to_dataframe_max_results_w_create_bqstorage_warning(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_to_dataframe_w_bqstorage_creates_client(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut mock_client = _mock_client() - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) bqstorage_client.transport = mock.create_autospec( - big_query_storage_grpc_transport.BigQueryStorageGrpcTransport + big_query_read_grpc_transport.BigQueryReadGrpcTransport ) mock_client._create_bqstorage_client.return_value = bqstorage_client - session = bigquery_storage_v1beta1.types.ReadSession() + session = bigquery_storage_v1.types.ReadSession() bqstorage_client.create_read_session.return_value = session row_iterator = mut.RowIterator( mock_client, @@ -2718,12 +2724,41 @@ def test_to_dataframe_w_bqstorage_creates_client(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_to_dataframe_w_bqstorage_no_streams(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) + session = bigquery_storage_v1.types.ReadSession() + bqstorage_client.create_read_session.return_value = session + + row_iterator = mut.RowIterator( + _mock_client(), + api_request=None, + path=None, + schema=[ + schema.SchemaField("colA", "INTEGER"), + schema.SchemaField("colC", "FLOAT"), + schema.SchemaField("colB", "STRING"), + ], + table=mut.TableReference.from_string("proj.dset.tbl"), + ) + + got = row_iterator.to_dataframe(bqstorage_client) + column_names = ["colA", "colC", "colB"] + self.assertEqual(list(got), column_names) + self.assertTrue(got.empty) + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + def test_to_dataframe_w_bqstorage_v1beta1_no_streams(self): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + bqstorage_client = mock.create_autospec( bigquery_storage_v1beta1.BigQueryStorageClient ) @@ -2748,17 +2783,15 @@ def 
test_to_dataframe_w_bqstorage_no_streams(self): self.assertTrue(got.empty) @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_dataframe_w_bqstorage_logs_session(self): from google.cloud.bigquery.table import Table - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) - session = bigquery_storage_v1beta1.types.ReadSession() + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) + session = bigquery_storage_v1.types.ReadSession() session.name = "projects/test-proj/locations/us/sessions/SOMESESSION" bqstorage_client.create_read_session.return_value = session mock_logger = mock.create_autospec(logging.Logger) @@ -2776,13 +2809,13 @@ def test_to_dataframe_w_bqstorage_logs_session(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_dataframe_w_bqstorage_empty_streams(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut - from google.cloud.bigquery_storage_v1beta1 import reader + from google.cloud.bigquery_storage_v1 import reader arrow_fields = [ pyarrow.field("colA", pyarrow.int64()), @@ -2792,10 +2825,8 @@ def test_to_dataframe_w_bqstorage_empty_streams(self): ] arrow_schema = pyarrow.schema(arrow_fields) - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) - session = bigquery_storage_v1beta1.types.ReadSession( + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) + session = bigquery_storage_v1.types.ReadSession( streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}], arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()}, ) @@ -2833,10 +2864,90 @@ def test_to_dataframe_w_bqstorage_empty_streams(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_dataframe_w_bqstorage_nonempty(self): + from google.cloud.bigquery import schema + from google.cloud.bigquery import table as mut + from google.cloud.bigquery_storage_v1 import reader + + arrow_fields = [ + pyarrow.field("colA", pyarrow.int64()), + # Not alphabetical to test column order. + pyarrow.field("colC", pyarrow.float64()), + pyarrow.field("colB", pyarrow.utf8()), + ] + arrow_schema = pyarrow.schema(arrow_fields) + + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) + bqstorage_client.transport = mock.create_autospec( + big_query_read_grpc_transport.BigQueryReadGrpcTransport + ) + streams = [ + # Use two streams we want to check frames are read from each stream. 
+ {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, + ] + session = bigquery_storage_v1.types.ReadSession( + streams=streams, + arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()}, + ) + bqstorage_client.create_read_session.return_value = session + + mock_rowstream = mock.create_autospec(reader.ReadRowsStream) + bqstorage_client.read_rows.return_value = mock_rowstream + + mock_rows = mock.create_autospec(reader.ReadRowsIterable) + mock_rowstream.rows.return_value = mock_rows + page_items = [ + pyarrow.array([1, -1]), + pyarrow.array([2.0, 4.0]), + pyarrow.array(["abc", "def"]), + ] + page_record_batch = pyarrow.RecordBatch.from_arrays( + page_items, schema=arrow_schema + ) + mock_page = mock.create_autospec(reader.ReadRowsPage) + mock_page.to_arrow.return_value = page_record_batch + mock_pages = (mock_page, mock_page, mock_page) + type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages) + + schema = [ + schema.SchemaField("colA", "IGNORED"), + schema.SchemaField("colC", "IGNORED"), + schema.SchemaField("colB", "IGNORED"), + ] + + row_iterator = mut.RowIterator( + _mock_client(), + None, # api_request: ignored + None, # path: ignored + schema, + table=mut.TableReference.from_string("proj.dset.tbl"), + selected_fields=schema, + ) + + got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client) + + # Are the columns in the expected order? + column_names = ["colA", "colC", "colB"] + self.assertEqual(list(got), column_names) + + # Have expected number of rows? + total_pages = len(streams) * len(mock_pages) + total_rows = len(page_items[0]) * total_pages + self.assertEqual(len(got.index), total_rows) + + # Don't close the client if it was passed in. + bqstorage_client.transport.channel.close.assert_not_called() + + @unittest.skipIf(pandas is None, "Requires `pandas`") + @unittest.skipIf( + bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + ) + @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") + def test_to_dataframe_w_bqstorage_v1beta1_nonempty(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut from google.cloud.bigquery_storage_v1beta1 import reader @@ -2853,7 +2964,7 @@ def test_to_dataframe_w_bqstorage_nonempty(self): bigquery_storage_v1beta1.BigQueryStorageClient ) bqstorage_client.transport = mock.create_autospec( - big_query_storage_grpc_transport.BigQueryStorageGrpcTransport + big_query_storage_grpc_transport_v1beta1.BigQueryStorageGrpcTransport ) streams = [ # Use two streams we want to check frames are read from each stream. @@ -2899,7 +3010,17 @@ def test_to_dataframe_w_bqstorage_nonempty(self): selected_fields=schema, ) - got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client) + with warnings.catch_warnings(record=True) as warned: + got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client) + + # Was a deprecation warning emitted? + expected_warnings = [ + warning + for warning in warned + if issubclass(warning.category, DeprecationWarning) + and "v1beta1" in str(warning) + ] + self.assertEqual(len(expected_warnings), 1, "Deprecation warning not raised.") # Are the columns in the expected order? 
column_names = ["colA", "colC", "colB"] @@ -2915,13 +3036,13 @@ def test_to_dataframe_w_bqstorage_nonempty(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut - from google.cloud.bigquery_storage_v1beta1 import reader + from google.cloud.bigquery_storage_v1 import reader arrow_fields = [pyarrow.field("colA", pyarrow.int64())] arrow_schema = pyarrow.schema(arrow_fields) @@ -2930,14 +3051,12 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self): {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, ] - session = bigquery_storage_v1beta1.types.ReadSession( + session = bigquery_storage_v1.types.ReadSession( streams=streams, arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()}, ) - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) bqstorage_client.create_read_session.return_value = session mock_rowstream = mock.create_autospec(reader.ReadRowsStream) @@ -2971,7 +3090,7 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) @unittest.skipIf(tqdm is None, "Requires `tqdm`") @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") @@ -2979,7 +3098,7 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self): def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut - from google.cloud.bigquery_storage_v1beta1 import reader + from google.cloud.bigquery_storage_v1 import reader # Speed up testing. mut._PROGRESS_INTERVAL = 0.01 @@ -2987,16 +3106,14 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock): arrow_fields = [pyarrow.field("testcol", pyarrow.int64())] arrow_schema = pyarrow.schema(arrow_fields) - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) streams = [ # Use two streams we want to check that progress bar updates are # sent from each stream. 
{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, ] - session = bigquery_storage_v1beta1.types.ReadSession( + session = bigquery_storage_v1.types.ReadSession( streams=streams, arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()}, ) @@ -3052,13 +3169,13 @@ def blocking_to_arrow(*args, **kwargs): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut - from google.cloud.bigquery_storage_v1beta1 import reader + from google.cloud.bigquery_storage_v1 import reader # Speed up testing. mut._PROGRESS_INTERVAL = 0.01 @@ -3071,16 +3188,15 @@ def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self): ] arrow_schema = pyarrow.schema(arrow_fields) - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) - session = bigquery_storage_v1beta1.types.ReadSession( + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) + session = bigquery_storage_v1.types.ReadSession( streams=[ - # Use two streams because one will fail with a - # KeyboardInterrupt, and we want to check that the other stream + # Use multiple streams because one will fail with a + # KeyboardInterrupt, and we want to check that the other streams # ends early. {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"}, + {"name": "/projects/proj/dataset/dset/tables/tbl/streams/9999"}, ], arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()}, ) @@ -3112,6 +3228,7 @@ def blocking_to_arrow(*args, **kwargs): mock_cancelled_rowstream.rows.return_value = mock_cancelled_rows bqstorage_client.read_rows.side_effect = ( + mock_rowstream, mock_cancelled_rowstream, mock_rowstream, ) @@ -3140,15 +3257,13 @@ def blocking_to_arrow(*args, **kwargs): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_to_dataframe_w_bqstorage_fallback_to_tabledata_list(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) bqstorage_client.create_read_session.side_effect = google.api_core.exceptions.InternalServerError( "can't read with bqstorage_client" ) @@ -3201,7 +3316,9 @@ def test_to_dataframe_tabledata_list_w_multiple_pages_return_unique_index(self): table=mut.Table("proj.dset.tbl"), ) - df = row_iterator.to_dataframe(bqstorage_client=None) + df = row_iterator.to_dataframe( + bqstorage_client=None, create_bqstorage_client=False, + ) self.assertIsInstance(df, pandas.DataFrame) self.assertEqual(len(df), 2) @@ -3211,14 +3328,12 @@ def test_to_dataframe_tabledata_list_w_multiple_pages_return_unique_index(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires 
`google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_to_dataframe_w_bqstorage_raises_auth_error(self): from google.cloud.bigquery import table as mut - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) bqstorage_client.create_read_session.side_effect = google.api_core.exceptions.Forbidden( "TEST BigQuery Storage API not enabled. TEST" ) @@ -3231,38 +3346,14 @@ def test_to_dataframe_w_bqstorage_raises_auth_error(self): with pytest.raises(google.api_core.exceptions.Forbidden): row_iterator.to_dataframe(bqstorage_client=bqstorage_client) - @unittest.skipIf(pandas is None, "Requires `pandas`") - @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" - ) - def test_to_dataframe_w_bqstorage_raises_import_error(self): - from google.cloud.bigquery import table as mut - - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) - path = "/foo" - api_request = mock.Mock(return_value={"rows": []}) - row_iterator = mut.RowIterator( - _mock_client(), api_request, path, [], table=mut.Table("proj.dset.tbl") - ) - - with mock.patch.object(mut, "bigquery_storage_v1beta1", None), pytest.raises( - ValueError - ) as exc_context: - row_iterator.to_dataframe(bqstorage_client=bqstorage_client) - assert mut._NO_BQSTORAGE_ERROR in str(exc_context.value) - @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_to_dataframe_w_bqstorage_partition(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) row_iterator = mut.RowIterator( _mock_client(), @@ -3276,15 +3367,13 @@ def test_to_dataframe_w_bqstorage_partition(self): row_iterator.to_dataframe(bqstorage_client) @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) def test_to_dataframe_w_bqstorage_snapshot(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) row_iterator = mut.RowIterator( _mock_client(), @@ -3299,13 +3388,13 @@ def test_to_dataframe_w_bqstorage_snapshot(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( - bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" + bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`" ) @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_to_dataframe_concat_categorical_dtype_w_pyarrow(self): from google.cloud.bigquery import schema from google.cloud.bigquery import table as mut - from google.cloud.bigquery_storage_v1beta1 import reader + from google.cloud.bigquery_storage_v1 import reader arrow_fields = [ # Not alphabetical to test column order. 
@@ -3318,13 +3407,11 @@ def test_to_dataframe_concat_categorical_dtype_w_pyarrow(self): arrow_schema = pyarrow.schema(arrow_fields) # create a mock BQ storage client - bqstorage_client = mock.create_autospec( - bigquery_storage_v1beta1.BigQueryStorageClient - ) + bqstorage_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient) bqstorage_client.transport = mock.create_autospec( - big_query_storage_grpc_transport.BigQueryStorageGrpcTransport + big_query_read_grpc_transport.BigQueryReadGrpcTransport ) - session = bigquery_storage_v1beta1.types.ReadSession( + session = bigquery_storage_v1.types.ReadSession( streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}], arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()}, ) @@ -3761,10 +3848,32 @@ def test_set_expiration_w_none(self): assert time_partitioning._properties["expirationMs"] is None +@pytest.mark.skipif( + bigquery_storage_v1 is None, reason="Requires `google-cloud-bigquery-storage`" +) +@pytest.mark.parametrize( + "table_path", + ( + "my-project.my_dataset.my_table", + "my-project.my_dataset.my_table$20181225", + "my-project.my_dataset.my_table@1234567890", + "my-project.my_dataset.my_table$20181225@1234567890", + ), +) +def test_table_reference_to_bqstorage_v1_stable(table_path): + from google.cloud.bigquery import table as mut + + expected = "projects/my-project/datasets/my_dataset/tables/my_table" + + for klass in (mut.TableReference, mut.Table, mut.TableListItem): + got = klass.from_string(table_path).to_bqstorage() + assert got == expected + + @pytest.mark.skipif( bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`" ) -def test_table_reference_to_bqstorage(): +def test_table_reference_to_bqstorage_v1beta1(): from google.cloud.bigquery import table as mut # Can't use parametrized pytest because bigquery_storage_v1beta1 may not be @@ -3782,14 +3891,14 @@ def test_table_reference_to_bqstorage(): classes = (mut.TableReference, mut.Table, mut.TableListItem) for case, cls in itertools.product(cases, classes): - got = cls.from_string(case).to_bqstorage() + got = cls.from_string(case).to_bqstorage(v1beta1=True) assert got == expected @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" ) -def test_table_reference_to_bqstorage_raises_import_error(): +def test_table_reference_to_bqstorage_v1beta1_raises_import_error(): from google.cloud.bigquery import table as mut classes = (mut.TableReference, mut.Table, mut.TableListItem) @@ -3797,5 +3906,5 @@ def test_table_reference_to_bqstorage_raises_import_error(): with mock.patch.object(mut, "bigquery_storage_v1beta1", None), pytest.raises( ValueError ) as exc_context: - cls.from_string("my-project.my_dataset.my_table").to_bqstorage() + cls.from_string("my-project.my_dataset.my_table").to_bqstorage(v1beta1=True) assert mut._NO_BQSTORAGE_ERROR in str(exc_context.value)
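The rewritten assertions in test_job.py and test_table.py above all expect the same v1 request shape when a read session is created. The snippet below is a rough sketch of that shape only, with placeholder project and table values (real credentials are required to actually run it); it is not part of the patch.

from google.cloud import bigquery_storage_v1

# Placeholder identifiers for illustration.
project_id = "my-project"
table_path = "projects/my-project/datasets/my_dataset/tables/my_table"

# Constructing the client requires valid application default credentials.
bqstorage_client = bigquery_storage_v1.BigQueryReadClient()

# The table and data format now live on the ReadSession message itself.
requested_session = bigquery_storage_v1.types.ReadSession(
    table=table_path,
    data_format=bigquery_storage_v1.enums.DataFormat.ARROW,
)

# The v1 call takes keyword arguments: parent, read_session, max_stream_count.
session = bqstorage_client.create_read_session(
    parent="projects/{}".format(project_id),
    read_session=requested_session,
    max_stream_count=0,  # 0 lets the backend choose; 1 preserves row order
)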