Skip to content

Commit

Permalink
feat: add BigQuery storage client support to DB API (#36)
Browse files Browse the repository at this point in the history
* feat: add BigQueryStorageClient support to DB API

* Use BigQuery Storage client in Cursor if available

* Skip BQ storage unit tests in Python 3.8

* Add system tests for Cursor w/ BQ storage client

* Add test for Connection ctor w/o BQ storage client

* Refactor exception handling in Cursor._try_fetch()

* Add explicit check against None

Co-Authored-By: Tres Seaver <tseaver@palladion.com>

* Remove redundand word in a comment in cursor.py

Co-authored-by: Tres Seaver <tseaver@palladion.com>
  • Loading branch information
plamut and tseaver committed Feb 25, 2020
1 parent 645f0fd commit ba9b2f8
Show file tree
Hide file tree
Showing 8 changed files with 487 additions and 10 deletions.
20 changes: 20 additions & 0 deletions google/cloud/bigquery/dbapi/_helpers.py
Expand Up @@ -24,6 +24,7 @@
import six

from google.cloud import bigquery
from google.cloud.bigquery import table
from google.cloud.bigquery.dbapi import exceptions


Expand Down Expand Up @@ -218,3 +219,22 @@ def array_like(value):
return isinstance(value, collections_abc.Sequence) and not isinstance(
value, (six.text_type, six.binary_type, bytearray)
)


def to_bq_table_rows(rows_iterable):
"""Convert table rows to BigQuery table Row instances.
Args:
rows_iterable (Iterable[Mapping]):
An iterable of row data items to convert to ``Row`` instances.
Returns:
Iterable[google.cloud.bigquery.table.Row]
"""

def to_table_row(row):
values = tuple(row.values())
keys_to_index = {key: i for i, key in enumerate(row.keys())}
return table.Row(values, keys_to_index)

return (to_table_row(row_data) for row_data in rows_iterable)
39 changes: 33 additions & 6 deletions google/cloud/bigquery/dbapi/connection.py
Expand Up @@ -23,10 +23,24 @@ class Connection(object):
Args:
client (google.cloud.bigquery.Client): A client used to connect to BigQuery.
bqstorage_client(\
Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
):
[Beta] An alternative client that uses the faster BigQuery Storage
API to fetch rows from BigQuery. If both clients are given,
``bqstorage_client`` is used first to fetch query results,
with a fallback on ``client``, if necessary.
.. note::
There is a known issue with the BigQuery Storage API with small
anonymous result sets, which results in such fallback.
https://github.com/googleapis/python-bigquery-storage/issues/2
"""

def __init__(self, client):
def __init__(self, client, bqstorage_client=None):
self._client = client
self._bqstorage_client = bqstorage_client

def close(self):
"""No-op."""
Expand All @@ -43,17 +57,30 @@ def cursor(self):
return cursor.Cursor(self)


def connect(client=None):
def connect(client=None, bqstorage_client=None):
"""Construct a DB-API connection to Google BigQuery.
Args:
client (google.cloud.bigquery.Client):
(Optional) A client used to connect to BigQuery. If not passed, a
client is created using default options inferred from the environment.
client (Optional[google.cloud.bigquery.Client]):
A client used to connect to BigQuery. If not passed, a client is
created using default options inferred from the environment.
bqstorage_client(\
Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
):
[Beta] An alternative client that uses the faster BigQuery Storage
API to fetch rows from BigQuery. If both clients are given,
``bqstorage_client`` is used first to fetch query results,
with a fallback on ``client``, if necessary.
.. note::
There is a known issue with the BigQuery Storage API with small
anonymous result sets, which results in such fallback.
https://github.com/googleapis/python-bigquery-storage/issues/2
Returns:
google.cloud.bigquery.dbapi.Connection: A new DB-API connection to BigQuery.
"""
if client is None:
client = bigquery.Client()
return Connection(client)
return Connection(client, bqstorage_client)
68 changes: 68 additions & 0 deletions google/cloud/bigquery/dbapi/cursor.py
Expand Up @@ -21,13 +21,18 @@
except ImportError: # Python 2.7
import collections as collections_abc

import logging

import six

from google.cloud.bigquery import job
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions
import google.cloud.exceptions


_LOGGER = logging.getLogger(__name__)

# Per PEP 249: A 7-item sequence containing information describing one result
# column. The first two items (name and type_code) are mandatory, the other
# five are optional and are set to None if no meaningful values can be
Expand Down Expand Up @@ -212,13 +217,76 @@ def _try_fetch(self, size=None):

if self._query_data is None:
client = self.connection._client
bqstorage_client = self.connection._bqstorage_client

if bqstorage_client is not None:
try:
rows_iterable = self._bqstorage_fetch(bqstorage_client)
self._query_data = _helpers.to_bq_table_rows(rows_iterable)
return
except google.api_core.exceptions.GoogleAPICallError as exc:
# NOTE: Forbidden is a subclass of GoogleAPICallError
if isinstance(exc, google.api_core.exceptions.Forbidden):
# Don't hide errors such as insufficient permissions to create
# a read session, or the API is not enabled. Both of those are
# clearly problems if the developer has explicitly asked for
# BigQuery Storage API support.
raise

# There is an issue with reading from small anonymous
# query results tables. If such an error occurs, we silence
# it in order to try again with the tabledata.list API.
_LOGGER.debug(
"Error fetching data with BigQuery Storage API, "
"falling back to tabledata.list API."
)

rows_iter = client.list_rows(
self._query_job.destination,
selected_fields=self._query_job._query_results.schema,
page_size=self.arraysize,
)
self._query_data = iter(rows_iter)

def _bqstorage_fetch(self, bqstorage_client):
"""Start fetching data with the BigQuery Storage API.
The method assumes that the data about the relevant query job already
exists internally.
Args:
bqstorage_client(\
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
):
A client tha know how to talk to the BigQuery Storage API.
Returns:
Iterable[Mapping]:
A sequence of rows, represented as dictionaries.
"""
# NOTE: Given that BQ storage client instance is passed in, it means
# that bigquery_storage_v1beta1 library is available (no ImportError).
from google.cloud import bigquery_storage_v1beta1

table_reference = self._query_job.destination

read_session = bqstorage_client.create_read_session(
table_reference.to_bqstorage(),
"projects/{}".format(table_reference.project),
# a single stream only, as DB API is not well-suited for multithreading
requested_streams=1,
)

if not read_session.streams:
return iter([]) # empty table, nothing to read

read_position = bigquery_storage_v1beta1.types.StreamPosition(
stream=read_session.streams[0],
)
read_rows_stream = bqstorage_client.read_rows(read_position)
rows_iterable = read_rows_stream.rows(read_session)
return rows_iterable

def fetchone(self):
"""Fetch a single row from the results of the last ``execute*()`` call.
Expand Down
5 changes: 3 additions & 2 deletions noxfile.py
Expand Up @@ -48,7 +48,7 @@ def default(session):
# Since many tests are skipped due to missing dependencies, test
# coverage is much lower in Python 3.8. Remove once we can test with
# pyarrow.
coverage_fail_under = "--cov-fail-under=92"
coverage_fail_under = "--cov-fail-under=91"
dev_install = ".[pandas,tqdm]"

session.install("-e", dev_install)
Expand All @@ -70,7 +70,7 @@ def default(session):
"--cov-report=",
coverage_fail_under,
os.path.join("tests", "unit"),
*session.posargs
*session.posargs,
)


