feat: use BigQuery Storage client by default #55

Merged: 16 commits, Jun 10, 2020
Changes shown below are from 7 of the 16 commits.
27 changes: 13 additions & 14 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -22,9 +22,9 @@
 from six.moves import queue
 
 try:
-    from google.cloud import bigquery_storage_v1beta1
+    from google.cloud import bigquery_storage_v1
 except ImportError:  # pragma: NO COVER
-    bigquery_storage_v1beta1 = None
+    bigquery_storage_v1 = None
 
 try:
     import pandas
@@ -577,8 +577,7 @@ def _bqstorage_page_to_dataframe(column_names, dtypes, page):
 def _download_table_bqstorage_stream(
     download_state, bqstorage_client, session, stream, worker_queue, page_to_item
 ):
-    position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
-    rowstream = bqstorage_client.read_rows(position).rows(session)
+    rowstream = bqstorage_client.read_rows(stream.name).rows(session)

     for page in rowstream.pages:
         if download_state.done:
@@ -617,21 +616,21 @@ def _download_table_bqstorage(
     if "@" in table.table_id:
         raise ValueError("Reading from a specific snapshot is not currently supported.")
 
-    read_options = bigquery_storage_v1beta1.types.TableReadOptions()
+    requested_session = bigquery_storage_v1.types.ReadSession(
+        table=table.to_bqstorage(),
+        data_format=bigquery_storage_v1.enums.DataFormat.ARROW,
+    )
 
     if selected_fields is not None:
         for field in selected_fields:
-            read_options.selected_fields.append(field.name)
+            requested_session.read_options.selected_fields.append(field.name)
 
-    requested_streams = 0
-    if preserve_order:
-        requested_streams = 1
+    requested_streams = 1 if preserve_order else 0
 
     session = bqstorage_client.create_read_session(
-        table.to_bqstorage(),
-        "projects/{}".format(project_id),
-        format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
-        read_options=read_options,
-        requested_streams=requested_streams,
+        parent="projects/{}".format(project_id),
+        read_session=requested_session,
+        max_stream_count=requested_streams,
     )
     _LOGGER.debug(
         "Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format(
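For context, here is a minimal standalone sketch of the v1 read pattern this file migrates to. The project, dataset, and table names are placeholders, and the snippet assumes the v1 `google-cloud-bigquery-storage` package plus pandas/pyarrow are installed:

```python
from google.cloud import bigquery_storage_v1

client = bigquery_storage_v1.BigQueryReadClient()

# In v1, the table and data format move into a ReadSession request object,
# instead of being positional arguments to create_read_session().
requested_session = bigquery_storage_v1.types.ReadSession(
    table="projects/my-project/datasets/my_dataset/tables/my_table",
    data_format=bigquery_storage_v1.enums.DataFormat.ARROW,
)
session = client.create_read_session(
    parent="projects/my-project",
    read_session=requested_session,
    max_stream_count=1,
)

# v1 reads by stream *name*; v1beta1's StreamPosition type is gone.
if session.streams:
    reader = client.read_rows(session.streams[0].name)
    for page in reader.rows(session).pages:
        print(page.to_dataframe().head())
```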
18 changes: 13 additions & 5 deletions google/cloud/bigquery/client.py
@@ -396,15 +396,23 @@ def dataset(self, dataset_id, project=None):
     def _create_bqstorage_client(self):
         """Create a BigQuery Storage API client using this client's credentials.
 
+        If a client cannot be created due to missing dependencies, raise a
+        warning and return ``None``.
+
         Returns:
-            google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient:
+            Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]:
                 A BigQuery Storage API client.
         """
-        from google.cloud import bigquery_storage_v1beta1
+        try:
+            from google.cloud import bigquery_storage_v1
+        except ImportError:
+            warnings.warn(
+                "Cannot create BigQuery Storage client, the dependency "
+                "google-cloud-bigquery-storage is not installed."
+            )
+            return None
 
-        return bigquery_storage_v1beta1.BigQueryStorageClient(
-            credentials=self._credentials
-        )
+        return bigquery_storage_v1.BigQueryReadClient(credentials=self._credentials)
 
     def create_dataset(
         self, dataset, exists_ok=False, retry=DEFAULT_RETRY, timeout=None
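An illustrative sketch of the factory's new graceful-degradation contract. Note that `_create_bqstorage_client` is a private helper, so this is only a demonstration of the behavior, not a recommended public API:

```python
from google.cloud import bigquery

client = bigquery.Client()

# Returns None (after emitting a warning) instead of raising ImportError
# when google-cloud-bigquery-storage is not installed.
bqstorage_client = client._create_bqstorage_client()

if bqstorage_client is None:
    # Callers can then silently fall back to the REST API.
    print("Falling back to the REST API.")
```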
43 changes: 43 additions & 0 deletions google/cloud/bigquery/dbapi/_helpers.py
@@ -19,6 +19,7 @@
 
 import datetime
 import decimal
+import functools
 import numbers
 
 import six
@@ -238,3 +239,45 @@ def to_table_row(row):
         return table.Row(values, keys_to_index)
 
     return (to_table_row(row_data) for row_data in rows_iterable)
+
+
+def raise_on_closed(
+    exc_msg, exc_class=exceptions.ProgrammingError, closed_attr_name="_closed"
+):
+    """Make public instance methods raise an error if the instance is closed."""
+
+    def _raise_on_closed(method):
+        """Make a non-static method raise an error if its containing instance is closed.
+        """
+
+        def with_closed_check(self, *args, **kwargs):
+            if getattr(self, closed_attr_name):
+                raise exc_class(exc_msg)
+            return method(self, *args, **kwargs)
+
+        functools.update_wrapper(with_closed_check, method)
+        return with_closed_check
+
+    def decorate_public_methods(klass):
+        """Apply ``_raise_on_closed()`` decorator to public instance methods.
+        """
+        for name in dir(klass):
+            if name.startswith("_"):
+                continue
+
+            member = getattr(klass, name)
+            if not callable(member):
+                continue
+
+            # We need to check for class/static methods directly in the class
+            # __dict__, not via the retrieved attribute (`member`), as the
+            # latter is already a callable *produced* by one of these descriptors.
+            if isinstance(klass.__dict__[name], (staticmethod, classmethod)):
+                continue
+
+            member = _raise_on_closed(member)
+            setattr(klass, name, member)
+
+        return klass
+
+    return decorate_public_methods
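A toy illustration (not part of the diff) of how the decorator behaves. `Widget` is hypothetical; the default `_closed` attribute name and `ProgrammingError` class come from the signature above:

```python
from google.cloud.bigquery.dbapi import _helpers


@_helpers.raise_on_closed("Operating on a closed widget.")
class Widget(object):
    def __init__(self):
        self._closed = False

    def close(self):
        self._closed = True

    def ping(self):
        return "pong"


widget = Widget()
widget.ping()   # returns "pong"
widget.close()  # the closed flag is checked *before* the method runs, so this succeeds
widget.ping()   # raises exceptions.ProgrammingError("Operating on a closed widget.")
```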
79 changes: 60 additions & 19 deletions google/cloud/bigquery/dbapi/connection.py
@@ -14,22 +14,30 @@
 
 """Connection for the Google BigQuery DB-API."""
 
+import weakref
+
+from google.cloud import bigquery
 from google.cloud.bigquery.dbapi import cursor
+from google.cloud.bigquery.dbapi import _helpers
 
 
+@_helpers.raise_on_closed("Operating on a closed connection.")
 class Connection(object):
     """DB-API Connection to Google BigQuery.
 
     Args:
-        client (google.cloud.bigquery.Client): A client used to connect to BigQuery.
+        client (Optional[google.cloud.bigquery.Client]):
+            A REST API client used to connect to BigQuery. If not passed, a
+            client is created using default options inferred from the environment.
         bqstorage_client(\
-            Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
+            Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient] \
         ):
-            [Beta] An alternative client that uses the faster BigQuery Storage
-            API to fetch rows from BigQuery. If both clients are given,
-            ``bqstorage_client`` is used first to fetch query results,
-            with a fallback on ``client``, if necessary.
+            A client that uses the faster BigQuery Storage API to fetch rows from
+            BigQuery. If not passed, it is created using the same credentials
+            as ``client``.
+
+            When fetching query results, ``bqstorage_client`` is used first, with
+            a fallback on ``client``, if necessary.
 
         .. note::
             There is a known issue with the BigQuery Storage API with small
@@ -38,39 +46,74 @@ class Connection(object):
             https://github.com/googleapis/python-bigquery-storage/issues/2
     """
 
-    def __init__(self, client, bqstorage_client=None):
+    def __init__(self, client=None, bqstorage_client=None):
+        if client is None:
+            client = bigquery.Client()
+            self._owns_client = True
+        else:
+            self._owns_client = False
+
+        if bqstorage_client is None:
+            # A warning is already raised by the factory if instantiation fails.
+            bqstorage_client = client._create_bqstorage_client()
+            self._owns_bqstorage_client = bqstorage_client is not None
+        else:
+            self._owns_bqstorage_client = False
+
         self._client = client
         self._bqstorage_client = bqstorage_client
 
+        self._closed = False
+        self._cursors_created = weakref.WeakSet()
+
     def close(self):
-        """No-op."""
+        """Close the connection and any cursors created from it.
+
+        Any BigQuery clients explicitly passed to the constructor are *not*
+        closed, only those created by the connection instance itself.
+        """
+        self._closed = True
+
+        if self._owns_client:
+            self._client.close()
+
+        if self._owns_bqstorage_client:
+            # There is no close() on the BQ Storage client itself.
+            self._bqstorage_client.transport.channel.close()
+
+        for cursor_ in self._cursors_created:
+            cursor_.close()
 
     def commit(self):
-        """No-op."""
+        """No-op, but for consistency raise an error if connection is closed."""
 
     def cursor(self):
         """Return a new cursor object.
 
         Returns:
             google.cloud.bigquery.dbapi.Cursor: A DB-API cursor that uses this connection.
         """
-        return cursor.Cursor(self)
+        new_cursor = cursor.Cursor(self)
+        self._cursors_created.add(new_cursor)
+        return new_cursor


 def connect(client=None, bqstorage_client=None):
     """Construct a DB-API connection to Google BigQuery.
 
     Args:
         client (Optional[google.cloud.bigquery.Client]):
-            A client used to connect to BigQuery. If not passed, a client is
-            created using default options inferred from the environment.
+            A REST API client used to connect to BigQuery. If not passed, a
+            client is created using default options inferred from the environment.
         bqstorage_client(\
-            Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
+            Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient] \
         ):
-            [Beta] An alternative client that uses the faster BigQuery Storage
-            API to fetch rows from BigQuery. If both clients are given,
-            ``bqstorage_client`` is used first to fetch query results,
-            with a fallback on ``client``, if necessary.
+            A client that uses the faster BigQuery Storage API to fetch rows from
+            BigQuery. If not passed, it is created using the same credentials
+            as ``client``.
+
+            When fetching query results, ``bqstorage_client`` is used first, with
+            a fallback on ``client``, if necessary.
 
         .. note::
             There is a known issue with the BigQuery Storage API with small
@@ -81,6 +124,4 @@ def connect(client=None, bqstorage_client=None):
     Returns:
         google.cloud.bigquery.dbapi.Connection: A new DB-API connection to BigQuery.
     """
-    if client is None:
-        client = bigquery.Client()
     return Connection(client, bqstorage_client)
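End to end, the new defaults make the DB-API usable with no arguments at all. A short usage sketch, assuming default credentials are available in the environment (the query is illustrative):

```python
from google.cloud.bigquery import dbapi

# connect() now builds the REST client and, if the optional dependency is
# installed, a BigQuery Storage client from default credentials.
connection = dbapi.connect()
cursor = connection.cursor()
cursor.execute("SELECT 1 AS x")
print(cursor.fetchall())

# Closing the connection also closes owned clients and all derived cursors.
connection.close()
cursor.fetchall()  # raises ProgrammingError("Operating on a closed cursor.")
```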
34 changes: 20 additions & 14 deletions google/cloud/bigquery/dbapi/cursor.py
@@ -51,6 +51,7 @@
 )
 
 
+@_helpers.raise_on_closed("Operating on a closed cursor.")
 class Cursor(object):
     """DB-API Cursor to Google BigQuery.
 
@@ -73,9 +74,11 @@ def __init__(self, connection):
         self.arraysize = None
         self._query_data = None
         self._query_job = None
+        self._closed = False
 
     def close(self):
-        """No-op."""
+        """Mark the cursor as closed, preventing its further use."""
+        self._closed = True
 
     def _set_description(self, schema):
         """Set description from schema.
@@ -256,34 +259,37 @@ def _bqstorage_fetch(self, bqstorage_client):
 
         Args:
             bqstorage_client(\
-                google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
+                google.cloud.bigquery_storage_v1.BigQueryReadClient \
             ):
                 A client that knows how to talk to the BigQuery Storage API.
 
         Returns:
             Iterable[Mapping]:
                 A sequence of rows, represented as dictionaries.
         """
-        # NOTE: Given that BQ storage client instance is passed in, it means
-        # that bigquery_storage_v1beta1 library is available (no ImportError).
-        from google.cloud import bigquery_storage_v1beta1
+        # Hitting this code path with a BQ Storage client instance implies that
+        # bigquery_storage_v1 can indeed be imported here without errors.
+        from google.cloud import bigquery_storage_v1
 
         table_reference = self._query_job.destination
 
+        requested_session = bigquery_storage_v1.types.ReadSession(
+            table=table_reference.to_bqstorage(),
+            data_format=bigquery_storage_v1.enums.DataFormat.AVRO,
+        )
+
         read_session = bqstorage_client.create_read_session(
-            table_reference.to_bqstorage(),
-            "projects/{}".format(table_reference.project),
+            parent="projects/{}".format(table_reference.project),
+            read_session=requested_session,
             # a single stream only, as DB API is not well-suited for multithreading
-            requested_streams=1,
+            max_stream_count=1,
         )
 
         if not read_session.streams:
             return iter([])  # empty table, nothing to read
 
-        read_position = bigquery_storage_v1beta1.types.StreamPosition(
-            stream=read_session.streams[0],
-        )
-        read_rows_stream = bqstorage_client.read_rows(read_position)
+        stream_name = read_session.streams[0].name
+        read_rows_stream = bqstorage_client.read_rows(stream_name)
         rows_iterable = read_rows_stream.rows(read_session)
         return rows_iterable

@@ -353,10 +359,10 @@ def fetchall(self):
         return list(self._query_data)
 
     def setinputsizes(self, sizes):
-        """No-op."""
+        """No-op, but for consistency raise an error if cursor is closed."""
 
     def setoutputsize(self, size, column=None):
-        """No-op."""
+        """No-op, but for consistency raise an error if cursor is closed."""


 def _format_operation_list(operation, parameters):
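The practical effect of decorating the no-op methods, sketched with a hypothetical cursor (credentials again assumed to come from the environment):

```python
from google.cloud.bigquery import dbapi

connection = dbapi.connect()
cursor = connection.cursor()

cursor.setinputsizes(None)  # still a no-op, succeeds while the cursor is open
cursor.close()
cursor.setinputsizes(None)  # now raises ProgrammingError("Operating on a closed cursor.")
```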