Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support and tests for struct fields #146

Merged
merged 7 commits into from Aug 3, 2020
15 changes: 8 additions & 7 deletions google/cloud/bigquery/_pandas_helpers.py
Expand Up @@ -287,13 +287,14 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
"""
if bq_schema:
bq_schema = schema._to_schema_fields(bq_schema)
for field in bq_schema:
if field.field_type in schema._STRUCT_TYPES:
raise ValueError(
"Uploading dataframes with struct (record) column types "
"is not supported. See: "
"https://github.com/googleapis/google-cloud-python/issues/8191"
)
if six.PY2:
for field in bq_schema:
if field.field_type in schema._STRUCT_TYPES:
raise ValueError(
"Uploading dataframes with struct (record) column types "
"is not supported under Python2. See: "
"https://github.com/googleapis/python-bigquery/issues/21"
)
bq_schema_index = {field.name: field for field in bq_schema}
bq_schema_unused = set(bq_schema_index.keys())
else:
Expand Down
6 changes: 2 additions & 4 deletions setup.py
Expand Up @@ -47,10 +47,8 @@
],
"pandas": ["pandas>=0.17.1"],
# Exclude PyArrow dependency from Windows Python 2.7.
'pyarrow: platform_system != "Windows" or python_version >= "3.4"': [
# Bad Linux release for 0.14.0.
# https://issues.apache.org/jira/browse/ARROW-5868
"pyarrow>=0.4.1, != 0.14.0"
'pyarrow: platform_system != "Windows" or python_version >= "3.5"': [
"pyarrow>=0.17.0"
],
"tqdm": ["tqdm >= 4.0.0, <5.0.0dev"],
"fastparquet": [
Expand Down
44 changes: 44 additions & 0 deletions tests/system.py
Expand Up @@ -131,6 +131,8 @@

PANDAS_MINIMUM_VERSION = pkg_resources.parse_version("1.0.0")
PANDAS_INSTALLED_VERSION = pkg_resources.get_distribution("pandas").parsed_version
PYARROW_MINIMUM_VERSION = pkg_resources.parse_version("0.17.0")
# Look up the installed version only when pyarrow was actually imported.
# pkg_resources.get_distribution() raises DistributionNotFound for a missing
# package, which would fail the import of this whole module instead of letting
# the `pyarrow is None` skip guards on individual tests take effect.
if pyarrow is not None:
    PYARROW_INSTALLED_VERSION = pkg_resources.get_distribution(
        "pyarrow"
    ).parsed_version
else:
    # Consumers must test `pyarrow is None` before comparing versions.
    PYARROW_INSTALLED_VERSION = None


def _has_rows(result):
Expand Down Expand Up @@ -1075,6 +1077,48 @@ def test_load_table_from_dataframe_w_explicit_schema(self):
self.assertEqual(tuple(table.schema), table_schema)
self.assertEqual(table.num_rows, 3)

@unittest.skipIf(
    pyarrow is None or PYARROW_INSTALLED_VERSION < PYARROW_MINIMUM_VERSION,
    "Only `pyarrow version >=0.17.0` is supported",
)
@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_load_table_from_dataframe_w_struct_datatype(self):
    """Load a DataFrame of dict values into a table with a RECORD column.

    The destination table is created up front with a single REQUIRED
    ``bar`` RECORD field (sub-fields ``id`` and ``age``); the load call
    itself receives only the dataframe and the table ID. Afterwards the
    table's schema and row count are checked.

    https://github.com/googleapis/python-bigquery/issues/21
    """
    dataset_name = _make_dataset_id("bq_load_test")
    self.temp_dataset(dataset_name)
    destination = "{}.{}.load_table_from_dataframe_w_struct_datatype".format(
        Config.CLIENT.project, dataset_name
    )

    # Schema: one REQUIRED struct column with two REQUIRED integer members.
    nested_fields = [
        bigquery.SchemaField("id", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
    ]
    expected_schema = [
        bigquery.SchemaField(
            "bar", "RECORD", fields=nested_fields, mode="REQUIRED"
        ),
    ]

    create_table = retry_403(Config.CLIENT.create_table)
    created = create_table(Table(destination, schema=expected_schema))
    # Put the new table at the front of self.to_delete.
    self.to_delete.insert(0, created)

    # Each row's "bar" cell is a dict matching the struct's sub-fields.
    struct_rows = [
        {"id": 1, "age": 21},
        {"id": 2, "age": 22},
        {"id": 2, "age": 23},
    ]
    frame = pandas.DataFrame(data={"bar": struct_rows}, columns=["bar"])

    # Block until the load job finishes.
    Config.CLIENT.load_table_from_dataframe(frame, destination).result()

    loaded = Config.CLIENT.get_table(destination)
    self.assertEqual(loaded.schema, expected_schema)
    self.assertEqual(loaded.num_rows, 3)

def test_load_table_from_json_basic_use(self):
table_schema = (
bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
Expand Down
58 changes: 48 additions & 10 deletions tests/unit/test_client.py
Expand Up @@ -7373,19 +7373,22 @@ def test_load_table_from_dataframe_w_nullable_int64_datatype_automatic_schema(se

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_struct_fields_error(self):
def test_load_table_from_dataframe_struct_fields(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
from google.cloud.bigquery import job
from google.cloud.bigquery.schema import SchemaField

client = self._make_client()

records = [{"float_column": 3.14, "struct_column": [{"foo": 1}, {"bar": -1}]}]
dataframe = pandas.DataFrame(data=records)
records = [(3.14, {"foo": 1, "bar": 1})]
dataframe = pandas.DataFrame(
data=records, columns=["float_column", "struct_column"]
)

schema = [
SchemaField("float_column", "FLOAT"),
SchemaField(
"agg_col",
"struct_column",
"RECORD",
fields=[SchemaField("foo", "INTEGER"), SchemaField("bar", "INTEGER")],
),
Expand All @@ -7396,14 +7399,49 @@ def test_load_table_from_dataframe_struct_fields_error(self):
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)

with pytest.raises(ValueError) as exc_info, load_patch:
client.load_table_from_dataframe(
dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION
if six.PY2:
with pytest.raises(ValueError) as exc_info, load_patch:
client.load_table_from_dataframe(
dataframe,
self.TABLE_REF,
job_config=job_config,
location=self.LOCATION,
)

err_msg = str(exc_info.value)
assert "struct" in err_msg
assert "not support" in err_msg

else:
get_table_patch = mock.patch(
"google.cloud.bigquery.client.Client.get_table",
autospec=True,
side_effect=google.api_core.exceptions.NotFound("Table not found"),
)
with load_patch as load_table_from_file, get_table_patch:
client.load_table_from_dataframe(
dataframe,
self.TABLE_REF,
job_config=job_config,
location=self.LOCATION,
)

load_table_from_file.assert_called_once_with(
client,
mock.ANY,
self.TABLE_REF,
num_retries=_DEFAULT_NUM_RETRIES,
rewind=True,
job_id=mock.ANY,
job_id_prefix=None,
location=self.LOCATION,
project=None,
job_config=mock.ANY,
)

err_msg = str(exc_info.value)
assert "struct" in err_msg
assert "not support" in err_msg
sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert sent_config.source_format == job.SourceFormat.PARQUET
assert sent_config.schema == schema

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
Expand Down