feat: add BigQuery storage client support to DB API #36

Merged: 10 commits, Feb 25, 2020
20 changes: 20 additions & 0 deletions google/cloud/bigquery/dbapi/_helpers.py
@@ -24,6 +24,7 @@
import six

from google.cloud import bigquery
from google.cloud.bigquery import table
from google.cloud.bigquery.dbapi import exceptions


@@ -218,3 +219,22 @@ def array_like(value):
return isinstance(value, collections_abc.Sequence) and not isinstance(
value, (six.text_type, six.binary_type, bytearray)
)


def to_bq_table_rows(rows_iterable):
"""Convert table rows to BigQuery table Row instances.

Args:
rows_iterable (Iterable[Mapping]):
An iterable of row data items to convert to ``Row`` instances.

Returns:
Iterable[google.cloud.bigquery.table.Row]
"""

def to_table_row(row):
values = tuple(row.values())
keys_to_index = {key: i for i, key in enumerate(row.keys())}
return table.Row(values, keys_to_index)

return (to_table_row(row_data) for row_data in rows_iterable)
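
A minimal usage sketch of the new helper (the row data below is illustrative, not taken from the PR):

    from google.cloud.bigquery.dbapi import _helpers

    row_dicts = [{"id": 1, "name": "foo"}, {"id": 2, "name": "bar"}]
    rows = list(_helpers.to_bq_table_rows(row_dicts))

    print(rows[0]["name"])   # "foo" -- Row values are accessible by field name
    print(rows[0].values())  # (1, "foo") -- and by position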
39 changes: 33 additions & 6 deletions google/cloud/bigquery/dbapi/connection.py
@@ -23,10 +23,24 @@ class Connection(object):
Args:
client (google.cloud.bigquery.Client): A client used to connect to BigQuery.
bqstorage_client(\
Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
):
[Beta] An alternative client that uses the faster BigQuery Storage
API to fetch rows from BigQuery. If both clients are given,
``bqstorage_client`` is used first to fetch query results,
falling back to ``client`` if necessary.
.. note::
There is a known issue in the BigQuery Storage API with reading
small anonymous result sets, which triggers this fallback.
https://github.com/googleapis/python-bigquery-storage/issues/2
"""

def __init__(self, client):
def __init__(self, client, bqstorage_client=None):
self._client = client
self._bqstorage_client = bqstorage_client

def close(self):
"""No-op."""
@@ -43,17 +57,30 @@ def cursor(self):
return cursor.Cursor(self)


def connect(client=None):
def connect(client=None, bqstorage_client=None):
"""Construct a DB-API connection to Google BigQuery.
Args:
client (google.cloud.bigquery.Client):
(Optional) A client used to connect to BigQuery. If not passed, a
client is created using default options inferred from the environment.
client (Optional[google.cloud.bigquery.Client]):
A client used to connect to BigQuery. If not passed, a client is
created using default options inferred from the environment.
bqstorage_client(\
Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
):
[Beta] An alternative client that uses the faster BigQuery Storage
API to fetch rows from BigQuery. If both clients are given,
``bqstorage_client`` is used first to fetch query results,
falling back to ``client`` if necessary.
.. note::
There is a known issue in the BigQuery Storage API with reading
small anonymous result sets, which triggers this fallback.
https://github.com/googleapis/python-bigquery-storage/issues/2
Returns:
google.cloud.bigquery.dbapi.Connection: A new DB-API connection to BigQuery.
"""
if client is None:
client = bigquery.Client()
return Connection(client)
return Connection(client, bqstorage_client)
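
For reviewers, an end-to-end sketch of the new connect() signature; it assumes default application credentials and the google-cloud-bigquery-storage package, and the query is a stand-in:

    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1
    from google.cloud.bigquery import dbapi

    client = bigquery.Client()
    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
        credentials=client._credentials
    )

    # Rows are fetched via the BigQuery Storage API where possible, with a
    # transparent fallback to the tabledata.list API (see the cursor changes below).
    connection = dbapi.connect(client, bqstorage_client)
    cursor = connection.cursor()
    cursor.execute("SELECT 1 AS x")
    print(cursor.fetchall())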
66 changes: 66 additions & 0 deletions google/cloud/bigquery/dbapi/cursor.py
@@ -21,13 +21,18 @@
except ImportError: # Python 2.7
import collections as collections_abc

import logging

import six

from google.cloud.bigquery import job
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions
import google.api_core.exceptions
import google.cloud.exceptions


_LOGGER = logging.getLogger(__name__)

# Per PEP 249: A 7-item sequence containing information describing one result
# column. The first two items (name and type_code) are mandatory, the other
# five are optional and are set to None if no meaningful values can be
@@ -212,13 +217,74 @@ def _try_fetch(self, size=None):

if self._query_data is None:
client = self.connection._client
bqstorage_client = self.connection._bqstorage_client

if bqstorage_client:
try:
rows_iterable = self._bqstorage_fetch(bqstorage_client)
self._query_data = _helpers.to_bq_table_rows(rows_iterable)
return
except google.api_core.exceptions.Forbidden:
# Don't hide errors such as insufficient permissions to create
# a read session, or the API not being enabled. Both of those are
# clearly problems if the developer has explicitly asked for
# BigQuery Storage API support.
raise
except google.api_core.exceptions.GoogleAPICallError:
# There is a known issue with reading from small anonymous query
# result tables. If such an error occurs, we silence it and
# retry with the tabledata.list API instead.
_LOGGER.debug(
"Error fetching data with BigQuery Storage API, "
"falling back to tabledata.list API."
)

rows_iter = client.list_rows(
self._query_job.destination,
selected_fields=self._query_job._query_results.schema,
page_size=self.arraysize,
)
self._query_data = iter(rows_iter)

def _bqstorage_fetch(self, bqstorage_client):
"""Start fetching data with the BigQuery Storage API.

The method assumes that the data about the relevant query job already
exists internally.

Args:
bqstorage_client(\
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
):
A client that knows how to talk to the BigQuery Storage API.

Returns:
Iterable[Mapping]:
A sequence of rows, represented as dictionaries.
"""
# NOTE: Since a BQ storage client instance was passed in, the
# bigquery_storage_v1beta1 library is available (no ImportError).
from google.cloud import bigquery_storage_v1beta1

table_reference = self._query_job.destination

read_session = bqstorage_client.create_read_session(
table_reference.to_bqstorage(),
"projects/{}".format(table_reference.project),
# a single stream only, as the DB API is not well-suited for multithreading
requested_streams=1,
)

if not read_session.streams:
return iter([]) # empty table, nothing to read

read_position = bigquery_storage_v1beta1.types.StreamPosition(
stream=read_session.streams[0],
)
read_rows_stream = bqstorage_client.read_rows(read_position)
rows_iterable = read_rows_stream.rows(read_session)
return rows_iterable

def fetchone(self):
"""Fetch a single row from the results of the last ``execute*()`` call.

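For context, a condensed sketch of the same read-session flow outside the cursor, using the bigquery_storage_v1beta1 surface called above; the project, dataset, and table names are placeholders:

    from google.cloud import bigquery_storage_v1beta1

    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

    # In the DB API code the table reference comes from query_job.destination;
    # a hypothetical table is spelled out here for illustration.
    table_ref = bigquery_storage_v1beta1.types.TableReference(
        project_id="my-project", dataset_id="my_dataset", table_id="my_table"
    )

    session = bqstorage_client.create_read_session(
        table_ref, "projects/my-project", requested_streams=1
    )

    if session.streams:
        position = bigquery_storage_v1beta1.types.StreamPosition(
            stream=session.streams[0]
        )
        for row in bqstorage_client.read_rows(position).rows(session):
            print(row)  # each row is a mapping of column name to value
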
5 changes: 3 additions & 2 deletions noxfile.py
@@ -48,7 +48,7 @@ def default(session):
# Since many tests are skipped due to missing dependencies, test
# coverage is much lower in Python 3.8. Remove once we can test with
# pyarrow.
coverage_fail_under = "--cov-fail-under=92"
coverage_fail_under = "--cov-fail-under=91"
dev_install = ".[pandas,tqdm]"

session.install("-e", dev_install)
@@ -70,7 +70,7 @@ def default(session):
"--cov-report=",
coverage_fail_under,
os.path.join("tests", "unit"),
*session.posargs
*session.posargs,
)


@@ -94,6 +94,7 @@ def system(session):
# Install all test dependencies, then install local packages in place.
session.install("mock", "pytest", "psutil")
session.install("google-cloud-storage")
session.install("fastavro")
session.install("-e", "test_utils")
session.install("-e", ".[all]")

100 changes: 100 additions & 0 deletions tests/system.py
@@ -36,6 +36,12 @@
from google.cloud import bigquery_storage_v1beta1
except ImportError: # pragma: NO COVER
bigquery_storage_v1beta1 = None

try:
import fastavro # to parse BQ storage client results
except ImportError: # pragma: NO COVER
fastavro = None

try:
import pandas
except ImportError: # pragma: NO COVER
@@ -1543,6 +1549,100 @@ def test_dbapi_fetchall(self):
row_tuples = [r.values() for r in rows]
self.assertEqual(row_tuples, [(1, 2), (3, 4), (5, 6)])

@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_dbapi_fetch_w_bqstorage_client_small_result_set(self):
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=Config.CLIENT._credentials
)
cursor = dbapi.connect(Config.CLIENT, bqstorage_client).cursor()

# Reading small result sets causes an issue with the BQ storage client,
# so the DB API should transparently fall back to the default client.
cursor.execute(
"""
SELECT id, `by`, time_ts
FROM `bigquery-public-data.hacker_news.comments`
ORDER BY `id` ASC
LIMIT 10
"""
)

result_rows = [cursor.fetchone(), cursor.fetchone(), cursor.fetchone()]

field_name = operator.itemgetter(0)
fetched_data = [sorted(row.items(), key=field_name) for row in result_rows]

expected_data = [
[
("by", "sama"),
("id", 15),
("time_ts", datetime.datetime(2006, 10, 9, 19, 51, 1, tzinfo=UTC)),
],
[
("by", "pg"),
("id", 17),
("time_ts", datetime.datetime(2006, 10, 9, 19, 52, 45, tzinfo=UTC)),
],
[
("by", "pg"),
("id", 22),
("time_ts", datetime.datetime(2006, 10, 10, 2, 18, 22, tzinfo=UTC)),
],
]
self.assertEqual(fetched_data, expected_data)

@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
@unittest.skipIf(fastavro is None, "Requires `fastavro`")
def test_dbapi_fetch_w_bqstorage_client_large_result_set(self):
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=Config.CLIENT._credentials
)
cursor = dbapi.connect(Config.CLIENT, bqstorage_client).cursor()

# Pick a large enough LIMIT value to ensure that the fallback to the
# default client is not needed due to the result set being too small
# (a known issue that causes problems when reading such result sets
# with the BQ storage client).
cursor.execute(
"""
SELECT id, `by`, time_ts
FROM `bigquery-public-data.hacker_news.comments`
ORDER BY `id` ASC
LIMIT 100000
"""
)

result_rows = [cursor.fetchone(), cursor.fetchone(), cursor.fetchone()]

field_name = operator.itemgetter(0)
fetched_data = [sorted(row.items(), key=field_name) for row in result_rows]

# Since the DB API is not thread safe, only a single result stream should
# be requested of the BQ storage client, meaning that results should
# arrive in sorted order.
expected_data = [
[
("by", "sama"),
("id", 15),
("time_ts", datetime.datetime(2006, 10, 9, 19, 51, 1, tzinfo=UTC)),
],
[
("by", "pg"),
("id", 17),
("time_ts", datetime.datetime(2006, 10, 9, 19, 52, 45, tzinfo=UTC)),
],
[
("by", "pg"),
("id", 22),
("time_ts", datetime.datetime(2006, 10, 10, 2, 18, 22, tzinfo=UTC)),
],
]
self.assertEqual(fetched_data, expected_data)

def _load_table_for_dml(self, rows, dataset_id, table_id):
from google.cloud._testing import _NamedTemporaryFile
from google.cloud.bigquery.job import CreateDisposition
34 changes: 34 additions & 0 deletions tests/unit/test_dbapi__helpers.py
@@ -15,9 +15,11 @@
import datetime
import decimal
import math
import operator as op
import unittest

import google.cloud._helpers
from google.cloud.bigquery import table
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions

@@ -185,3 +187,35 @@ def test_to_query_parameters_none_argument(self):
def test_to_query_parameters_none_argument(self):
query_parameters = _helpers.to_query_parameters(None)
self.assertEqual(query_parameters, [])


class TestToBqTableRows(unittest.TestCase):
def test_empty_iterable(self):
rows_iterable = iter([])
result = _helpers.to_bq_table_rows(rows_iterable)
self.assertEqual(list(result), [])

def test_non_empty_iterable(self):
rows_iterable = [
dict(one=1.1, four=1.4, two=1.2, three=1.3),
dict(one=2.1, four=2.4, two=2.2, three=2.3),
]

result = _helpers.to_bq_table_rows(rows_iterable)

rows = list(result)
self.assertEqual(len(rows), 2)

row_1, row_2 = rows
self.assertIsInstance(row_1, table.Row)
self.assertIsInstance(row_2, table.Row)

field_value = op.itemgetter(1)

items = sorted(row_1.items(), key=field_value)
expected_items = [("one", 1.1), ("two", 1.2), ("three", 1.3), ("four", 1.4)]
self.assertEqual(items, expected_items)

items = sorted(row_2.items(), key=field_value)
expected_items = [("one", 2.1), ("two", 2.2), ("three", 2.3), ("four", 2.4)]
self.assertEqual(items, expected_items)