BUG: fix AttributeError with BQ Storage API to download empty results (#310)

* BUG: fix AttributeError with BQ Storage API to download empty results

Refactors timestamp helpers to their own file to help reduce the size of
the gbq module.

* blacken

* fix lint

* fix test_zero_rows

* update release date
tswast committed Feb 13, 2020
1 parent 612f165 commit e177978
Showing 7 changed files with 275 additions and 135 deletions.
9 changes: 9 additions & 0 deletions docs/source/changelog.rst
@@ -1,6 +1,15 @@
Changelog
=========

.. _changelog-0.13.1:

0.13.1 / 2020-02-13
-------------------

- Fix ``AttributeError`` with BQ Storage API to download empty results.
(:issue:`299`)


.. _changelog-0.13.0:

0.13.0 / 2019-12-12
30 changes: 7 additions & 23 deletions pandas_gbq/gbq.py
@@ -8,12 +8,14 @@
try:
# The BigQuery Storage API client is an optional dependency. It is only
# required when use_bqstorage_api=True.
from google.cloud import bigquery_storage
from google.cloud import bigquery_storage_v1beta1
except ImportError: # pragma: NO COVER
bigquery_storage = None
bigquery_storage_v1beta1 = None

from pandas_gbq.exceptions import AccessDenied
import pandas_gbq.schema
import pandas_gbq.timestamp


logger = logging.getLogger(__name__)

@@ -564,7 +566,7 @@ def _download_results(
df = _cast_empty_df_dtypes(schema_fields, df)

# Ensure any TIMESTAMP columns are tz-aware.
df = _localize_df(schema_fields, df)
df = pandas_gbq.timestamp.localize_df(df, schema_fields)

logger.debug("Got {} rows.\n".format(rows_iter.total_rows))
return df
@@ -784,29 +786,11 @@ def _cast_empty_df_dtypes(schema_fields, df):
return df


def _localize_df(schema_fields, df):
"""Localize any TIMESTAMP columns to tz-aware type.
In pandas versions before 0.24.0, DatetimeTZDtype cannot be used as the
dtype in Series/DataFrame construction, so localize those columns after
the DataFrame is constructed.
"""
for field in schema_fields:
column = str(field["name"])
if field["mode"].upper() == "REPEATED":
continue

if field["type"].upper() == "TIMESTAMP" and df[column].dt.tz is None:
df[column] = df[column].dt.tz_localize("UTC")

return df


def _make_bqstorage_client(use_bqstorage_api, credentials):
if not use_bqstorage_api:
return None

if bigquery_storage is None:
if bigquery_storage_v1beta1 is None:
raise ImportError(
"Install the google-cloud-bigquery-storage and fastavro/pyarrow "
"packages to use the BigQuery Storage API."
@@ -818,7 +802,7 @@ def _make_bqstorage_client(use_bqstorage_api, credentials):
client_info = google.api_core.gapic_v1.client_info.ClientInfo(
user_agent="pandas-{}".format(pandas.__version__)
)
return bigquery_storage.BigQueryStorageClient(
return bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=credentials, client_info=client_info
)

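For context, a minimal sketch of the user-facing call path this change affects: reading a zero-row result with the BigQuery Storage API enabled. The query and project ID here are illustrative placeholders, not part of the commit.

import pandas_gbq

# Zero-row query with a TIMESTAMP column; before this fix the BigQuery
# Storage API code path raised AttributeError while localizing the column.
df = pandas_gbq.read_gbq(
    "SELECT CURRENT_TIMESTAMP() AS iso_time LIMIT 0",
    project_id="my-project",  # placeholder project
    use_bqstorage_api=True,
)
assert len(df) == 0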
40 changes: 40 additions & 0 deletions pandas_gbq/timestamp.py
@@ -0,0 +1,40 @@
"""Helpers for working with TIMESTAMP data type.
Private module.
"""


def localize_df(df, schema_fields):
"""Localize any TIMESTAMP columns to tz-aware type.
In pandas versions before 0.24.0, DatetimeTZDtype cannot be used as the
dtype in Series/DataFrame construction, so localize those columns after
the DataFrame is constructed.
Parameters
----------
schema_fields: sequence of dict
BigQuery schema in parsed JSON data format.
df: pandas.DataFrame
DataFrame in which to localize TIMESTAMP columns.
Returns
-------
pandas.DataFrame
DataFrame with localized TIMESTAMP columns.
"""
if len(df.index) == 0:
# If there are no rows, there is nothing to do.
# Fix for https://github.com/pydata/pandas-gbq/issues/299
return df

for field in schema_fields:
column = str(field["name"])
if "mode" in field and field["mode"].upper() == "REPEATED":
continue

if field["type"].upper() == "TIMESTAMP" and df[column].dt.tz is None:
df[column] = df[column].dt.tz_localize("UTC")

return df
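A rough repro sketch of why the zero-row guard above matters, assuming the empty TIMESTAMP column arrives without a datetime dtype (so the .dt accessor itself raises AttributeError); only the pandas behaviour and localize_df's early return are relied on here.

import pandas

from pandas_gbq import timestamp

schema_fields = [{"name": "iso_time", "type": "TIMESTAMP", "mode": "NULLABLE"}]
empty = pandas.DataFrame({"iso_time": pandas.Series([], dtype="object")})

try:
    empty["iso_time"].dt.tz  # .dt on a non-datetimelike column raises
except AttributeError as exc:
    print(exc)  # "Can only use .dt accessor with datetimelike values"

# With the early return, zero-row frames pass through untouched.
assert timestamp.localize_df(empty, schema_fields) is empty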
78 changes: 78 additions & 0 deletions tests/system/conftest.py
@@ -0,0 +1,78 @@
import google.oauth2.service_account
import pytest


@pytest.fixture(params=["env"])
def project(request, project_id):
if request.param == "env":
return project_id
elif request.param == "none":
return None


@pytest.fixture()
def credentials(private_key_path):
return google.oauth2.service_account.Credentials.from_service_account_file(
private_key_path
)


@pytest.fixture()
def gbq_connector(project, credentials):
from pandas_gbq import gbq

return gbq.GbqConnector(project, credentials=credentials)


@pytest.fixture()
def random_dataset(bigquery_client, random_dataset_id):
from google.cloud import bigquery

dataset_ref = bigquery_client.dataset(random_dataset_id)
dataset = bigquery.Dataset(dataset_ref)
bigquery_client.create_dataset(dataset)
return dataset


@pytest.fixture()
def tokyo_dataset(bigquery_client, random_dataset_id):
from google.cloud import bigquery

dataset_ref = bigquery_client.dataset(random_dataset_id)
dataset = bigquery.Dataset(dataset_ref)
dataset.location = "asia-northeast1"
bigquery_client.create_dataset(dataset)
return random_dataset_id


@pytest.fixture()
def tokyo_table(bigquery_client, tokyo_dataset):
table_id = "tokyo_table"
# Create a random table using DDL.
# https://github.com/GoogleCloudPlatform/golang-samples/blob/2ab2c6b79a1ea3d71d8f91609b57a8fbde07ae5d/bigquery/snippets/snippet.go#L739
bigquery_client.query(
"""CREATE TABLE {}.{}
AS SELECT
2000 + CAST(18 * RAND() as INT64) as year,
IF(RAND() > 0.5,"foo","bar") as token
FROM UNNEST(GENERATE_ARRAY(0,5,1)) as r
""".format(
tokyo_dataset, table_id
),
location="asia-northeast1",
).result()
return table_id


@pytest.fixture()
def gbq_dataset(project, credentials):
from pandas_gbq import gbq

return gbq._Dataset(project, credentials=credentials)


@pytest.fixture()
def gbq_table(project, credentials, random_dataset_id):
from pandas_gbq import gbq

return gbq._Table(project, random_dataset_id, credentials=credentials)
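These shared fixtures are requested by name in the system tests. A hypothetical sketch of such a test (the query and the assumption that GbqConnector.run_query returns a DataFrame are illustrative, not part of this commit):

def test_connector_returns_dataframe(gbq_connector):
    # gbq_connector comes from the conftest.py fixture above.
    df = gbq_connector.run_query("SELECT 1 AS x")
    assert df["x"][0] == 1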
112 changes: 0 additions & 112 deletions tests/system/test_gbq.py
@@ -1,10 +1,8 @@
# -*- coding: utf-8 -*-

import sys
import uuid
from datetime import datetime

import google.oauth2.service_account
import numpy as np
import pandas
import pandas.api.types
@@ -28,76 +26,6 @@ def test_imports():
gbq._test_google_api_imports()


@pytest.fixture(params=["env"])
def project(request, project_id):
if request.param == "env":
return project_id
elif request.param == "none":
return None


@pytest.fixture()
def credentials(private_key_path):
return google.oauth2.service_account.Credentials.from_service_account_file(
private_key_path
)


@pytest.fixture()
def gbq_connector(project, credentials):
return gbq.GbqConnector(project, credentials=credentials)


@pytest.fixture()
def random_dataset(bigquery_client, random_dataset_id):
from google.cloud import bigquery

dataset_ref = bigquery_client.dataset(random_dataset_id)
dataset = bigquery.Dataset(dataset_ref)
bigquery_client.create_dataset(dataset)
return dataset


@pytest.fixture()
def tokyo_dataset(bigquery_client, random_dataset_id):
from google.cloud import bigquery

dataset_ref = bigquery_client.dataset(random_dataset_id)
dataset = bigquery.Dataset(dataset_ref)
dataset.location = "asia-northeast1"
bigquery_client.create_dataset(dataset)
return random_dataset_id


@pytest.fixture()
def tokyo_table(bigquery_client, tokyo_dataset):
table_id = "tokyo_table"
# Create a random table using DDL.
# https://github.com/GoogleCloudPlatform/golang-samples/blob/2ab2c6b79a1ea3d71d8f91609b57a8fbde07ae5d/bigquery/snippets/snippet.go#L739
bigquery_client.query(
"""CREATE TABLE {}.{}
AS SELECT
2000 + CAST(18 * RAND() as INT64) as year,
IF(RAND() > 0.5,"foo","bar") as token
FROM UNNEST(GENERATE_ARRAY(0,5,1)) as r
""".format(
tokyo_dataset, table_id
),
location="asia-northeast1",
).result()
return table_id


@pytest.fixture()
def gbq_dataset(project, credentials):
return gbq._Dataset(project, credentials=credentials)


@pytest.fixture()
def gbq_table(project, credentials, random_dataset_id):
return gbq._Table(project, random_dataset_id, credentials=credentials)


def make_mixed_dataframe_v2(test_size):
# create df to test for all BQ datatypes except RECORD
bools = np.random.randint(2, size=(1, test_size)).astype(bool)
@@ -600,9 +528,6 @@ def test_zero_rows(self, project_id):
empty_columns,
columns=["name", "number", "is_hurricane", "iso_time"],
)
expected_result["iso_time"] = expected_result[
"iso_time"
].dt.tz_localize("UTC")
tm.assert_frame_equal(df, expected_result, check_index_type=False)

def test_one_row_one_column(self, project_id):
@@ -917,43 +842,6 @@ def test_tokyo(self, tokyo_dataset, tokyo_table, project_id):
assert df["max_year"][0] >= 2000


@pytest.mark.slow(reason="Large query for BQ Storage API tests.")
def test_read_gbq_w_bqstorage_api(credentials, random_dataset):
pytest.importorskip("google.cloud.bigquery_storage")
df = gbq.read_gbq(
"""
SELECT
total_amount,
passenger_count,
trip_distance
FROM `bigquery-public-data.new_york_taxi_trips.tlc_green_trips_2014`
-- Select non-null rows for no-copy conversion from Arrow to pandas.
WHERE total_amount IS NOT NULL
AND passenger_count IS NOT NULL
AND trip_distance IS NOT NULL
LIMIT 10000000
""",
use_bqstorage_api=True,
credentials=credentials,
configuration={
"query": {
"destinationTable": {
"projectId": random_dataset.project,
"datasetId": random_dataset.dataset_id,
"tableId": "".join(
[
"test_read_gbq_w_bqstorage_api_",
str(uuid.uuid4()).replace("-", "_"),
]
),
},
"writeDisposition": "WRITE_TRUNCATE",
}
},
)
assert len(df) == 10000000


class TestToGBQIntegration(object):
@pytest.fixture(autouse=True, scope="function")
def setup(self, project, credentials, random_dataset_id):
