
feat: add support for BQ storage client to DB API
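Teach the DB API cursor to fetch query results through the BigQuery Storage API when the connection carries a BQ storage client, falling back to the tabledata.list API when the storage read fails or the client is not available.

A rough usage sketch of the intended flow (assuming the DB API connection can be constructed with both clients; that wiring is not part of this commit, so the Connection arguments below are illustrative only):

from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1
from google.cloud.bigquery import dbapi

client = bigquery.Client()
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

# Illustrative only: a connection holding both clients; the cursor then
# prefers the storage client in _try_fetch() and falls back to tabledata.list.
connection = dbapi.Connection(client, bqstorage_client)
cursor = connection.cursor()
cursor.execute("SELECT 1")
print(cursor.fetchall())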
plamut committed Feb 12, 2020
1 parent 018cea1 commit cd44040
Showing 1 changed file with 77 additions and 0 deletions.
google/cloud/bigquery/dbapi/cursor.py: 77 additions & 0 deletions
@@ -21,13 +21,19 @@
except ImportError:  # Python 2.7
    import collections as collections_abc

import logging

import six

from google.cloud import bigquery
from google.cloud.bigquery import job
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions
import google.cloud.exceptions


_LOGGER = logging.getLogger(__name__)

# Per PEP 249: A 7-item sequence containing information describing one result
# column. The first two items (name and type_code) are mandatory, the other
# five are optional and are set to None if no meaningful values can be
@@ -192,11 +198,55 @@ def executemany(self, operation, seq_of_parameters):
        for parameters in seq_of_parameters:
            self.execute(operation, parameters)

    def _bqstorage_fetch(self, bqstorage_client, size):
        """Fetch the query job's results using the BigQuery Storage API.

        Returns an iterable of result rows, read from a single stream over
        the job's destination table.
        """
        # TODO: document the assumption that the caller has already checked
        # that a query job exists.

        # Since a BQ storage client instance is passed in, it means that the
        # bigquery_storage_v1beta1 library is available (no ImportError).
        from google.cloud import bigquery_storage_v1beta1

        destination = self._query_job.destination
        table_reference = bigquery.table.TableReference(
            bigquery.dataset.DatasetReference(
                destination.project, destination.dataset_id,
            ),
            destination.table_id,
        )

        # TODO: same checks as in _pandas_helpers._download_table_bqstorage()?
        # (reading from partitions and snapshots)

        read_session = bqstorage_client.create_read_session(
            table_reference.to_bqstorage(),
            "projects/{}".format(destination.project),
            # Use a single stream only, as the DB API is not well-suited
            # for multithreading.
            requested_streams=1,
        )

        if not read_session.streams:
            return iter([])  # empty table, nothing to read

        read_position = bigquery_storage_v1beta1.types.StreamPosition(
            stream=read_session.streams[0],
        )
        read_rows_stream = bqstorage_client.read_rows(read_position)
        rows_iterable = read_rows_stream.rows(read_session)
        return rows_iterable

    def _try_fetch(self, size=None):
        """Try to start fetching data, if not yet started.

        Mutates self to indicate that iteration has started.
        """
        # TODO: fetch the results using bqstorage_client if available,
        # else fall back to the default client
        # (mention this in docstrings higher levels up?)

        # TODO: add this known issue (about small result sets) to the
        # docstring, just as in to_dataframe*()?

        if self._query_job is None:
            raise exceptions.InterfaceError(
                "No query results: execute() must be called before fetch."
@@ -212,6 +262,33 @@ def _try_fetch(self, size=None):

        if self._query_data is None:
            client = self.connection._client
            bqstorage_client = self.connection._bqstorage_client

            if bqstorage_client:

                # TODO: Are we supposed to read something here in order to
                # detect errors early and then fall back to the tabledata.list API?
                try:
                    rows_iterable = self._bqstorage_fetch(bqstorage_client, size)
                    self._query_data = iter(rows_iterable)
                    return
                except google.api_core.exceptions.Forbidden:
                    # Don't hide errors such as insufficient permissions to
                    # create a read session, or the API is not enabled. Both
                    # of those are clearly problems if the developer has
                    # explicitly asked for BigQuery Storage API support.
                    raise
                except google.api_core.exceptions.GoogleAPICallError:
                    # There is a known issue with reading from small anonymous
                    # query results tables. Rather than throw such errors,
                    # silence them and try reading the data again, but with
                    # the tabledata.list API.
                    _LOGGER.debug(
                        "Error fetching data with BQ storage client, falling "
                        "back to tabledata.list API."
                    )

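            # Fetch the rows with the REST API client (tabledata.list),
            # either directly or as the fallback from the branch above.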
            rows_iter = client.list_rows(
                self._query_job.destination,
                selected_fields=self._query_job._query_results.schema,

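A note on the except-clause ordering in _try_fetch() above: google.api_core.exceptions.Forbidden is a subclass of GoogleAPICallError, so the narrower clause must come first; permission and API-enablement problems are re-raised to the caller, while any other API call error silently triggers the tabledata.list fallback. A minimal sketch of that pattern, independent of the cursor code (the fetch callables here are hypothetical placeholders):

from google.api_core import exceptions as api_exceptions

# Forbidden derives from GoogleAPICallError; catching the broad class first
# would swallow permission errors, so the specific clause must come first.
assert issubclass(api_exceptions.Forbidden, api_exceptions.GoogleAPICallError)


def fetch_with_fallback(storage_fetch, rest_fetch):
    """Hypothetical helper mirroring the cursor's try/except ordering."""
    try:
        return storage_fetch()
    except api_exceptions.Forbidden:
        # Permission or API-not-enabled problems should surface to the caller.
        raise
    except api_exceptions.GoogleAPICallError:
        # Any other API call error: fall back to the slower REST read path.
        return rest_fetch()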