feat: use BigQuery Storage client by default (#55)
* feat: use BigQuery Storage client by default

* Use BQ Storage API by default in cell magic

* Add raise_on_closed helper decorator to DB API

* Use BigQuery Storage API by default in DB API

* Use BQ Storage v1 stable version in main client

* Use BQ Storage v1 stable in BigQuery cell magic

* Use BQ Storage v1 stable in DB API

* Support both v1 stable and beta1 BQ Storage client

* Fix some typos and redundant Beta mark

* Use ARROW as data format in DB API cursor

* feat: add HOUR support for time partitioning interval (#91)

* feat: add HOUR support for time partitioning interval

* Bump BQ storage pin to stable version.

Co-authored-by: shollyman <shollyman@google.com>
plamut and shollyman committed Jun 10, 2020
1 parent 3869e34 commit e75ff82
Showing 18 changed files with 1,195 additions and 455 deletions.
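
In practical terms, the headline change is that the higher-level entry points now wire up a BigQuery Storage read client on their own. A minimal sketch of the new DB-API default path (a hedged illustration, assuming default application credentials and the google-cloud-bigquery-storage package are available):

    from google.cloud.bigquery import dbapi

    # connect() no longer requires any clients: a REST client is built from
    # the environment's default credentials, and a v1 BigQueryReadClient is
    # created alongside it when the storage dependency is importable.
    connection = dbapi.connect()
    cursor = connection.cursor()
    cursor.execute("SELECT 1")
    print(cursor.fetchall())  # fetched via the Storage API when possible
    connection.close()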
79 changes: 59 additions & 20 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -22,9 +22,9 @@
 from six.moves import queue
 
 try:
-    from google.cloud import bigquery_storage_v1beta1
+    from google.cloud import bigquery_storage_v1
 except ImportError:  # pragma: NO COVER
-    bigquery_storage_v1beta1 = None
+    bigquery_storage_v1 = None
 
 try:
     import pandas
@@ -577,8 +577,19 @@ def _bqstorage_page_to_dataframe(column_names, dtypes, page):
 def _download_table_bqstorage_stream(
     download_state, bqstorage_client, session, stream, worker_queue, page_to_item
 ):
-    position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
-    rowstream = bqstorage_client.read_rows(position).rows(session)
+    # Passing a BQ Storage client in implies that the BigQuery Storage library
+    # is available and can be imported.
+    from google.cloud import bigquery_storage_v1beta1
+
+    # We want to preserve compatibility with the v1beta1 BQ Storage clients,
+    # thus adjust constructing the rowstream if needed.
+    # The assumption is that the caller provides a BQ Storage `session` that is
+    # compatible with the version of the BQ Storage client passed in.
+    if isinstance(bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient):
+        position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
+        rowstream = bqstorage_client.read_rows(position).rows(session)
+    else:
+        rowstream = bqstorage_client.read_rows(stream.name).rows(session)
 
     for page in rowstream.pages:
         if download_state.done:
@@ -610,29 +621,57 @@ def _download_table_bqstorage(
     page_to_item=None,
 ):
     """Use (faster, but billable) BQ Storage API to construct DataFrame."""
+
+    # Passing a BQ Storage client in implies that the BigQuery Storage library
+    # is available and can be imported.
+    from google.cloud import bigquery_storage_v1
+    from google.cloud import bigquery_storage_v1beta1
+
     if "$" in table.table_id:
         raise ValueError(
             "Reading from a specific partition is not currently supported."
         )
     if "@" in table.table_id:
         raise ValueError("Reading from a specific snapshot is not currently supported.")
 
-    read_options = bigquery_storage_v1beta1.types.TableReadOptions()
-    if selected_fields is not None:
-        for field in selected_fields:
-            read_options.selected_fields.append(field.name)
-
-    requested_streams = 0
-    if preserve_order:
-        requested_streams = 1
-
-    session = bqstorage_client.create_read_session(
-        table.to_bqstorage(),
-        "projects/{}".format(project_id),
-        format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
-        read_options=read_options,
-        requested_streams=requested_streams,
-    )
+    requested_streams = 1 if preserve_order else 0
+
+    # We want to preserve compatibility with the v1beta1 BQ Storage clients,
+    # thus adjust the session creation if needed.
+    if isinstance(bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient):
+        warnings.warn(
+            "Support for BigQuery Storage v1beta1 clients is deprecated, please "
+            "consider upgrading the client to BigQuery Storage v1 stable version.",
+            category=DeprecationWarning,
+        )
+        read_options = bigquery_storage_v1beta1.types.TableReadOptions()
+
+        if selected_fields is not None:
+            for field in selected_fields:
+                read_options.selected_fields.append(field.name)
+
+        session = bqstorage_client.create_read_session(
+            table.to_bqstorage(v1beta1=True),
+            "projects/{}".format(project_id),
+            format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
+            read_options=read_options,
+            requested_streams=requested_streams,
+        )
+    else:
+        requested_session = bigquery_storage_v1.types.ReadSession(
+            table=table.to_bqstorage(),
+            data_format=bigquery_storage_v1.enums.DataFormat.ARROW,
+        )
+        if selected_fields is not None:
+            for field in selected_fields:
+                requested_session.read_options.selected_fields.append(field.name)
+
+        session = bqstorage_client.create_read_session(
+            parent="projects/{}".format(project_id),
+            read_session=requested_session,
+            max_stream_count=requested_streams,
+        )
+
     _LOGGER.debug(
         "Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format(
             table.project, table.dataset_id, table.table_id, session.name
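
For orientation, the two client generations that the `isinstance` checks above distinguish expose different call shapes. A side-by-side sketch (the `beta_client` and `v1_client` names are illustrative; `session` and `stream` come from `create_read_session()`):

    # v1beta1: rows are addressed through a StreamPosition wrapper.
    position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
    rowstream = beta_client.read_rows(position).rows(session)

    # v1 stable: read_rows() takes the fully qualified stream name directly.
    rowstream = v1_client.read_rows(stream.name).rows(session)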
18 changes: 13 additions & 5 deletions google/cloud/bigquery/client.py
@@ -397,15 +397,23 @@ def dataset(self, dataset_id, project=None):
     def _create_bqstorage_client(self):
         """Create a BigQuery Storage API client using this client's credentials.
 
+        If a client cannot be created due to missing dependencies, raise a
+        warning and return ``None``.
+
         Returns:
-            google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient:
+            Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]:
                 A BigQuery Storage API client.
         """
-        from google.cloud import bigquery_storage_v1beta1
+        try:
+            from google.cloud import bigquery_storage_v1
+        except ImportError:
+            warnings.warn(
+                "Cannot create BigQuery Storage client, the dependency "
+                "google-cloud-bigquery-storage is not installed."
+            )
+            return None
 
-        return bigquery_storage_v1beta1.BigQueryStorageClient(
-            credentials=self._credentials
-        )
+        return bigquery_storage_v1.BigQueryReadClient(credentials=self._credentials)
 
     def create_dataset(
         self, dataset, exists_ok=False, retry=DEFAULT_RETRY, timeout=None
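
Call sites depend on that ``None`` return to degrade gracefully. A sketch of the assumed caller-side pattern (``_create_bqstorage_client`` is a private helper, used here purely for illustration):

    from google.cloud import bigquery

    client = bigquery.Client()
    bqstorage_client = client._create_bqstorage_client()
    if bqstorage_client is None:
        # The dependency is missing; a warning was already emitted, and the
        # caller silently falls back to fetching rows over the REST API.
        pass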
47 changes: 46 additions & 1 deletion google/cloud/bigquery/dbapi/_helpers.py
@@ -19,6 +19,7 @@
 
 import datetime
 import decimal
+import functools
 import numbers
 
 import six
@@ -233,8 +234,52 @@ def to_bq_table_rows(rows_iterable):
     """
 
     def to_table_row(row):
-        values = tuple(row.values())
+        # NOTE: We fetch ARROW values, thus we need to convert them to Python
+        # objects with as_py().
+        values = tuple(value.as_py() for value in row.values())
         keys_to_index = {key: i for i, key in enumerate(row.keys())}
         return table.Row(values, keys_to_index)
 
     return (to_table_row(row_data) for row_data in rows_iterable)
+
+
+def raise_on_closed(
+    exc_msg, exc_class=exceptions.ProgrammingError, closed_attr_name="_closed"
+):
+    """Make public instance methods raise an error if the instance is closed."""
+
+    def _raise_on_closed(method):
+        """Make a non-static method raise an error if its containing instance is closed.
+        """
+
+        def with_closed_check(self, *args, **kwargs):
+            if getattr(self, closed_attr_name):
+                raise exc_class(exc_msg)
+            return method(self, *args, **kwargs)
+
+        functools.update_wrapper(with_closed_check, method)
+        return with_closed_check
+
+    def decorate_public_methods(klass):
+        """Apply ``_raise_on_closed()`` decorator to public instance methods.
+        """
+        for name in dir(klass):
+            if name.startswith("_"):
+                continue
+
+            member = getattr(klass, name)
+            if not callable(member):
+                continue
+
+            # We need to check for class/static methods directly in the instance
+            # __dict__, not via the retrieved attribute (`member`), as the
+            # latter is already a callable *produced* by one of these descriptors.
+            if isinstance(klass.__dict__[name], (staticmethod, classmethod)):
+                continue
+
+            member = _raise_on_closed(member)
+            setattr(klass, name, member)
+
+        return klass
+
+    return decorate_public_methods
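
The new helper is a class decorator factory: it walks the public callables of the decorated class and wraps each one with a closed-state check. A toy sketch of the intended usage (the ``SimpleResource`` class is made up; the real application is the ``Connection`` decorator below):

    from google.cloud.bigquery.dbapi import _helpers

    @_helpers.raise_on_closed("Operating on a closed resource.")
    class SimpleResource(object):
        def __init__(self):
            self._closed = False  # attribute name matches closed_attr_name

        def close(self):
            self._closed = True

        def fetch(self):
            return "data"

    resource = SimpleResource()
    resource.close()
    resource.fetch()  # raises ProgrammingError("Operating on a closed resource.")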
79 changes: 60 additions & 19 deletions google/cloud/bigquery/dbapi/connection.py
@@ -14,22 +14,30 @@
 
 """Connection for the Google BigQuery DB-API."""
 
+import weakref
+
 from google.cloud import bigquery
 from google.cloud.bigquery.dbapi import cursor
+from google.cloud.bigquery.dbapi import _helpers
 
 
+@_helpers.raise_on_closed("Operating on a closed connection.")
 class Connection(object):
     """DB-API Connection to Google BigQuery.
 
     Args:
-        client (google.cloud.bigquery.Client): A client used to connect to BigQuery.
+        client (Optional[google.cloud.bigquery.Client]):
+            A REST API client used to connect to BigQuery. If not passed, a
+            client is created using default options inferred from the environment.
         bqstorage_client(\
-            Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
+            Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient] \
         ):
-            [Beta] An alternative client that uses the faster BigQuery Storage
-            API to fetch rows from BigQuery. If both clients are given,
-            ``bqstorage_client`` is used first to fetch query results,
-            with a fallback on ``client``, if necessary.
+            A client that uses the faster BigQuery Storage API to fetch rows from
+            BigQuery. If not passed, it is created using the same credentials
+            as ``client``.
+
+            When fetching query results, ``bqstorage_client`` is used first, with
+            a fallback on ``client``, if necessary.
 
     .. note::
         There is a known issue with the BigQuery Storage API with small
@@ -38,39 +46,74 @@ class Connection(object):
         https://github.com/googleapis/python-bigquery-storage/issues/2
     """
 
-    def __init__(self, client, bqstorage_client=None):
+    def __init__(self, client=None, bqstorage_client=None):
+        if client is None:
+            client = bigquery.Client()
+            self._owns_client = True
+        else:
+            self._owns_client = False
+
+        if bqstorage_client is None:
+            # A warning is already raised by the factory if instantiation fails.
+            bqstorage_client = client._create_bqstorage_client()
+            self._owns_bqstorage_client = bqstorage_client is not None
+        else:
+            self._owns_bqstorage_client = False
+
         self._client = client
         self._bqstorage_client = bqstorage_client
 
+        self._closed = False
+        self._cursors_created = weakref.WeakSet()
+
     def close(self):
-        """No-op."""
+        """Close the connection and any cursors created from it.
+
+        Any BigQuery clients explicitly passed to the constructor are *not*
+        closed, only those created by the connection instance itself.
+        """
+        self._closed = True
+
+        if self._owns_client:
+            self._client.close()
+
+        if self._owns_bqstorage_client:
+            # There is no close() on the BQ Storage client itself.
+            self._bqstorage_client.transport.channel.close()
+
+        for cursor_ in self._cursors_created:
+            cursor_.close()
 
     def commit(self):
-        """No-op."""
+        """No-op, but for consistency raise an error if connection is closed."""
 
     def cursor(self):
         """Return a new cursor object.
 
         Returns:
             google.cloud.bigquery.dbapi.Cursor: A DB-API cursor that uses this connection.
         """
-        return cursor.Cursor(self)
+        new_cursor = cursor.Cursor(self)
+        self._cursors_created.add(new_cursor)
+        return new_cursor
 
 
 def connect(client=None, bqstorage_client=None):
     """Construct a DB-API connection to Google BigQuery.
 
     Args:
         client (Optional[google.cloud.bigquery.Client]):
-            A client used to connect to BigQuery. If not passed, a client is
-            created using default options inferred from the environment.
+            A REST API client used to connect to BigQuery. If not passed, a
+            client is created using default options inferred from the environment.
         bqstorage_client(\
-            Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
+            Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient] \
         ):
-            [Beta] An alternative client that uses the faster BigQuery Storage
-            API to fetch rows from BigQuery. If both clients are given,
-            ``bqstorage_client`` is used first to fetch query results,
-            with a fallback on ``client``, if necessary.
+            A client that uses the faster BigQuery Storage API to fetch rows from
+            BigQuery. If not passed, it is created using the same credentials
+            as ``client``.
+
+            When fetching query results, ``bqstorage_client`` is used first, with
+            a fallback on ``client``, if necessary.
 
     .. note::
         There is a known issue with the BigQuery Storage API with small
@@ -81,6 +124,4 @@ def connect(client=None, bqstorage_client=None):
     Returns:
         google.cloud.bigquery.dbapi.Connection: A new DB-API connection to BigQuery.
     """
-    if client is None:
-        client = bigquery.Client()
     return Connection(client, bqstorage_client)
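
The ownership flags introduced above determine what ``close()`` actually tears down. A hedged sketch of the resulting semantics:

    from google.cloud import bigquery
    from google.cloud.bigquery import dbapi

    # An explicitly passed client is *not* owned by the connection.
    client = bigquery.Client()
    connection = dbapi.connect(client=client)
    connection.close()  # closes owned clients and created cursors only

    connection.cursor()  # raises ProgrammingError: connection is closed
    client.close()  # disposing of the caller's client stays the caller's job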
