Skip to content

Commit

Permalink
Use BQ Storage v1 stable in DB API
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed Apr 28, 2020
1 parent 10091ef commit 3f73a43
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 29 deletions.
4 changes: 2 additions & 2 deletions google/cloud/bigquery/dbapi/connection.py
Expand Up @@ -30,7 +30,7 @@ class Connection(object):
A REST API client used to connect to BigQuery. If not passed, a
client is created using default options inferred from the environment.
bqstorage_client(\
Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient] \
):
A client that uses the faster BigQuery Storage API to fetch rows from
BigQuery. If not passed, it is created using the same credentials
Expand Down Expand Up @@ -106,7 +106,7 @@ def connect(client=None, bqstorage_client=None):
A REST API client used to connect to BigQuery. If not passed, a
client is created using default options inferred from the environment.
bqstorage_client(\
Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient] \
):
A client that uses the faster BigQuery Storage API to fetch rows from
BigQuery. If not passed, it is created using the same credentials
Expand Down
25 changes: 14 additions & 11 deletions google/cloud/bigquery/dbapi/cursor.py
Expand Up @@ -259,34 +259,37 @@ def _bqstorage_fetch(self, bqstorage_client):
Args:
bqstorage_client(\
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
google.cloud.bigquery_storage_v1.BigQueryReadClient \
):
A client that knows how to talk to the BigQuery Storage API.
Returns:
Iterable[Mapping]:
A sequence of rows, represented as dictionaries.
"""
# NOTE: Given that BQ storage client instance is passed in, it means
# that bigquery_storage_v1beta1 library is available (no ImportError).
from google.cloud import bigquery_storage_v1beta1
# Hitting this code path with a BQ Storage client instance implies that
# bigquery_storage_v1 can indeed be imported here without errors.
from google.cloud import bigquery_storage_v1

table_reference = self._query_job.destination

requested_session = bigquery_storage_v1.types.ReadSession(
table=table_reference.to_bqstorage(),
data_format=bigquery_storage_v1.enums.DataFormat.AVRO,
)

read_session = bqstorage_client.create_read_session(
table_reference.to_bqstorage(),
"projects/{}".format(table_reference.project),
parent="projects/{}".format(table_reference.project),
read_session=requested_session,
# a single stream only, as DB API is not well-suited for multithreading
requested_streams=1,
max_stream_count=1,
)

if not read_session.streams:
return iter([]) # empty table, nothing to read

read_position = bigquery_storage_v1beta1.types.StreamPosition(
stream=read_session.streams[0],
)
read_rows_stream = bqstorage_client.read_rows(read_position)
stream_name = read_session.streams[0].name
read_rows_stream = bqstorage_client.read_rows(stream_name)
rows_iterable = read_rows_stream.rows(read_session)
return rows_iterable

Expand Down
12 changes: 6 additions & 6 deletions tests/unit/test_dbapi_connection.py
Expand Up @@ -19,9 +19,9 @@
import six

try:
from google.cloud import bigquery_storage_v1beta1
from google.cloud import bigquery_storage_v1
except ImportError: # pragma: NO COVER
bigquery_storage_v1beta1 = None
bigquery_storage_v1 = None


class TestConnection(unittest.TestCase):
Expand All @@ -41,9 +41,9 @@ def _mock_client(self):
return mock_client

def _mock_bqstorage_client(self):
from google.cloud.bigquery_storage_v1beta1 import client
from google.cloud.bigquery_storage_v1 import client

mock_client = mock.create_autospec(client.BigQueryStorageClient)
mock_client = mock.create_autospec(client.BigQueryReadClient)
mock_client.transport = mock.Mock(spec=["channel"])
mock_client.transport.channel = mock.Mock(spec=["close"])
return mock_client
Expand All @@ -61,7 +61,7 @@ def test_ctor_wo_bqstorage_client(self):
self.assertIs(connection._bqstorage_client, mock_bqstorage_client)

@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_ctor_w_bqstorage_client(self):
from google.cloud.bigquery.dbapi import Connection
Expand Down Expand Up @@ -99,7 +99,7 @@ def test_connect_w_client(self):
self.assertIs(connection._bqstorage_client, mock_bqstorage_client)

@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_connect_w_both_clients(self):
from google.cloud.bigquery.dbapi import connect
Expand Down
20 changes: 10 additions & 10 deletions tests/unit/test_dbapi_cursor.py
Expand Up @@ -21,9 +21,9 @@
from google.api_core import exceptions

try:
from google.cloud import bigquery_storage_v1beta1
from google.cloud import bigquery_storage_v1
except ImportError: # pragma: NO COVER
bigquery_storage_v1beta1 = None
bigquery_storage_v1 = None


class TestCursor(unittest.TestCase):
Expand Down Expand Up @@ -58,17 +58,17 @@ def _mock_client(self, rows=None, schema=None, num_dml_affected_rows=None):
return mock_client

def _mock_bqstorage_client(self, rows=None, stream_count=0):
from google.cloud.bigquery_storage_v1beta1 import client
from google.cloud.bigquery_storage_v1beta1 import types
from google.cloud.bigquery_storage_v1 import client
from google.cloud.bigquery_storage_v1 import types

if rows is None:
rows = []

mock_client = mock.create_autospec(client.BigQueryStorageClient)
mock_client = mock.create_autospec(client.BigQueryReadClient)

mock_read_session = mock.MagicMock(
streams=[
types.Stream(name="streams/stream_{}".format(i))
types.ReadStream(name="streams/stream_{}".format(i))
for i in range(stream_count)
]
)
Expand Down Expand Up @@ -242,7 +242,7 @@ def test_fetchall_w_row(self):
self.assertEqual(rows[0], (1,))

@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_fetchall_w_bqstorage_client_fetch_success(self):
from google.cloud.bigquery import dbapi
Expand Down Expand Up @@ -285,7 +285,7 @@ def test_fetchall_w_bqstorage_client_fetch_success(self):
self.assertEqual(sorted_row_data, expected_row_data)

@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_fetchall_w_bqstorage_client_fetch_no_rows(self):
from google.cloud.bigquery import dbapi
Expand All @@ -308,7 +308,7 @@ def test_fetchall_w_bqstorage_client_fetch_no_rows(self):
self.assertEqual(rows, [])

@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_fetchall_w_bqstorage_client_fetch_error_no_fallback(self):
from google.cloud.bigquery import dbapi
Expand Down Expand Up @@ -336,7 +336,7 @@ def test_fetchall_w_bqstorage_client_fetch_error_no_fallback(self):
mock_client.list_rows.assert_not_called()

@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_fetchall_w_bqstorage_client_fetch_error_fallback_on_client(self):
from google.cloud.bigquery import dbapi
Expand Down

0 comments on commit 3f73a43

Please sign in to comment.