Support both v1 stable and beta1 BQ Storage client
plamut committed Apr 30, 2020
1 parent 7d7dc73 commit 12b2171
Showing 6 changed files with 466 additions and 48 deletions.
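The change lets the same BigQuery helpers accept either generation of the BigQuery Storage client. A minimal usage sketch, not part of the diff: the v1 client class name BigQueryReadClient is assumed from the google-cloud-bigquery-storage release current at the time, and the query is a placeholder.

from google.cloud import bigquery
from google.cloud import bigquery_storage_v1
from google.cloud import bigquery_storage_v1beta1

bq_client = bigquery.Client()

# Either client generation can be passed in; the helpers changed below detect
# the v1beta1 client with isinstance() and adjust their calls accordingly.
# Reusing the BigQuery client's credentials mirrors the system tests below.
v1_client = bigquery_storage_v1.BigQueryReadClient(credentials=bq_client._credentials)
beta_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=bq_client._credentials)

query = "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 100"

df_v1 = bq_client.query(query).result().to_dataframe(v1_client)
df_beta = bq_client.query(query).result().to_dataframe(beta_client)
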
65 changes: 50 additions & 15 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -577,7 +577,19 @@ def _bqstorage_page_to_dataframe(column_names, dtypes, page):
def _download_table_bqstorage_stream(
download_state, bqstorage_client, session, stream, worker_queue, page_to_item
):
rowstream = bqstorage_client.read_rows(stream.name).rows(session)
# Passing a BQ Storage client in implies that the BigQuery Storage library
# is available and can be imported.
from google.cloud import bigquery_storage_v1beta1

# We want to preserve compatibility with the v1beta1 BQ Storage client,
# so adjust how the rowstream is constructed if needed.
# The assumption is that the caller provides a BQ Storage `session` that is
# compatible with the version of the BQ Storage client passed in.
if isinstance(bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient):
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
rowstream = bqstorage_client.read_rows(position).rows(session)
else:
rowstream = bqstorage_client.read_rows(stream.name).rows(session)

for page in rowstream.pages:
if download_state.done:
@@ -609,29 +621,52 @@ def _download_table_bqstorage(
page_to_item=None,
):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""

# Passing a BQ Storage client in implies that the BigQuery Storage library
# is available and can be imported.
from google.cloud import bigquery_storage_v1
from google.cloud import bigquery_storage_v1beta1

if "$" in table.table_id:
raise ValueError(
"Reading from a specific partition is not currently supported."
)
if "@" in table.table_id:
raise ValueError("Reading from a specific snapshot is not currently supported.")

requested_session = bigquery_storage_v1.types.ReadSession(
table=table.to_bqstorage(),
data_format=bigquery_storage_v1.enums.DataFormat.ARROW,
)

if selected_fields is not None:
for field in selected_fields:
requested_session.read_options.selected_fields.append(field.name)

requested_streams = 1 if preserve_order else 0

session = bqstorage_client.create_read_session(
parent="projects/{}".format(project_id),
read_session=requested_session,
max_stream_count=requested_streams,
)
# We want to preserve compatibility with the v1beta1 BQ Storage client,
# so adjust the session creation if needed.
if isinstance(bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient):
read_options = bigquery_storage_v1beta1.types.TableReadOptions()

if selected_fields is not None:
for field in selected_fields:
read_options.selected_fields.append(field.name)

session = bqstorage_client.create_read_session(
table.to_bqstorage(v1beta1=True),
"projects/{}".format(project_id),
format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
read_options=read_options,
requested_streams=requested_streams,
)
else:
requested_session = bigquery_storage_v1.types.ReadSession(
table=table.to_bqstorage(),
data_format=bigquery_storage_v1.enums.DataFormat.ARROW,
)
if selected_fields is not None:
for field in selected_fields:
requested_session.read_options.selected_fields.append(field.name)

session = bqstorage_client.create_read_session(
parent="projects/{}".format(project_id),
read_session=requested_session,
max_stream_count=requested_streams,
)

_LOGGER.debug(
"Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format(
table.project, table.dataset_id, table.table_id, session.name
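The branching above exists because the two client generations expose different call shapes. A condensed, standalone sketch of the two paths the helper reconciles; project, dataset, and table names are placeholders, and the v1 client class name BigQueryReadClient is assumed.

from google.cloud import bigquery_storage_v1
from google.cloud import bigquery_storage_v1beta1

# v1: create_read_session() takes a ReadSession request plus keyword arguments,
# and read_rows() is keyed by the stream resource name.
v1_client = bigquery_storage_v1.BigQueryReadClient()
v1_session = v1_client.create_read_session(
    parent="projects/my-project",
    read_session=bigquery_storage_v1.types.ReadSession(
        table="projects/my-project/datasets/my_dataset/tables/my_table",
        data_format=bigquery_storage_v1.enums.DataFormat.ARROW,
    ),
    max_stream_count=1,
)
v1_rows = v1_client.read_rows(v1_session.streams[0].name).rows(v1_session)

# v1beta1: create_read_session() takes a TableReference message positionally,
# and read_rows() expects a StreamPosition wrapping the stream message.
beta_client = bigquery_storage_v1beta1.BigQueryStorageClient()
beta_session = beta_client.create_read_session(
    bigquery_storage_v1beta1.types.TableReference(
        project_id="my-project", dataset_id="my_dataset", table_id="my_table"
    ),
    "projects/my-project",
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    requested_streams=1,
)
beta_position = bigquery_storage_v1beta1.types.StreamPosition(
    stream=beta_session.streams[0]
)
beta_rows = beta_client.read_rows(beta_position).rows(beta_session)
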
45 changes: 33 additions & 12 deletions google/cloud/bigquery/dbapi/cursor.py
@@ -268,28 +268,49 @@ def _bqstorage_fetch(self, bqstorage_client):
A sequence of rows, represented as dictionaries.
"""
# Hitting this code path with a BQ Storage client instance implies that
# bigquery_storage_v1 can indeed be imported here without errors.
# bigquery_storage_v1* can indeed be imported here without errors.
from google.cloud import bigquery_storage_v1
from google.cloud import bigquery_storage_v1beta1

table_reference = self._query_job.destination

requested_session = bigquery_storage_v1.types.ReadSession(
table=table_reference.to_bqstorage(),
data_format=bigquery_storage_v1.enums.DataFormat.AVRO,
is_v1beta1_client = isinstance(
bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient
)

read_session = bqstorage_client.create_read_session(
parent="projects/{}".format(table_reference.project),
read_session=requested_session,
# a single stream only, as DB API is not well-suited for multithreading
max_stream_count=1,
)
# We want to preserve compatibility with the v1beta1 BQ Storage client,
# so adjust the session creation if needed.
if is_v1beta1_client:
read_session = bqstorage_client.create_read_session(
table_reference.to_bqstorage(v1beta1=True),
"projects/{}".format(table_reference.project),
# a single stream only, as DB API is not well-suited for multithreading
requested_streams=1,
)
else:
requested_session = bigquery_storage_v1.types.ReadSession(
table=table_reference.to_bqstorage(),
data_format=bigquery_storage_v1.enums.DataFormat.AVRO,
)
read_session = bqstorage_client.create_read_session(
parent="projects/{}".format(table_reference.project),
read_session=requested_session,
# a single stream only, as DB API is not well-suited for multithreading
max_stream_count=1,
)

if not read_session.streams:
return iter([]) # empty table, nothing to read

stream_name = read_session.streams[0].name
read_rows_stream = bqstorage_client.read_rows(stream_name)
if is_v1beta1_client:
read_position = bigquery_storage_v1beta1.types.StreamPosition(
stream=read_session.streams[0],
)
read_rows_stream = bqstorage_client.read_rows(read_position)
else:
stream_name = read_session.streams[0].name
read_rows_stream = bqstorage_client.read_rows(stream_name)

rows_iterable = read_rows_stream.rows(read_session)
return rows_iterable

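A minimal sketch of the DB API path with a v1beta1 client, mirroring the system test added further below; the query is a placeholder.

from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1
from google.cloud.bigquery import dbapi

client = bigquery.Client()
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=client._credentials
)

# The cursor transparently takes the v1beta1 branch above via the isinstance() check.
cursor = dbapi.connect(client, bqstorage_client).cursor()
cursor.execute(
    "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
)
rows = cursor.fetchall()
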
62 changes: 51 additions & 11 deletions google/cloud/bigquery/table.py
@@ -25,6 +25,12 @@

import six

try:
# Needed for the to_bqstorage() method.
from google.cloud import bigquery_storage_v1beta1
except ImportError: # pragma: NO COVER
bigquery_storage_v1beta1 = None

try:
import pandas
except ImportError: # pragma: NO COVER
@@ -221,7 +227,7 @@ def to_api_repr(self):
"tableId": self._table_id,
}

def to_bqstorage(self):
def to_bqstorage(self, v1beta1=False):
"""Construct a BigQuery Storage API representation of this table.
Install the ``google-cloud-bigquery-storage`` package to use this
@@ -235,15 +241,37 @@ def to_bqstorage(self):
:class:`google.cloud.bigquery_storage_v1.types.ReadSession.TableModifiers`
to select a specific snapshot to read from.
Args:
v1beta1 (Optional[bool]):
If :data:`True`, return a representation compatible with the BigQuery
Storage ``v1beta1`` version. Defaults to :data:`False`.
Returns:
str: A reference to this table in the BigQuery Storage API.
Union[str, google.cloud.bigquery_storage_v1beta1.types.TableReference]:
A reference to this table in the BigQuery Storage API.
Raises:
ValueError:
If ``v1beta1`` compatibility is requested, but the
:mod:`google.cloud.bigquery_storage_v1beta1` module cannot be imported.
"""
if v1beta1 and bigquery_storage_v1beta1 is None:
raise ValueError(_NO_BQSTORAGE_ERROR)

table_id, _, _ = self._table_id.partition("@")
table_id, _, _ = table_id.partition("$")

table_ref = "projects/{}/datasets/{}/tables/{}".format(
self._project, self._dataset_id, table_id,
)
if v1beta1:
table_ref = bigquery_storage_v1beta1.types.TableReference(
project_id=self._project,
dataset_id=self._dataset_id,
table_id=table_id,
)
else:
table_ref = "projects/{}/datasets/{}/tables/{}".format(
self._project, self._dataset_id, table_id,
)

return table_ref

def _key(self):
@@ -849,13 +877,19 @@ def to_api_repr(self):
"""
return copy.deepcopy(self._properties)

def to_bqstorage(self):
def to_bqstorage(self, v1beta1=False):
"""Construct a BigQuery Storage API representation of this table.
Args:
v1beta1 (Optional[bool]):
If :data:`True`, return a representation compatible with the BigQuery
Storage ``v1beta1`` version. Defaults to :data:`False`.
Returns:
str: A reference to this table in the BigQuery Storage API.
Union[str, google.cloud.bigquery_storage_v1beta1.types.TableReference]:
A reference to this table in the BigQuery Storage API.
"""
return self.reference.to_bqstorage()
return self.reference.to_bqstorage(v1beta1=v1beta1)

def _build_resource(self, filter_fields):
"""Generate a resource for ``update``."""
@@ -1063,13 +1097,19 @@ def from_string(cls, full_table_id):
{"tableReference": TableReference.from_string(full_table_id).to_api_repr()}
)

def to_bqstorage(self):
def to_bqstorage(self, v1beta1=False):
"""Construct a BigQuery Storage API representation of this table.
Args:
v1beta1 (Optional[bool]):
If :data:`True`, return a representation compatible with the BigQuery
Storage ``v1beta1`` version. Defaults to :data:`False`.
Returns:
str: A reference to this table in the BigQuery Storage API.
Union[str, google.cloud.bigquery_storage_v1beta1.types.TableReference]:
A reference to this table in the BigQuery Storage API.
"""
return self.reference.to_bqstorage()
return self.reference.to_bqstorage(v1beta1=v1beta1)


def _row_from_mapping(mapping, schema):
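A short sketch of the new to_bqstorage() flag on its own; the table identifier is a placeholder and the return values shown in comments are illustrative.

from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")

# Default: the v1 representation, a plain resource path string.
path = table_ref.to_bqstorage()
# -> "projects/my-project/datasets/my_dataset/tables/my_table"

# v1beta1 compatibility: a TableReference protobuf message instead.
message = table_ref.to_bqstorage(v1beta1=True)
assert isinstance(message, bigquery_storage_v1beta1.types.TableReference)
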
106 changes: 106 additions & 0 deletions tests/system.py
@@ -34,8 +34,10 @@

try:
from google.cloud import bigquery_storage_v1
from google.cloud import bigquery_storage_v1beta1
except ImportError: # pragma: NO COVER
bigquery_storage_v1 = None
bigquery_storage_v1beta1 = None

try:
import fastavro # to parse BQ storage client results
@@ -1648,6 +1650,56 @@ def test_dbapi_fetch_w_bqstorage_client_large_result_set(self):
]
self.assertEqual(fetched_data, expected_data)

@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
@unittest.skipIf(fastavro is None, "Requires `fastavro`")
def test_dbapi_fetch_w_bqstorage_client_v1beta1_large_result_set(self):
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=Config.CLIENT._credentials
)
cursor = dbapi.connect(Config.CLIENT, bqstorage_client).cursor()

# Pick a large enough LIMIT value to ensure that the fallback to the
# default client is not needed due to the result set being too small
# (a known issue that causes problems when reading such result sets with
# the BQ Storage client).
cursor.execute(
"""
SELECT id, `by`, time_ts
FROM `bigquery-public-data.hacker_news.comments`
ORDER BY `id` ASC
LIMIT 100000
"""
)

result_rows = [cursor.fetchone(), cursor.fetchone(), cursor.fetchone()]

field_name = operator.itemgetter(0)
fetched_data = [sorted(row.items(), key=field_name) for row in result_rows]

# Since DB API is not thread safe, only a single result stream should be
# requested by the BQ storage client, meaning that results should arrive
# in sorted order.
expected_data = [
[
("by", "sama"),
("id", 15),
("time_ts", datetime.datetime(2006, 10, 9, 19, 51, 1, tzinfo=UTC)),
],
[
("by", "pg"),
("id", 17),
("time_ts", datetime.datetime(2006, 10, 9, 19, 52, 45, tzinfo=UTC)),
],
[
("by", "pg"),
("id", 22),
("time_ts", datetime.datetime(2006, 10, 10, 2, 18, 22, tzinfo=UTC)),
],
]
self.assertEqual(fetched_data, expected_data)

@unittest.skipIf(
bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
)
@@ -2135,6 +2187,60 @@ def test_query_results_to_dataframe_w_bqstorage(self):
if not row[col] is None:
self.assertIsInstance(row[col], exp_datatypes[col])

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_query_results_to_dataframe_w_bqstorage_v1beta1(self):
dest_dataset = self.temp_dataset(_make_dataset_id("bqstorage_to_dataframe_"))
dest_ref = dest_dataset.table("query_results")

query = """
SELECT id, author, time_ts, dead
FROM `bigquery-public-data.hacker_news.comments`
LIMIT 10
"""

bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=Config.CLIENT._credentials
)

job_configs = (
# There is a known issue reading small anonymous query result
# tables with the BQ Storage API. Writing to a destination
# table works around this issue.
bigquery.QueryJobConfig(
destination=dest_ref, write_disposition="WRITE_TRUNCATE"
),
# Check that the client is able to work around the issue with
# reading small anonymous query result tables by falling back to
# the tabledata.list API.
None,
)

for job_config in job_configs:
df = (
Config.CLIENT.query(query, job_config=job_config)
.result()
.to_dataframe(bqstorage_client)
)

self.assertIsInstance(df, pandas.DataFrame)
self.assertEqual(len(df), 10) # verify the number of rows
column_names = ["id", "author", "time_ts", "dead"]
self.assertEqual(list(df), column_names)
exp_datatypes = {
"id": int,
"author": six.text_type,
"time_ts": pandas.Timestamp,
"dead": bool,
}
for index, row in df.iterrows():
for col in column_names:
# all the schema fields are nullable, so None is acceptable
if not row[col] is None:
self.assertIsInstance(row[col], exp_datatypes[col])

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_insert_rows_from_dataframe(self):
SF = bigquery.SchemaField
