feat(bigquery): unit and system tests for dataframe with int column with NaN values #39

Merged
merged 12 commits on May 13, 2020
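For context, the issue this PR tests (googleapis/python-bigquery#22) stems from how pandas handles missing values in integer columns. A minimal standalone sketch of that behavior (illustrative, not part of this diff):

import pandas as pd

# A plain integer column containing a missing value is silently coerced
# to float64: the hole becomes NaN and the integers become floats.
coerced = pd.Series([1, 2, None, 4])
print(coerced.dtype)  # float64

# The nullable "Int64" extension dtype (gated here at pandas >= 1.0.0)
# keeps the values as integers and represents the hole as pd.NA, which
# is what lets these columns load as BigQuery INTEGER.
nullable = pd.Series([1, 2, None, 4], dtype="Int64")
print(nullable.dtype)  # Int64
print(nullable.isna().tolist())  # [False, False, True, False]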
64 changes: 64 additions & 0 deletions tests/system.py
@@ -31,6 +31,7 @@
import psutil
import pytest
import pytz
import pkg_resources

try:
from google.cloud import bigquery_storage_v1beta1
@@ -125,6 +126,9 @@
(TooManyRequests, InternalServerError, ServiceUnavailable)
)

PANDAS_MINIMUM_VERSION = pkg_resources.parse_version("1.0.0")
PANDAS_INSTALLED_VERSION = pkg_resources.get_distribution("pandas").parsed_version


def _has_rows(result):
return len(result) > 0
@@ -742,6 +746,66 @@ def test_load_table_from_dataframe_w_automatic_schema(self):
)
self.assertEqual(table.num_rows, 3)

@unittest.skipIf(
pandas is None or PANDAS_INSTALLED_VERSION < PANDAS_MINIMUM_VERSION,
"Only `pandas version >=1.0.0` is supported",
)
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nullable_int64_datatype(self):
"""Test that a DataFrame containing column with None-type values and int64 datatype
can be uploaded if a BigQuery schema is specified.

https://github.com/googleapis/python-bigquery/issues/22
"""

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.load_table_from_dataframe_w_nullable_int64_datatype".format(
Config.CLIENT.project, dataset_id
)
table_schema = (bigquery.SchemaField("x", "INTEGER", mode="NULLABLE"),)
table = retry_403(Config.CLIENT.create_table)(
Table(table_id, schema=table_schema)
)
self.to_delete.insert(0, table)

df_data = collections.OrderedDict(
[("x", pandas.Series([1, 2, None, 4], dtype="Int64"))]
)
dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
load_job = Config.CLIENT.load_table_from_dataframe(dataframe, table_id)
load_job.result()
table = Config.CLIENT.get_table(table_id)
self.assertEqual(tuple(table.schema), (bigquery.SchemaField("x", "INTEGER"),))
self.assertEqual(table.num_rows, 4)

@unittest.skipIf(
pandas is None or PANDAS_INSTALLED_VERSION < PANDAS_MINIMUM_VERSION,
"Only `pandas version >=1.0.0` is supported",
)
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nullable_int64_datatype_automatic_schema(self):
"""Test that a DataFrame containing column with None-type values and int64 datatype
can be uploaded without specifying a schema.

https://github.com/googleapis/python-bigquery/issues/22
"""

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.load_table_from_dataframe_w_nullable_int64_datatype".format(
Config.CLIENT.project, dataset_id
)
df_data = collections.OrderedDict(
[("x", pandas.Series([1, 2, None, 4], dtype="Int64"))]
)
dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
load_job = Config.CLIENT.load_table_from_dataframe(dataframe, table_id)
load_job.result()
table = Config.CLIENT.get_table(table_id)
self.assertEqual(tuple(table.schema), (bigquery.SchemaField("x", "INTEGER"),))
self.assertEqual(table.num_rows, 4)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nulls(self):
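Outside the test harness, the path these system tests exercise reduces to the following sketch (the project, dataset, and table names are illustrative placeholders):

import pandas
from google.cloud import bigquery

client = bigquery.Client()  # uses application default credentials
table_id = "my-project.my_dataset.nullable_ints"  # hypothetical destination

# Declare the column as NULLABLE INTEGER; load_table_from_dataframe
# serializes the frame to Parquet and submits a load job against it.
job_config = bigquery.LoadJobConfig(
    schema=[bigquery.SchemaField("x", "INTEGER", mode="NULLABLE")]
)
dataframe = pandas.DataFrame(
    {"x": pandas.Series([1, 2, None, 4], dtype="Int64")}
)
load_job = client.load_table_from_dataframe(
    dataframe, table_id, job_config=job_config
)
load_job.result()  # block until the load job completes

table = client.get_table(table_id)
print(table.num_rows)  # 4, including the row that was None in pandas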
96 changes: 96 additions & 0 deletions tests/unit/test_client.py
@@ -30,6 +30,7 @@
from six.moves import http_client
import pytest
import pytz
import pkg_resources

try:
import fastparquet
@@ -56,6 +57,9 @@
bigquery_storage_v1beta1 = None
from tests.unit.helpers import make_connection

PANDAS_MINIMUM_VERSION = pkg_resources.parse_version("1.0.0")
PANDAS_INSTALLED_VERSION = pkg_resources.get_distribution("pandas").parsed_version


def _make_credentials():
import google.auth.credentials
@@ -6973,6 +6977,98 @@ def test_load_table_from_dataframe_no_schema_warning_wo_pyarrow(self):
]
assert matches, "A missing schema deprecation warning was not raised."

@unittest.skipIf(
pandas is None or PANDAS_INSTALLED_VERSION < PANDAS_MINIMUM_VERSION,
"Only `pandas version >=1.0.0` is supported",
)
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nullable_int64_datatype(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
from google.cloud.bigquery import job
from google.cloud.bigquery.schema import SchemaField

client = self._make_client()
dataframe = pandas.DataFrame({"x": [1, 2, None, 4]}, dtype="Int64")
load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)

get_table_patch = mock.patch(
"google.cloud.bigquery.client.Client.get_table",
autospec=True,
return_value=mock.Mock(schema=[SchemaField("x", "INT64", "NULLABLE")]),
)

with load_patch as load_table_from_file, get_table_patch:
client.load_table_from_dataframe(
dataframe, self.TABLE_REF, location=self.LOCATION
)

load_table_from_file.assert_called_once_with(
client,
mock.ANY,
self.TABLE_REF,
num_retries=_DEFAULT_NUM_RETRIES,
rewind=True,
job_id=mock.ANY,
job_id_prefix=None,
location=self.LOCATION,
project=None,
job_config=mock.ANY,
)

sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert sent_config.source_format == job.SourceFormat.PARQUET
assert tuple(sent_config.schema) == (
SchemaField("x", "INT64", "NULLABLE", None),
)

@unittest.skipIf(
pandas is None or PANDAS_INSTALLED_VERSION < PANDAS_MINIMUM_VERSION,
"Only `pandas version >=1.0.0` is supported",
)
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nullable_int64_datatype_automatic_schema(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
from google.cloud.bigquery import job
from google.cloud.bigquery.schema import SchemaField

client = self._make_client()
dataframe = pandas.DataFrame({"x": [1, 2, None, 4]}, dtype="Int64")
load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)

get_table_patch = mock.patch(
"google.cloud.bigquery.client.Client.get_table",
autospec=True,
side_effect=google.api_core.exceptions.NotFound("Table not found"),
)

with load_patch as load_table_from_file, get_table_patch:
client.load_table_from_dataframe(
dataframe, self.TABLE_REF, location=self.LOCATION
)

load_table_from_file.assert_called_once_with(
client,
mock.ANY,
self.TABLE_REF,
num_retries=_DEFAULT_NUM_RETRIES,
rewind=True,
job_id=mock.ANY,
job_id_prefix=None,
location=self.LOCATION,
project=None,
job_config=mock.ANY,
)

sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert sent_config.source_format == job.SourceFormat.PARQUET
assert tuple(sent_config.schema) == (
SchemaField("x", "INT64", "NULLABLE", None),
)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_struct_fields_error(self):
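As a side note, `pkg_resources` has since been deprecated in favor of `importlib.metadata` and `packaging`; an equivalent version gate (a sketch, not part of this PR) might look like:

import unittest
import importlib.metadata

from packaging import version

PANDAS_MINIMUM_VERSION = version.parse("1.0.0")
PANDAS_INSTALLED_VERSION = version.parse(importlib.metadata.version("pandas"))

@unittest.skipIf(
    PANDAS_INSTALLED_VERSION < PANDAS_MINIMUM_VERSION,
    "Only `pandas version >=1.0.0` is supported",
)
class NullableInt64Tests(unittest.TestCase):
    ...  # the decorated tests from this PR would live here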