Commit: Remove BQ Storage v1beta1 compatibility code

plamut committed Sep 25, 2020
1 parent c75e9cd commit be1459f

Showing 7 changed files with 118 additions and 611 deletions.
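The commit drops the dual code paths that supported both the v1beta1 and v1 BigQuery Storage clients, leaving only the v1 read-session flow: build a ReadSession request, create it under the project, then read rows from a named stream. Below is a minimal sketch of that flow, assuming the `storage` module used in the diffs exposes the v1 `BigQueryReadClient`; the client class and all project/table IDs are illustrative assumptions, not part of the commit.

from google.cloud.bigquery import storage

# Assumed v1 client class; the commit itself only shows module-level usage.
client = storage.BigQueryReadClient()

# Describe what to read: a fully qualified table path and the wire format.
requested_session = storage.types.ReadSession(
    table="projects/my-project/datasets/my_dataset/tables/my_table",
    data_format=storage.types.DataFormat.ARROW,
)

# v1 takes the request object plus parent/max_stream_count keyword arguments.
session = client.create_read_session(
    parent="projects/my-project",
    read_session=requested_session,
    max_stream_count=1,
)

# v1 streams are addressed by name; v1beta1 needed a StreamPosition object.
reader = client.read_rows(session.streams[0].name)
for page in reader.rows(session).pages:
    pass  # consume each page of rows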
64 changes: 14 additions & 50 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -578,19 +578,7 @@ def _bqstorage_page_to_dataframe(column_names, dtypes, page):
 def _download_table_bqstorage_stream(
     download_state, bqstorage_client, session, stream, worker_queue, page_to_item
 ):
-    # Passing a BQ Storage client in implies that the BigQuery Storage library
-    # is available and can be imported.
-    from google.cloud import bigquery_storage_v1beta1
-
-    # We want to preserve comaptibility with the v1beta1 BQ Storage clients,
-    # thus adjust constructing the rowstream if needed.
-    # The assumption is that the caller provides a BQ Storage `session` that is
-    # compatible with the version of the BQ Storage client passed in.
-    if isinstance(bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient):
-        position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
-        rowstream = bqstorage_client.read_rows(position).rows(session)
-    else:
-        rowstream = bqstorage_client.read_rows(stream.name).rows(session)
+    rowstream = bqstorage_client.read_rows(stream.name).rows(session)
 
     for page in rowstream.pages:
         if download_state.done:
@@ -625,8 +613,7 @@ def _download_table_bqstorage(
 
     # Passing a BQ Storage client in implies that the BigQuery Storage library
     # is available and can be imported.
-    from google.cloud import bigquery_storage_v1
-    from google.cloud import bigquery_storage_v1beta1
+    from google.cloud.bigquery import storage
 
     if "$" in table.table_id:
         raise ValueError(
@@ -637,41 +624,18 @@ def _download_table_bqstorage(
 
     requested_streams = 1 if preserve_order else 0
 
-    # We want to preserve comaptibility with the v1beta1 BQ Storage clients,
-    # thus adjust the session creation if needed.
-    if isinstance(bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient):
-        warnings.warn(
-            "Support for BigQuery Storage v1beta1 clients is deprecated, please "
-            "consider upgrading the client to BigQuery Storage v1 stable version.",
-            category=DeprecationWarning,
-        )
-        read_options = bigquery_storage_v1beta1.types.TableReadOptions()
-
-        if selected_fields is not None:
-            for field in selected_fields:
-                read_options.selected_fields.append(field.name)
-
-        session = bqstorage_client.create_read_session(
-            table.to_bqstorage(v1beta1=True),
-            "projects/{}".format(project_id),
-            format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
-            read_options=read_options,
-            requested_streams=requested_streams,
-        )
-    else:
-        requested_session = bigquery_storage_v1.types.ReadSession(
-            table=table.to_bqstorage(),
-            data_format=bigquery_storage_v1.enums.DataFormat.ARROW,
-        )
-        if selected_fields is not None:
-            for field in selected_fields:
-                requested_session.read_options.selected_fields.append(field.name)
-
-        session = bqstorage_client.create_read_session(
-            parent="projects/{}".format(project_id),
-            read_session=requested_session,
-            max_stream_count=requested_streams,
-        )
+    requested_session = storage.types.ReadSession(
+        table=table.to_bqstorage(), data_format=storage.types.DataFormat.ARROW
+    )
+    if selected_fields is not None:
+        for field in selected_fields:
+            requested_session.read_options.selected_fields.append(field.name)
+
+    session = bqstorage_client.create_read_session(
+        parent="projects/{}".format(project_id),
+        read_session=requested_session,
+        max_stream_count=requested_streams,
+    )
 
     _LOGGER.debug(
         "Started reading table '{}.{}.{}' with BQ Storage API session '{}'.".format(
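The `read_options.selected_fields` list kept in the new code path narrows the session to specific columns before any data is sent. A short hedged sketch of the idea in isolation; the table path and column names are hypothetical:

from google.cloud.bigquery import storage

# Restrict a read session to two columns before creating it.
requested_session = storage.types.ReadSession(
    table="projects/my-project/datasets/my_dataset/tables/my_table",
    data_format=storage.types.DataFormat.ARROW,
)
requested_session.read_options.selected_fields.append("user_id")
requested_session.read_options.selected_fields.append("created_at")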
54 changes: 13 additions & 41 deletions google/cloud/bigquery/dbapi/cursor.py
@@ -16,7 +16,6 @@
 
 import collections
 import copy
-import warnings
 
 try:
     from collections import abc as collections_abc
@@ -267,54 +266,27 @@ def _bqstorage_fetch(self, bqstorage_client):
             A sequence of rows, represented as dictionaries.
         """
         # Hitting this code path with a BQ Storage client instance implies that
-        # bigquery_storage_v1* can indeed be imported here without errors.
-        from google.cloud import bigquery_storage_v1
-        from google.cloud import bigquery_storage_v1beta1
+        # bigquery.storage can indeed be imported here without errors.
+        from google.cloud.bigquery import storage
 
         table_reference = self._query_job.destination
 
-        is_v1beta1_client = isinstance(
-            bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient
-        )
-
-        # We want to preserve compatibility with the v1beta1 BQ Storage clients,
-        # thus adjust the session creation if needed.
-        if is_v1beta1_client:
-            warnings.warn(
-                "Support for BigQuery Storage v1beta1 clients is deprecated, please "
-                "consider upgrading the client to BigQuery Storage v1 stable version.",
-                category=DeprecationWarning,
-            )
-            read_session = bqstorage_client.create_read_session(
-                table_reference.to_bqstorage(v1beta1=True),
-                "projects/{}".format(table_reference.project),
-                # a single stream only, as DB API is not well-suited for multithreading
-                requested_streams=1,
-                format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
-            )
-        else:
-            requested_session = bigquery_storage_v1.types.ReadSession(
-                table=table_reference.to_bqstorage(),
-                data_format=bigquery_storage_v1.enums.DataFormat.ARROW,
-            )
-            read_session = bqstorage_client.create_read_session(
-                parent="projects/{}".format(table_reference.project),
-                read_session=requested_session,
-                # a single stream only, as DB API is not well-suited for multithreading
-                max_stream_count=1,
-            )
+        requested_session = storage.types.ReadSession(
+            table=table_reference.to_bqstorage(),
+            data_format=storage.types.DataFormat.ARROW,
+        )
+        read_session = bqstorage_client.create_read_session(
+            parent="projects/{}".format(table_reference.project),
+            read_session=requested_session,
+            # a single stream only, as DB API is not well-suited for multithreading
+            max_stream_count=1,
+        )
 
         if not read_session.streams:
             return iter([])  # empty table, nothing to read
 
-        if is_v1beta1_client:
-            read_position = bigquery_storage_v1beta1.types.StreamPosition(
-                stream=read_session.streams[0],
-            )
-            read_rows_stream = bqstorage_client.read_rows(read_position)
-        else:
-            stream_name = read_session.streams[0].name
-            read_rows_stream = bqstorage_client.read_rows(stream_name)
+        stream_name = read_session.streams[0].name
+        read_rows_stream = bqstorage_client.read_rows(stream_name)
 
         rows_iterable = read_rows_stream.rows(read_session)
         return rows_iterable
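None of this is visible at the DB API surface. A hedged usage sketch of how a BQ Storage client reaches `_bqstorage_fetch`; the `BigQueryReadClient` class name is an assumption, and the query is only an example:

from google.cloud import bigquery
from google.cloud.bigquery import dbapi, storage

# Wire a BQ Storage client into the DB API connection so that result
# fetches can go through _bqstorage_fetch instead of the REST API.
bq_client = bigquery.Client()
bqstorage_client = storage.BigQueryReadClient()  # assumed client class
connection = dbapi.connect(client=bq_client, bqstorage_client=bqstorage_client)
cursor = connection.cursor()
cursor.execute("SELECT 1 AS x")
print(cursor.fetchall())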
4 changes: 3 additions & 1 deletion google/cloud/bigquery/magics/magics.py
@@ -676,4 +676,6 @@ def _close_transports(client, bqstorage_client):
     """
     client.close()
     if bqstorage_client is not None:
-        bqstorage_client.transport.channel.close()
+        # import pudb; pu.db
+        # bqstorage_client.transport.channel.close()
+        bqstorage_client._transport.grpc_channel.close()
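For context: older GAPIC-generated clients exposed their channel as `client.transport.channel`, whereas microgenerator-based clients keep it at `client._transport.grpc_channel`, which is what the new line reaches for. A minimal hedged sketch of the same cleanup in isolation; the helper name is hypothetical:

def _close_bqstorage(bqstorage_client):
    # Assumes a microgenerator-based client with a private gRPC transport.
    if bqstorage_client is not None:
        bqstorage_client._transport.grpc_channel.close()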
64 changes: 13 additions & 51 deletions google/cloud/bigquery/table.py
@@ -26,12 +26,6 @@
 
 import six
 
-try:
-    # Needed for the to_bqstorage() method.
-    from google.cloud import bigquery_storage_v1beta1
-except ImportError:  # pragma: NO COVER
-    bigquery_storage_v1beta1 = None
-
 try:
     import pandas
 except ImportError:  # pragma: NO COVER
@@ -228,7 +222,7 @@ def to_api_repr(self):
             "tableId": self._table_id,
         }
 
-    def to_bqstorage(self, v1beta1=False):
+    def to_bqstorage(self):
         """Construct a BigQuery Storage API representation of this table.
 
         Install the ``google-cloud-bigquery-storage`` package to use this
@@ -237,41 +231,21 @@
         If the ``table_id`` contains a partition identifier (e.g.
         ``my_table$201812``) or a snapshot identifier (e.g.
         ``mytable@1234567890``), it is ignored. Use
-        :class:`google.cloud.bigquery_storage_v1.types.ReadSession.TableReadOptions`
+        :class:`google.cloud.bigquery.storage.types.ReadSession.TableReadOptions`
         to filter rows by partition. Use
-        :class:`google.cloud.bigquery_storage_v1.types.ReadSession.TableModifiers`
+        :class:`google.cloud.bigquery.storage.types.ReadSession.TableModifiers`
         to select a specific snapshot to read from.
 
-        Args:
-            v1beta1 (Optiona[bool]):
-                If :data:`True`, return representation compatible with BigQuery
-                Storage ``v1beta1`` version. Defaults to :data:`False`.
-
         Returns:
-            Union[str, google.cloud.bigquery_storage_v1beta1.types.TableReference:]:
-                A reference to this table in the BigQuery Storage API.
-
-        Raises:
-            ValueError:
-                If ``v1beta1`` compatibility is requested, but the
-                :mod:`google.cloud.bigquery_storage_v1beta1` module cannot be imported.
+            str: A reference to this table in the BigQuery Storage API.
         """
-        if v1beta1 and bigquery_storage_v1beta1 is None:
-            raise ValueError(_NO_BQSTORAGE_ERROR)
-
         table_id, _, _ = self._table_id.partition("@")
         table_id, _, _ = table_id.partition("$")
 
-        if v1beta1:
-            table_ref = bigquery_storage_v1beta1.types.TableReference(
-                project_id=self._project,
-                dataset_id=self._dataset_id,
-                table_id=table_id,
-            )
-        else:
-            table_ref = "projects/{}/datasets/{}/tables/{}".format(
-                self._project, self._dataset_id, table_id,
-            )
+        table_ref = "projects/{}/datasets/{}/tables/{}".format(
+            self._project, self._dataset_id, table_id,
+        )
 
         return table_ref
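A quick illustration of the simplified return value; per the docstring above, partition and snapshot decorators are stripped from the path. Project, dataset, and table names are placeholders:

from google.cloud import bigquery

table_ref = bigquery.TableReference.from_string(
    "my-project.my_dataset.my_table$20181231"
)
print(table_ref.to_bqstorage())
# projects/my-project/datasets/my_dataset/tables/my_table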

@@ -876,19 +850,13 @@ def to_api_repr(self):
         """
         return copy.deepcopy(self._properties)
 
-    def to_bqstorage(self, v1beta1=False):
+    def to_bqstorage(self):
         """Construct a BigQuery Storage API representation of this table.
 
-        Args:
-            v1beta1 (Optiona[bool]):
-                If :data:`True`, return representation compatible with BigQuery
-                Storage ``v1beta1`` version. Defaults to :data:`False`.
-
         Returns:
-            Union[str, google.cloud.bigquery_storage_v1beta1.types.TableReference:]:
-                A reference to this table in the BigQuery Storage API.
+            str: A reference to this table in the BigQuery Storage API.
         """
-        return self.reference.to_bqstorage(v1beta1=v1beta1)
+        return self.reference.to_bqstorage()
 
     def _build_resource(self, filter_fields):
         """Generate a resource for ``update``."""
@@ -1096,19 +1064,13 @@ def from_string(cls, full_table_id):
             {"tableReference": TableReference.from_string(full_table_id).to_api_repr()}
         )
 
-    def to_bqstorage(self, v1beta1=False):
+    def to_bqstorage(self):
         """Construct a BigQuery Storage API representation of this table.
 
-        Args:
-            v1beta1 (Optiona[bool]):
-                If :data:`True`, return representation compatible with BigQuery
-                Storage ``v1beta1`` version. Defaults to :data:`False`.
-
         Returns:
-            Union[str, google.cloud.bigquery_storage_v1beta1.types.TableReference:]:
-                A reference to this table in the BigQuery Storage API.
+            str: A reference to this table in the BigQuery Storage API.
         """
-        return self.reference.to_bqstorage(v1beta1=v1beta1)
+        return self.reference.to_bqstorage()
 
 
 def _row_from_mapping(mapping, schema):
