Commit
Use BigQuery Storage client in Cursor if available
plamut committed Feb 13, 2020
1 parent 6560183 commit d5f7beb
Showing 3 changed files with 120 additions and 0 deletions.
20 changes: 20 additions & 0 deletions google/cloud/bigquery/dbapi/_helpers.py
@@ -24,6 +24,7 @@
import six

from google.cloud import bigquery
from google.cloud.bigquery import table
from google.cloud.bigquery.dbapi import exceptions


@@ -218,3 +219,22 @@ def array_like(value):
return isinstance(value, collections_abc.Sequence) and not isinstance(
value, (six.text_type, six.binary_type, bytearray)
)


def to_bq_table_rows(rows_iterable):
"""Convert table rows to BigQuery table Row instances.
Args:
rows_iterable (Iterable[Mapping]):
An iterable of row data items to convert to ``Row`` instances.
Returns:
Iterable[google.cloud.bigquery.table.Row]
"""

def to_table_row(row):
values = tuple(row.values())
keys_to_index = {key: i for i, key in enumerate(row.keys())}
return table.Row(values, keys_to_index)

return (to_table_row(row_data) for row_data in rows_iterable)
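For context, a minimal usage sketch of the new helper; the dictionary rows and printed fields below are illustrative sample data, not part of this change:

from google.cloud.bigquery.dbapi import _helpers

# Mapping-style rows, e.g. as produced by a BigQuery Storage API read stream.
raw_rows = [
    {"name": "Alice", "age": 33},
    {"name": "Bob", "age": 29},
]

# Lazily convert each mapping to a google.cloud.bigquery.table.Row instance.
bq_rows = _helpers.to_bq_table_rows(raw_rows)

for row in bq_rows:
    # Row supports both positional and key-based access.
    print(row[0], row["age"])

This conversion is what the cursor changes below rely on when they wrap the Storage API stream.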
66 changes: 66 additions & 0 deletions google/cloud/bigquery/dbapi/cursor.py
@@ -21,13 +21,18 @@
except ImportError: # Python 2.7
import collections as collections_abc

import logging

import six

from google.cloud.bigquery import job
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions
import google.api_core.exceptions
import google.cloud.exceptions


_LOGGER = logging.getLogger(__name__)

# Per PEP 249: A 7-item sequence containing information describing one result
# column. The first two items (name and type_code) are mandatory, the other
# five are optional and are set to None if no meaningful values can be
@@ -212,13 +217,74 @@ def _try_fetch(self, size=None):

if self._query_data is None:
client = self.connection._client
bqstorage_client = self.connection._bqstorage_client

if bqstorage_client:
try:
rows_iterable = self._bqstorage_fetch(bqstorage_client)
self._query_data = _helpers.to_bq_table_rows(rows_iterable)
return
except google.api_core.exceptions.Forbidden:
# Don't hide errors such as insufficient permissions to create
# a read session, or the API not being enabled. Both of those are
# clearly problems if the developer has explicitly asked for
# BigQuery Storage API support.
raise
except google.api_core.exceptions.GoogleAPICallError:
# There is an issue with reading from small anonymous
# query results tables. If such an error occurs, we silence
# it in order to try again with the tabledata.list API.
_LOGGER.debug(
"Error fetching data with BigQuery Storage API, "
"falling back to tabledata.list API."
)

rows_iter = client.list_rows(
self._query_job.destination,
selected_fields=self._query_job._query_results.schema,
page_size=self.arraysize,
)
self._query_data = iter(rows_iter)

def _bqstorage_fetch(self, bqstorage_client):
"""Start fetching data with the BigQuery Storage API.
The method assumes that the data about the relevant query job already
exists internally.
Args:
bqstorage_client(\
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
):
A client tha know how to talk to the BigQuery Storage API.
Returns:
Iterable[Mapping]:
A sequence of rows, represented as dictionaries.
"""
# NOTE: Given that a BQ Storage client instance was passed in, the
# bigquery_storage_v1beta1 library must be available (no ImportError).
from google.cloud import bigquery_storage_v1beta1

table_reference = self._query_job.destination

read_session = bqstorage_client.create_read_session(
table_reference.to_bqstorage(),
"projects/{}".format(table_reference.project),
# a single stream only, as the DB API is not well-suited for multithreading
requested_streams=1,
)

if not read_session.streams:
return iter([]) # empty table, nothing to read

read_position = bigquery_storage_v1beta1.types.StreamPosition(
stream=read_session.streams[0],
)
read_rows_stream = bqstorage_client.read_rows(read_position)
rows_iterable = read_rows_stream.rows(read_session)
return rows_iterable

def fetchone(self):
"""Fetch a single row from the results of the last ``execute*()`` call.
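Taken together with the connection plumbing (which lives outside this diff), the intended usage looks roughly like the sketch below. It assumes that the DB API connect() accepts a bqstorage_client argument alongside the regular client, which is not shown in this commit; the project and table names are placeholders.

from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1
from google.cloud.bigquery import dbapi

bq_client = bigquery.Client()
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

# Assumption: connect() wires the Storage client into the Connection so that
# Cursor._try_fetch() can find it as self.connection._bqstorage_client.
connection = dbapi.connect(client=bq_client, bqstorage_client=bqstorage_client)
cursor = connection.cursor()

cursor.execute("SELECT name, age FROM `my-project.my_dataset.people`")  # placeholder table
rows = cursor.fetchall()
# Rows are fetched through the BigQuery Storage API when possible; on
# GoogleAPICallError (other than Forbidden) the cursor silently falls back
# to the tabledata.list API.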
34 changes: 34 additions & 0 deletions tests/unit/test_dbapi__helpers.py
@@ -15,9 +15,11 @@
import datetime
import decimal
import math
import operator as op
import unittest

import google.cloud._helpers
from google.cloud.bigquery import table
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions

@@ -185,3 +187,35 @@ def test_to_query_parameters_w_list_dict_param(self):
def test_to_query_parameters_none_argument(self):
query_parameters = _helpers.to_query_parameters(None)
self.assertEqual(query_parameters, [])


class TestToBqTableRows(unittest.TestCase):
def test_empty_iterable(self):
rows_iterable = iter([])
result = _helpers.to_bq_table_rows(rows_iterable)
self.assertEqual(list(result), [])

def test_non_empty_iterable(self):
rows_iterable = [
dict(foo=1.1, bar=1.2, baz=1.3, quux=1.4),
dict(foo=2.1, bar=2.2, baz=2.3, quux=2.4),
]

result = _helpers.to_bq_table_rows(rows_iterable)

rows = list(result)
self.assertEqual(len(rows), 2)

row_1, row_2 = rows
self.assertIsInstance(row_1, table.Row)
self.assertIsInstance(row_2, table.Row)

field_value = op.itemgetter(1)

items = sorted(row_1.items(), key=field_value)
expected_items = [("foo", 1.1), ("bar", 1.2), ("baz", 1.3), ("quux", 1.4)]
self.assertEqual(items, expected_items)

items = sorted(row_2.items(), key=field_value)
expected_items = [("foo", 2.1), ("bar", 2.2), ("baz", 2.3), ("quux", 2.4)]
self.assertEqual(items, expected_items)