Expand All @@ -94,6 +94,7 @@ def system(session):
# Install all test dependencies, then install local packages in place.
session.install("mock", "pytest", "psutil")
session.install("google-cloud-storage")
session.install("fastavro")
session.install("-e", "test_utils")
session.install("-e", ".[all]")

Expand Down
100 changes: 100 additions & 0 deletions tests/system.py
Expand Up @@ -36,6 +36,12 @@
from google.cloud import bigquery_storage_v1beta1
except ImportError: # pragma: NO COVER
bigquery_storage_v1beta1 = None

try:
import fastavro # to parse BQ storage client results
except ImportError: # pragma: NO COVER
fastavro = None

try:
import pandas
except ImportError: # pragma: NO COVER
Expand Down Expand Up @@ -1543,6 +1549,100 @@ def test_dbapi_fetchall(self):
row_tuples = [r.values() for r in rows]
self.assertEqual(row_tuples, [(1, 2), (3, 4), (5, 6)])

@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_dbapi_fetch_w_bqstorage_client_small_result_set(self):
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=Config.CLIENT._credentials
)
cursor = dbapi.connect(Config.CLIENT, bqstorage_client).cursor()

# Reading small result sets causes an issue with BQ storage client,
# and the DB API should transparently fall back to the default client.
cursor.execute(
"""
SELECT id, `by`, time_ts
FROM `bigquery-public-data.hacker_news.comments`
ORDER BY `id` ASC
LIMIT 10
"""
)

