BUG: update library to support google-cloud-bigquery 2.0 (#334)
* BUG: update library to support google-cloud-bigquery 2.0

Removes references to BQ Storage API beta endpoint.

* fix unit test mocks

* add date to changelog
tswast committed Oct 5, 2020
1 parent 46c579a commit 0e3e3f0
Showing 8 changed files with 85 additions and 56 deletions.
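The heart of the migration, shown here as a standalone sketch rather than part of the diff: on google-cloud-bigquery 1.24.0 and later, RowIterator.to_dataframe() can create and clean up its own BigQuery Storage client, so callers no longer construct a bigquery_storage_v1beta1.BigQueryStorageClient themselves. The project id below is a placeholder and application-default credentials are assumed.

    from google.cloud import bigquery

    # Placeholder project; assumes application-default credentials.
    client = bigquery.Client(project="my-project")
    query_job = client.query("SELECT 1 AS int_col")
    query_job.result()  # wait for the query to finish
    rows_iter = client.list_rows(client.get_table(query_job.destination))

    # On google-cloud-bigquery >= 1.24.0 the library builds and closes the
    # BigQuery Storage client internally; no bqstorage_client argument needed.
    df = rows_iter.to_dataframe(create_bqstorage_client=True)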
5 changes: 4 additions & 1 deletion conftest.py
@@ -56,9 +56,12 @@ def bigquery_client(project_id, private_key_path):
@pytest.fixture()
def random_dataset_id(bigquery_client):
    import google.api_core.exceptions
    from google.cloud import bigquery

    dataset_id = "".join(["pandas_gbq_", str(uuid.uuid4()).replace("-", "_")])
-    dataset_ref = bigquery_client.dataset(dataset_id)
+    dataset_ref = bigquery.DatasetReference(
+        bigquery_client.project, dataset_id
+    )
    yield dataset_id
    try:
        bigquery_client.delete_dataset(dataset_ref, delete_contents=True)
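For context on the change above: Client.dataset() was deprecated during the 1.x series and removed in google-cloud-bigquery 2.0. A minimal sketch of the replacement, with a placeholder dataset id:

    from google.cloud import bigquery

    client = bigquery.Client()
    # Removed in 2.0: dataset_ref = client.dataset("my_dataset")
    dataset_ref = bigquery.DatasetReference(client.project, "my_dataset")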
5 changes: 3 additions & 2 deletions docs/source/changelog.rst
@@ -3,8 +3,8 @@ Changelog

.. _changelog-0.14.0:

-0.14.0 / TBD
-------------
+0.14.0 / 2020-10-05
+-------------------

- Add ``dtypes`` argument to ``read_gbq``. Use this argument to override the
  default ``dtype`` for a particular column in the query results. For
@@ -22,6 +22,7 @@ Changelog
Dependency updates
~~~~~~~~~~~~~~~~~~

- Support ``google-cloud-bigquery-storage`` 2.0 and higher. (:issue:`329`)
- Update the minimum version of ``pandas`` to 0.20.1.
  (:issue:`331`)
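A usage sketch of the ``dtypes`` argument noted above; the project id and dtype are placeholders, not part of the diff:

    import pandas_gbq

    df = pandas_gbq.read_gbq(
        "SELECT CAST(1 AS INT64) AS int_col",
        project_id="my-project",
        dtypes={"int_col": "Int64"},  # override the default dtype per column
    )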

9 changes: 9 additions & 0 deletions pandas_gbq/exceptions.py
@@ -12,3 +12,12 @@ class InvalidPrivateKeyFormat(ValueError):
    """

    pass


class PerformanceWarning(RuntimeWarning):
    """
    Raised when a performance-related feature is requested, but unsupported.
    Such warnings can occur when dependencies for the requested feature
    aren't up-to-date.
    """
78 changes: 35 additions & 43 deletions pandas_gbq/gbq.py
@@ -14,14 +14,8 @@
    bigquery = None
    google_exceptions = None

-try:
-    # The BigQuery Storage API client is an optional dependency. It is only
-    # required when use_bqstorage_api=True.
-    from google.cloud import bigquery_storage_v1beta1
-except ImportError:  # pragma: NO COVER
-    bigquery_storage_v1beta1 = None
-
from pandas_gbq.exceptions import AccessDenied
+from pandas_gbq.exceptions import PerformanceWarning
import pandas_gbq.schema
import pandas_gbq.timestamp

@@ -30,7 +24,9 @@

BIGQUERY_INSTALLED_VERSION = None
BIGQUERY_CLIENT_INFO_VERSION = "1.12.0"
BIGQUERY_BQSTORAGE_VERSION = "1.24.0"
HAS_CLIENT_INFO = False
HAS_BQSTORAGE_SUPPORT = False

try:
    import tqdm  # noqa
@@ -39,26 +35,32 @@


def _check_google_client_version():
-    global BIGQUERY_INSTALLED_VERSION, HAS_CLIENT_INFO, SHOW_VERBOSE_DEPRECATION
+    global BIGQUERY_INSTALLED_VERSION, HAS_CLIENT_INFO, HAS_BQSTORAGE_SUPPORT, SHOW_VERBOSE_DEPRECATION

    try:
        import pkg_resources

    except ImportError:
        raise ImportError("Could not import pkg_resources (setuptools).")

-    # https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/bigquery/CHANGELOG.md
+    # https://github.com/googleapis/python-bigquery/blob/master/CHANGELOG.md
    bigquery_minimum_version = pkg_resources.parse_version("1.11.0")
    bigquery_client_info_version = pkg_resources.parse_version(
        BIGQUERY_CLIENT_INFO_VERSION
    )
    bigquery_bqstorage_version = pkg_resources.parse_version(
        BIGQUERY_BQSTORAGE_VERSION
    )
    BIGQUERY_INSTALLED_VERSION = pkg_resources.get_distribution(
        "google-cloud-bigquery"
    ).parsed_version

    HAS_CLIENT_INFO = (
        BIGQUERY_INSTALLED_VERSION >= bigquery_client_info_version
    )
    HAS_BQSTORAGE_SUPPORT = (
        BIGQUERY_INSTALLED_VERSION >= bigquery_bqstorage_version
    )

    if BIGQUERY_INSTALLED_VERSION < bigquery_minimum_version:
        raise ImportError(
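The version probe above as a standalone sketch, assuming setuptools (which provides pkg_resources) is installed:

    import pkg_resources

    installed = pkg_resources.get_distribution(
        "google-cloud-bigquery"
    ).parsed_version
    # parse_version yields comparable objects, so >= orders pre-release
    # strings like "2.0.0dev" correctly.
    print(installed >= pkg_resources.parse_version("1.24.0"))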
@@ -548,14 +550,30 @@ def _download_results(
        if user_dtypes is None:
            user_dtypes = {}

-        try:
-            bqstorage_client = None
-            if max_results is None:
-                # Only use the BigQuery Storage API if the full result set is requested.
-                bqstorage_client = _make_bqstorage_client(
-                    self.use_bqstorage_api, self.credentials
-                )
+        if self.use_bqstorage_api and not HAS_BQSTORAGE_SUPPORT:
+            warnings.warn(
+                (
+                    "use_bqstorage_api was set, but have google-cloud-bigquery "
+                    "version {}. Requires google-cloud-bigquery version "
+                    "{} or later."
+                ).format(
+                    BIGQUERY_INSTALLED_VERSION, BIGQUERY_BQSTORAGE_VERSION
+                ),
+                PerformanceWarning,
+                stacklevel=4,
+            )
+
+        create_bqstorage_client = self.use_bqstorage_api
+        if max_results is not None:
+            create_bqstorage_client = False
+
+        to_dataframe_kwargs = {}
+        if HAS_BQSTORAGE_SUPPORT:
+            to_dataframe_kwargs[
+                "create_bqstorage_client"
+            ] = create_bqstorage_client
+
+        try:
            query_job.result()
            # Get the table schema, so that we can list rows.
            destination = self.client.get_table(query_job.destination)
@@ -568,16 +586,11 @@
            conversion_dtypes.update(user_dtypes)
            df = rows_iter.to_dataframe(
                dtypes=conversion_dtypes,
-                bqstorage_client=bqstorage_client,
                progress_bar_type=progress_bar_type,
+                **to_dataframe_kwargs
            )
        except self.http_error as ex:
            self.process_http_error(ex)
-        finally:
-            if bqstorage_client:
-                # Clean up open socket resources. See:
-                # https://github.com/pydata/pandas-gbq/issues/294
-                bqstorage_client.transport.channel.close()

        if df.empty:
            df = _cast_empty_df_dtypes(schema_fields, df)
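The user-visible effect of the block above, sketched with placeholders (a real run needs valid credentials): with an older google-cloud-bigquery, use_bqstorage_api=True now warns and falls back to the REST API instead of failing.

    import warnings

    import pandas_gbq
    from pandas_gbq.exceptions import PerformanceWarning

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        df = pandas_gbq.read_gbq(
            "SELECT 1 AS int_col",
            project_id="my-project",  # placeholder
            use_bqstorage_api=True,
            dialect="standard",
        )
    for w in caught:
        if issubclass(w.category, PerformanceWarning):
            print("Storage API unavailable, used REST:", w.message)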
@@ -763,27 +776,6 @@ def _cast_empty_df_dtypes(schema_fields, df):
    return df


-def _make_bqstorage_client(use_bqstorage_api, credentials):
-    if not use_bqstorage_api:
-        return None
-
-    if bigquery_storage_v1beta1 is None:
-        raise ImportError(
-            "Install the google-cloud-bigquery-storage and fastavro/pyarrow "
-            "packages to use the BigQuery Storage API."
-        )
-
-    import google.api_core.gapic_v1.client_info
-    import pandas
-
-    client_info = google.api_core.gapic_v1.client_info.ClientInfo(
-        user_agent="pandas-{}".format(pandas.__version__)
-    )
-    return bigquery_storage_v1beta1.BigQueryStorageClient(
-        credentials=credentials, client_info=client_info
-    )
-
-
def read_gbq(
    query,
    project_id=None,
2 changes: 1 addition & 1 deletion setup.py
@@ -22,7 +22,7 @@ def readme():
    "pydata-google-auth",
    "google-auth",
    "google-auth-oauthlib",
-    "google-cloud-bigquery[bqstorage,pandas]>=1.11.1,<2.0.0dev",
+    "google-cloud-bigquery[bqstorage,pandas]>=1.11.1,<3.0.0dev",
]

extras = {"tqdm": "tqdm>=4.23.0"}
8 changes: 6 additions & 2 deletions tests/system/conftest.py
@@ -28,7 +28,9 @@ def gbq_connector(project, credentials):
def random_dataset(bigquery_client, random_dataset_id):
    from google.cloud import bigquery

-    dataset_ref = bigquery_client.dataset(random_dataset_id)
+    dataset_ref = bigquery.DatasetReference(
+        bigquery_client.project, random_dataset_id
+    )
    dataset = bigquery.Dataset(dataset_ref)
    bigquery_client.create_dataset(dataset)
    return dataset
@@ -38,7 +40,9 @@ def random_dataset(bigquery_client, random_dataset_id):
def tokyo_dataset(bigquery_client, random_dataset_id):
    from google.cloud import bigquery

-    dataset_ref = bigquery_client.dataset(random_dataset_id)
+    dataset_ref = bigquery.DatasetReference(
+        bigquery_client.project, random_dataset_id
+    )
    dataset = bigquery.Dataset(dataset_ref)
    dataset.location = "asia-northeast1"
    bigquery_client.create_dataset(dataset)
2 changes: 1 addition & 1 deletion tests/system/test_read_gbq_with_bqstorage.py
@@ -6,7 +6,7 @@
import pytest


-pytest.importorskip("google.cloud.bigquery_storage_v1beta1")
+pytest.importorskip("google.cloud.bigquery", minversion="1.24.0")


@pytest.fixture
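pytest.importorskip with minversion compares against the module's __version__ attribute, skipping the whole test module on older installs. A sketch:

    import pytest

    # Skip unless google-cloud-bigquery >= 1.24.0 is importable (the first
    # release that accepts create_bqstorage_client).
    bigquery = pytest.importorskip(
        "google.cloud.bigquery", minversion="1.24.0"
    )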
32 changes: 26 additions & 6 deletions tests/unit/test_gbq.py
@@ -13,7 +13,7 @@
from pandas_gbq import gbq


-pytestmark = pytest.mark.filter_warnings(
+pytestmark = pytest.mark.filterwarnings(
    "ignore:credentials from Google Cloud SDK"
)
pandas_installed_version = pkg_resources.get_distribution(
@@ -490,9 +490,30 @@ def test_read_gbq_passes_dtypes(

    mock_list_rows = mock_bigquery_client.list_rows("dest", max_results=100)

    _, to_dataframe_kwargs = mock_list_rows.to_dataframe.call_args
    assert to_dataframe_kwargs["dtypes"] == {"int_col": "my-custom-dtype"}


def test_read_gbq_use_bqstorage_api(
    mock_bigquery_client, mock_service_account_credentials
):
    gbq._check_google_client_version()
    if not gbq.HAS_BQSTORAGE_SUPPORT:
        pytest.skip("requires BigQuery Storage API")

    mock_service_account_credentials.project_id = "service_account_project_id"
    df = gbq.read_gbq(
        "SELECT 1 AS int_col",
        dialect="standard",
        credentials=mock_service_account_credentials,
        use_bqstorage_api=True,
    )
    assert df is not None

    mock_list_rows = mock_bigquery_client.list_rows("dest", max_results=100)
    mock_list_rows.to_dataframe.assert_called_once_with(
-        dtypes={"int_col": "my-custom-dtype"},
-        bqstorage_client=mock.ANY,
+        create_bqstorage_client=True,
+        dtypes=mock.ANY,
        progress_bar_type=mock.ANY,
    )

@@ -511,6 +532,5 @@ def test_read_gbq_calls_tqdm(

    mock_list_rows = mock_bigquery_client.list_rows("dest", max_results=100)

-    mock_list_rows.to_dataframe.assert_called_once_with(
-        dtypes=mock.ANY, bqstorage_client=mock.ANY, progress_bar_type="foobar"
-    )
+    _, to_dataframe_kwargs = mock_list_rows.to_dataframe.call_args
+    assert to_dataframe_kwargs["progress_bar_type"] == "foobar"
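The loosened assertions are deliberate: assert_called_once_with pins every keyword argument and breaks whenever an unrelated kwarg is added, while inspecting call_args checks only what the test cares about. A standalone sketch:

    from unittest import mock

    m = mock.Mock()
    m(
        dtypes={"int_col": "Int64"},
        create_bqstorage_client=False,
        progress_bar_type="tqdm",
    )

    # Inspect just the kwarg under test instead of the full call signature.
    _, kwargs = m.call_args
    assert kwargs["progress_bar_type"] == "tqdm"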
