Adjust code to new BQ Storage 2.0
plamut committed Sep 27, 2020
1 parent be1459f commit ace6762
Showing 16 changed files with 209 additions and 195 deletions.
11 changes: 3 additions & 8 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -22,11 +22,6 @@
import six
from six.moves import queue

- try:
- from google.cloud import bigquery_storage_v1
- except ImportError: # pragma: NO COVER
- bigquery_storage_v1 = None
-
try:
import pandas
except ImportError: # pragma: NO COVER
@@ -613,7 +608,7 @@ def _download_table_bqstorage(

# Passing a BQ Storage client in implies that the BigQuery Storage library
# is available and can be imported.
- from google.cloud.bigquery import storage
+ from google.cloud import bigquery_storage

if "$" in table.table_id:
raise ValueError(
@@ -624,8 +619,8 @@

requested_streams = 1 if preserve_order else 0

- requested_session = storage.types.ReadSession(
- table=table.to_bqstorage(), data_format=storage.types.DataFormat.ARROW
+ requested_session = bigquery_storage.types.ReadSession(
+ table=table.to_bqstorage(), data_format=bigquery_storage.types.DataFormat.ARROW
)
if selected_fields is not None:
for field in selected_fields:
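
For orientation, a minimal standalone sketch of the 2.0-style read path the hunks above migrate to; the project, dataset, and table names are placeholders, and the keyword-argument call style (parent=, read_session=, max_stream_count=) is assumed from the new microgenerator-based client:

from google.cloud import bigquery_storage

client = bigquery_storage.BigQueryReadClient()

requested_session = bigquery_storage.types.ReadSession(
    table="projects/my-project/datasets/my_dataset/tables/my_table",
    data_format=bigquery_storage.types.DataFormat.ARROW,
)
session = client.create_read_session(
    parent="projects/my-project",
    read_session=requested_session,
    max_stream_count=1,
)

# Read the single requested stream and collect Arrow record batches page by page.
reader = client.read_rows(session.streams[0].name)
record_batches = [page.to_arrow() for page in reader.rows(session).pages]
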
6 changes: 3 additions & 3 deletions google/cloud/bigquery/client.py
@@ -435,19 +435,19 @@ def _create_bqstorage_client(self):
warning and return ``None``.
Returns:
- Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]:
+ Optional[google.cloud.bigquery_storage.BigQueryReadClient]:
A BigQuery Storage API client.
"""
try:
- from google.cloud import bigquery_storage_v1
+ from google.cloud import bigquery_storage
except ImportError:
warnings.warn(
"Cannot create BigQuery Storage client, the dependency "
"google-cloud-bigquery-storage is not installed."
)
return None

- return bigquery_storage_v1.BigQueryReadClient(credentials=self._credentials)
+ return bigquery_storage.BigQueryReadClient(credentials=self._credentials)

def create_dataset(
self, dataset, exists_ok=False, retry=DEFAULT_RETRY, timeout=None
2 changes: 1 addition & 1 deletion google/cloud/bigquery/dbapi/connection.py
@@ -73,7 +73,7 @@ def close(self):

if self._owns_bqstorage_client:
# There is no close() on the BQ Storage client itself.
- self._bqstorage_client.transport.channel.close()
+ self._bqstorage_client._transport.grpc_channel.close()

for cursor_ in self._cursors_created:
cursor_.close()
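
The same channel-cleanup change shows up again in magics.py and table.py below. In isolation the pattern looks roughly like this; the attribute is private in both library versions, since neither exposes a public close() on the read client:

from google.cloud import bigquery_storage

bqstorage_client = bigquery_storage.BigQueryReadClient()
try:
    pass  # ... use the client ...
finally:
    # BQ Storage 1.x exposed the channel as `transport.channel`; in 2.x the
    # transport and its gRPC channel moved behind private attributes.
    bqstorage_client._transport.grpc_channel.close()
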
6 changes: 3 additions & 3 deletions google/cloud/bigquery/dbapi/cursor.py
@@ -267,13 +267,13 @@ def _bqstorage_fetch(self, bqstorage_client):
"""
# Hitting this code path with a BQ Storage client instance implies that
# bigquery.storage can indeed be imported here without errors.
- from google.cloud.bigquery import storage
+ from google.cloud import bigquery_storage

table_reference = self._query_job.destination

- requested_session = storage.types.ReadSession(
+ requested_session = bigquery_storage.types.ReadSession(
table=table_reference.to_bqstorage(),
- data_format=storage.types.DataFormat.ARROW,
+ data_format=bigquery_storage.types.DataFormat.ARROW,
)
read_session = bqstorage_client.create_read_session(
parent="projects/{}".format(table_reference.project),
8 changes: 3 additions & 5 deletions google/cloud/bigquery/magics/magics.py
@@ -637,7 +637,7 @@ def _make_bqstorage_client(use_bqstorage_api, credentials):
return None

try:
- from google.cloud import bigquery_storage_v1
+ from google.cloud import bigquery_storage
except ImportError as err:
customized_error = ImportError(
"The default BigQuery Storage API client cannot be used, install "
@@ -655,7 +655,7 @@ def _make_bqstorage_client(use_bqstorage_api, credentials):
)
six.raise_from(customized_error, err)

- return bigquery_storage_v1.BigQueryReadClient(
+ return bigquery_storage.BigQueryReadClient(
credentials=credentials,
client_info=gapic_client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT),
)
@@ -670,12 +670,10 @@ def _close_transports(client, bqstorage_client):
Args:
client (:class:`~google.cloud.bigquery.client.Client`):
bqstorage_client
- (Optional[:class:`~google.cloud.bigquery_storage_v1.BigQueryReadClient`]):
+ (Optional[:class:`~google.cloud.bigquery_storage.BigQueryReadClient`]):
A client for the BigQuery Storage API.
"""
client.close()
if bqstorage_client is not None:
- # import pudb; pu.db
- # bqstorage_client.transport.channel.close()
bqstorage_client._transport.grpc_channel.close()
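
Pieced together outside the magics module, the client construction above looks roughly like the following; the user agent string is a placeholder (the real code uses the IPYTHON_USER_AGENT constant), and application default credentials are assumed:

import google.auth
from google.api_core.gapic_v1 import client_info as gapic_client_info
from google.cloud import bigquery_storage

credentials, _ = google.auth.default()
bqstorage_client = bigquery_storage.BigQueryReadClient(
    credentials=credentials,
    client_info=gapic_client_info.ClientInfo(user_agent="ipython-example-agent"),
)
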
2 changes: 1 addition & 1 deletion google/cloud/bigquery/table.py
@@ -1521,7 +1521,7 @@ def to_arrow(
progress_bar.close()
finally:
if owns_bqstorage_client:
- bqstorage_client.transport.channel.close()
+ bqstorage_client._transport.grpc_channel.close()

if record_batches:
return pyarrow.Table.from_batches(record_batches)
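
For callers the change is invisible: RowIterator.to_arrow() still creates and tears down its own read client when none is passed in. A hedged usage sketch against a public sample table:

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table("bigquery-public-data.usa_names.usa_1910_current")

# If google-cloud-bigquery-storage is installed, to_arrow() may create a
# BigQueryReadClient internally and close its gRPC channel in the finally
# block patched above.
arrow_table = client.list_rows(table, max_results=1000).to_arrow()
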
14 changes: 4 additions & 10 deletions noxfile.py
@@ -49,16 +49,10 @@ def default(session):
constraints_path,
)

- if session.python == "2.7":
- # The [all] extra is not installable on Python 2.7.
- session.install("-e", ".[pandas,pyarrow]", "-c", constraints_path)
- elif session.python == "3.5":
- session.install("-e", ".[all]", "-c", constraints_path)
- else:
- # fastparquet is not included in .[all] because, in general, it's
- # redundant with pyarrow. We still want to run some unit tests with
- # fastparquet serialization, though.
- session.install("-e", ".[all,fastparquet]", "-c", constraints_path)
+ # fastparquet is not included in .[all] because, in general, it's
+ # redundant with pyarrow. We still want to run some unit tests with
+ # fastparquet serialization, though.
+ session.install("-e", ".[all,fastparquet]", "-c", constraints_path)

session.install("ipython", "-c", constraints_path)

9 changes: 3 additions & 6 deletions setup.py
@@ -22,7 +22,7 @@

name = "google-cloud-bigquery"
description = "Google BigQuery API client library"
- version = "1.28.0"
+ version = "2.0.0"
# Should be one of:
# 'Development Status :: 3 - Alpha'
# 'Development Status :: 4 - Beta'
@@ -37,7 +37,7 @@
]
extras = {
"bqstorage": [
- "google-cloud-bigquery-storage >= 1.0.0, <2.0.0dev",
+ "google-cloud-bigquery-storage >= 2.0.0, <3.0.0dev",
# Due to an issue in pip's dependency resolver, the `grpc` extra is not
# installed, even though `google-cloud-bigquery-storage` specifies it
# as `google-api-core[grpc]`. We thus need to explicitly specify it here.
@@ -118,10 +118,7 @@
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python",
- "Programming Language :: Python :: 2",
- "Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
- "Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
@@ -133,7 +130,7 @@
namespace_packages=namespaces,
install_requires=dependencies,
extras_require=extras,
- python_requires=">=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*",
+ python_requires=">=3.6",
include_package_data=True,
zip_safe=False,
)
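
A quick runtime check of the new pins, assuming both packages are installed; a sketch using pkg_resources, which the system tests below already import:

import pkg_resources

# Both distributions should now report a 2.x version.
print(pkg_resources.get_distribution("google-cloud-bigquery").version)
print(pkg_resources.get_distribution("google-cloud-bigquery-storage").version)
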
32 changes: 21 additions & 11 deletions tests/system.py
@@ -34,9 +34,9 @@
import pkg_resources

try:
- from google.cloud.bigquery import storage
+ from google.cloud import bigquery_storage
except ImportError: # pragma: NO COVER
- storage = None
+ bigquery_storage = None

try:
import fastavro # to parse BQ storage client results
@@ -1790,10 +1790,12 @@ def test_dbapi_fetchall(self):
row_tuples = [r.values() for r in rows]
self.assertEqual(row_tuples, [(1, 2), (3, 4), (5, 6)])

- @unittest.skipIf(storage is None, "Requires `google-cloud-bigquery-storage`")
+ @unittest.skipIf(
+ bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
+ )
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_dbapi_fetch_w_bqstorage_client_large_result_set(self):
- bqstorage_client = storage.BigQueryReadClient(
+ bqstorage_client = bigquery_storage.BigQueryReadClient(
credentials=Config.CLIENT._credentials
)
cursor = dbapi.connect(Config.CLIENT, bqstorage_client).cursor()
@@ -1850,7 +1852,9 @@ def test_dbapi_dry_run_query(self):

self.assertEqual(list(rows), [])

- @unittest.skipIf(storage is None, "Requires `google-cloud-bigquery-storage`")
+ @unittest.skipIf(
+ bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
+ )
def test_dbapi_connection_does_not_leak_sockets(self):
current_process = psutil.Process()
conn_count_start = len(current_process.connections())
@@ -2278,15 +2282,17 @@ def test_query_results_to_dataframe(self):
self.assertIsInstance(row[col], exp_datatypes[col])

@unittest.skipIf(pandas is None, "Requires `pandas`")
- @unittest.skipIf(storage is None, "Requires `google-cloud-bigquery-storage`")
+ @unittest.skipIf(
+ bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
+ )
def test_query_results_to_dataframe_w_bqstorage(self):
query = """
SELECT id, author, time_ts, dead
FROM `bigquery-public-data.hacker_news.comments`
LIMIT 10
"""

- bqstorage_client = storage.BigQueryReadClient(
+ bqstorage_client = bigquery_storage.BigQueryReadClient(
credentials=Config.CLIENT._credentials
)

@@ -2575,7 +2581,9 @@ def _fetch_dataframe(self, query):
return Config.CLIENT.query(query).result().to_dataframe()

@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
- @unittest.skipIf(storage is None, "Requires `google-cloud-bigquery-storage`")
+ @unittest.skipIf(
+ bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
+ )
def test_nested_table_to_arrow(self):
from google.cloud.bigquery.job import SourceFormat
from google.cloud.bigquery.job import WriteDisposition
@@ -2610,7 +2618,7 @@ def test_nested_table_to_arrow(self):
job_config.schema = schema
# Load a table using a local JSON file from memory.
Config.CLIENT.load_table_from_file(body, table, job_config=job_config).result()
- bqstorage_client = storage.BigQueryReadClient(
+ bqstorage_client = bigquery_storage.BigQueryReadClient(
credentials=Config.CLIENT._credentials
)

@@ -2765,12 +2773,14 @@ def test_list_rows_page_size(self):
self.assertEqual(page.num_items, num_last_page)

@unittest.skipIf(pandas is None, "Requires `pandas`")
- @unittest.skipIf(storage is None, "Requires `google-cloud-bigquery-storage`")
+ @unittest.skipIf(
+ bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
+ )
def test_list_rows_max_results_w_bqstorage(self):
table_ref = DatasetReference("bigquery-public-data", "utility_us").table(
"country_code_iso"
)
- bqstorage_client = storage.BigQueryReadClient(
+ bqstorage_client = bigquery_storage.BigQueryReadClient(
credentials=Config.CLIENT._credentials
)

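
The optional-import guard repeated throughout tests/system.py above, shown once in isolation; the test body is a placeholder:

import unittest

try:
    from google.cloud import bigquery_storage
except ImportError:  # pragma: NO COVER
    bigquery_storage = None


class BigQueryStorageDependentTest(unittest.TestCase):
    @unittest.skipIf(
        bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
    )
    def test_read_client_is_available(self):
        self.assertTrue(hasattr(bigquery_storage, "BigQueryReadClient"))
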
20 changes: 0 additions & 20 deletions tests/unit/test__pandas_helpers.py
@@ -773,26 +773,6 @@ def test_dataframe_to_bq_schema_dict_sequence(module_under_test):
assert returned_schema == expected_schema


- @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
- @pytest.mark.skipif(not six.PY2, reason="Requires Python 2.7")
- def test_dataframe_to_bq_schema_w_struct_raises_py27(module_under_test):
- dataframe = pandas.DataFrame(
- data=[{"struct_field": {"int_col": 1}}, {"struct_field": {"int_col": 2}}]
- )
- bq_schema = [
- schema.SchemaField(
- "struct_field",
- field_type="STRUCT",
- fields=[schema.SchemaField("int_col", field_type="INT64")],
- ),
- ]
-
- with pytest.raises(ValueError) as excinfo:
- module_under_test.dataframe_to_bq_schema(dataframe, bq_schema=bq_schema)
-
- assert "struct (record) column types is not supported" in str(excinfo.value)


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
def test_dataframe_to_arrow_with_multiindex(module_under_test):
14 changes: 7 additions & 7 deletions tests/unit/test_client.py
@@ -62,9 +62,9 @@
from google.cloud.bigquery.dataset import DatasetReference

try:
- from google.cloud import bigquery_storage_v1
+ from google.cloud import bigquery_storage
except (ImportError, AttributeError): # pragma: NO COVER
- bigquery_storage_v1 = None
+ bigquery_storage = None
from test_utils.imports import maybe_fail_import
from tests.unit.helpers import make_connection

@@ -794,17 +794,17 @@ def test_get_dataset(self):
self.assertEqual(dataset.dataset_id, self.DS_ID)

@unittest.skipIf(
- bigquery_storage_v1 is None, "Requires `google-cloud-bigquery-storage`"
+ bigquery_storage is None, "Requires `google-cloud-bigquery-storage`"
)
def test_create_bqstorage_client(self):
- mock_client = mock.create_autospec(bigquery_storage_v1.BigQueryReadClient)
+ mock_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
mock_client_instance = object()
mock_client.return_value = mock_client_instance
creds = _make_credentials()
client = self._make_one(project=self.PROJECT, credentials=creds)

with mock.patch(
- "google.cloud.bigquery_storage_v1.BigQueryReadClient", mock_client
+ "google.cloud.bigquery_storage.BigQueryReadClient", mock_client
):
bqstorage_client = client._create_bqstorage_client()

@@ -817,8 +817,8 @@ def test_create_bqstorage_client_missing_dependency(self):

def fail_bqstorage_import(name, globals, locals, fromlist, level):
# NOTE: *very* simplified, assuming a straightforward absolute import
- return "bigquery_storage_v1" in name or (
- fromlist is not None and "bigquery_storage_v1" in fromlist
+ return "bigquery_storage" in name or (
+ fromlist is not None and "bigquery_storage" in fromlist
)

no_bqstorage = maybe_fail_import(predicate=fail_bqstorage_import)
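
maybe_fail_import (from test_utils) drives the predicate above by patching the import machinery so that matching imports raise. A self-contained sketch of the same idea, assuming a plain builtins.__import__ patch instead of the test_utils helper:

import builtins
from unittest import mock

real_import = builtins.__import__


def fail_bqstorage_import(name, globals=None, locals=None, fromlist=(), level=0):
    # Pretend google-cloud-bigquery-storage is not installed.
    if "bigquery_storage" in name or (fromlist and "bigquery_storage" in fromlist):
        raise ImportError("google-cloud-bigquery-storage is not installed")
    return real_import(name, globals, locals, fromlist, level)


with mock.patch("builtins.__import__", side_effect=fail_bqstorage_import):
    try:
        from google.cloud import bigquery_storage
    except ImportError:
        bigquery_storage = None

assert bigquery_storage is None
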