result_rows = [cursor.fetchone(), cursor.fetchone(), cursor.fetchone()]

field_name = operator.itemgetter(0)
fetched_data = [sorted(row.items(), key=field_name) for row in result_rows]

expected_data = [
[
("by", "sama"),
("id", 15),
("time_ts", datetime.datetime(2006, 10, 9, 19, 51, 1, tzinfo=UTC)),
],
[
("by", "pg"),
("id", 17),
("time_ts", datetime.datetime(2006, 10, 9, 19, 52, 45, tzinfo=UTC)),
],
[
("by", "pg"),
("id", 22),
("time_ts", datetime.datetime(2006, 10, 10, 2, 18, 22, tzinfo=UTC)),
],
]
self.assertEqual(fetched_data, expected_data)

@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
@unittest.skipIf(fastavro is None, "Requires `fastavro`")
def test_dbapi_fetch_w_bqstorage_client_large_result_set(self):
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=Config.CLIENT._credentials
)
cursor = dbapi.connect(Config.CLIENT, bqstorage_client).cursor()

# Pick a large enouhg LIMIT value to assure that the fallback to the
# default client is not needed due to the result set being too small
# (a known issue that causes problems when reding such result sets with
# BQ storage client).
cursor.execute(
"""
SELECT id, `by`, time_ts
FROM `bigquery-public-data.hacker_news.comments`
ORDER BY `id` ASC
LIMIT 100000
"""
)

result_rows = [cursor.fetchone(), cursor.fetchone(), cursor.fetchone()]

field_name = operator.itemgetter(0)
fetched_data = [sorted(row.items(), key=field_name) for row in result_rows]

# Since DB API is not thread safe, only a single result stream should be
# requested by the BQ storage client, meaning that results should arrive
# in the sorted order.
expected_data = [
[
("by", "sama"),
("id", 15),
("time_ts", datetime.datetime(2006, 10, 9, 19, 51, 1, tzinfo=UTC)),
],
[
("by", "pg"),
("id", 17),
("time_ts", datetime.datetime(2006, 10, 9, 19, 52, 45, tzinfo=UTC)),
],
[
("by", "pg"),
("id", 22),
("time_ts", datetime.datetime(2006, 10, 10, 2, 18, 22, tzinfo=UTC)),
],
]
self.assertEqual(fetched_data, expected_data)

def _load_table_for_dml(self, rows, dataset_id, table_id):
from google.cloud._testing import _NamedTemporaryFile
from google.cloud.bigquery.job import CreateDisposition
Expand Down
34 changes: 34 additions & 0 deletions tests/unit/test_dbapi__helpers.py
Expand Up @@ -15,9 +15,11 @@
import datetime
import decimal
import math
import operator as op
import unittest

import google.cloud._helpers
from google.cloud.bigquery import table
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions

Expand Down Expand Up @@ -185,3 +187,35 @@ def test_to_query_parameters_w_list_dict_param(self):
def test_to_query_parameters_none_argument(self):
query_parameters = _helpers.to_query_parameters(None)
self.assertEqual(query_parameters, [])


class TestToBqTableRows(unittest.TestCase):
def test_empty_iterable(self):
rows_iterable = iter([])
result = _helpers.to_bq_table_rows(rows_iterable)
self.assertEqual(list(result), [])

def test_non_empty_iterable(self):
rows_iterable = [
dict(one=1.1, four=1.4, two=1.2, three=1.3),
dict(one=2.1, four=2.4, two=2.2, three=2.3),
]

result = _helpers.to_bq_table_rows(rows_iterable)

rows = list(result)
self.assertEqual(len(rows), 2)

row_1, row_2 = rows
self.assertIsInstance(row_1, table.Row)
self.assertIsInstance(row_2, table.Row)

field_value = op.itemgetter(1)

items = sorted(row_1.items(), key=field_value)
expected_items = [("one", 1.1), ("two", 1.2), ("three", 1.3), ("four", 1.4)]
self.assertEqual(items, expected_items)

items = sorted(row_2.items(), key=field_value)
expected_items = [("one", 2.1), ("two", 2.2), ("three", 2.3), ("four", 2.4)]
self.assertEqual(items, expected_items)

0 comments on commit ba9b2f8

Please sign in to comment.