Skip to content

Commit

Permalink
feat(bigquery): unit and system test for dataframe with int column wi…
Browse files Browse the repository at this point in the history
…th Nan values (#39)

* feat(bigquery): add unit and system tests for int columns

* feat(bigquery): cosmetic changes

* feat(bigquery): use pkg_resources for comparison

* feat(bigquery): nit
  • Loading branch information
HemangChothani committed May 13, 2020
1 parent 18eb9e8 commit 5fd840e
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 0 deletions.
64 changes: 64 additions & 0 deletions tests/system.py
Expand Up @@ -31,6 +31,7 @@
import psutil
import pytest
import pytz
import pkg_resources

try:
from google.cloud import bigquery_storage_v1beta1
Expand Down Expand Up @@ -125,6 +126,9 @@
(TooManyRequests, InternalServerError, ServiceUnavailable)
)

PANDAS_MINIMUM_VERSION = pkg_resources.parse_version("1.0.0")
PANDAS_INSTALLED_VERSION = pkg_resources.get_distribution("pandas").parsed_version


def _has_rows(result):
return len(result) > 0
Expand Down Expand Up @@ -742,6 +746,66 @@ def test_load_table_from_dataframe_w_automatic_schema(self):
)
self.assertEqual(table.num_rows, 3)

@unittest.skipIf(
pandas is None or PANDAS_INSTALLED_VERSION < PANDAS_MINIMUM_VERSION,
"Only `pandas version >=1.0.0` is supported",
)
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nullable_int64_datatype(self):
"""Test that a DataFrame containing column with None-type values and int64 datatype
can be uploaded if a BigQuery schema is specified.
https://github.com/googleapis/python-bigquery/issues/22
"""

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.load_table_from_dataframe_w_nullable_int64_datatype".format(
Config.CLIENT.project, dataset_id
)
table_schema = (bigquery.SchemaField("x", "INTEGER", mode="NULLABLE"),)
table = retry_403(Config.CLIENT.create_table)(
Table(table_id, schema=table_schema)
)
self.to_delete.insert(0, table)

df_data = collections.OrderedDict(
[("x", pandas.Series([1, 2, None, 4], dtype="Int64"))]
)
dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
load_job = Config.CLIENT.load_table_from_dataframe(dataframe, table_id)
load_job.result()
table = Config.CLIENT.get_table(table_id)
self.assertEqual(tuple(table.schema), (bigquery.SchemaField("x", "INTEGER"),))
self.assertEqual(table.num_rows, 4)

@unittest.skipIf(
pandas is None or PANDAS_INSTALLED_VERSION < PANDAS_MINIMUM_VERSION,
"Only `pandas version >=1.0.0` is supported",
)
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nullable_int64_datatype_automatic_schema(self):
"""Test that a DataFrame containing column with None-type values and int64 datatype
can be uploaded without specifying a schema.
https://github.com/googleapis/python-bigquery/issues/22
"""

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.load_table_from_dataframe_w_nullable_int64_datatype".format(
Config.CLIENT.project, dataset_id
)
df_data = collections.OrderedDict(
[("x", pandas.Series([1, 2, None, 4], dtype="Int64"))]
)
dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
load_job = Config.CLIENT.load_table_from_dataframe(dataframe, table_id)
load_job.result()
table = Config.CLIENT.get_table(table_id)
self.assertEqual(tuple(table.schema), (bigquery.SchemaField("x", "INTEGER"),))
self.assertEqual(table.num_rows, 4)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nulls(self):
Expand Down
96 changes: 96 additions & 0 deletions tests/unit/test_client.py
Expand Up @@ -30,6 +30,7 @@
from six.moves import http_client
import pytest
import pytz
import pkg_resources

try:
import fastparquet
Expand All @@ -56,6 +57,9 @@
bigquery_storage_v1beta1 = None
from tests.unit.helpers import make_connection

PANDAS_MINIUM_VERSION = pkg_resources.parse_version("1.0.0")
PANDAS_INSTALLED_VERSION = pkg_resources.get_distribution("pandas").parsed_version


def _make_credentials():
import google.auth.credentials
Expand Down Expand Up @@ -6973,6 +6977,98 @@ def test_load_table_from_dataframe_no_schema_warning_wo_pyarrow(self):
]
assert matches, "A missing schema deprecation warning was not raised."

@unittest.skipIf(
pandas is None or PANDAS_INSTALLED_VERSION < PANDAS_MINIUM_VERSION,
"Only `pandas version >=1.0.0` supported",
)
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nullable_int64_datatype(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
from google.cloud.bigquery import job
from google.cloud.bigquery.schema import SchemaField

client = self._make_client()
dataframe = pandas.DataFrame({"x": [1, 2, None, 4]}, dtype="Int64")
load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)

get_table_patch = mock.patch(
"google.cloud.bigquery.client.Client.get_table",
autospec=True,
return_value=mock.Mock(schema=[SchemaField("x", "INT64", "NULLABLE")]),
)

with load_patch as load_table_from_file, get_table_patch:
client.load_table_from_dataframe(
dataframe, self.TABLE_REF, location=self.LOCATION
)

load_table_from_file.assert_called_once_with(
client,
mock.ANY,
self.TABLE_REF,
num_retries=_DEFAULT_NUM_RETRIES,
rewind=True,
job_id=mock.ANY,
job_id_prefix=None,
location=self.LOCATION,
project=None,
job_config=mock.ANY,
)

sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert sent_config.source_format == job.SourceFormat.PARQUET
assert tuple(sent_config.schema) == (
SchemaField("x", "INT64", "NULLABLE", None),
)

@unittest.skipIf(
pandas is None or PANDAS_INSTALLED_VERSION < PANDAS_MINIUM_VERSION,
"Only `pandas version >=1.0.0` supported",
)
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_nullable_int64_datatype_automatic_schema(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
from google.cloud.bigquery import job
from google.cloud.bigquery.schema import SchemaField

client = self._make_client()
dataframe = pandas.DataFrame({"x": [1, 2, None, 4]}, dtype="Int64")
load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)

get_table_patch = mock.patch(
"google.cloud.bigquery.client.Client.get_table",
autospec=True,
side_effect=google.api_core.exceptions.NotFound("Table not found"),
)

with load_patch as load_table_from_file, get_table_patch:
client.load_table_from_dataframe(
dataframe, self.TABLE_REF, location=self.LOCATION
)

load_table_from_file.assert_called_once_with(
client,
mock.ANY,
self.TABLE_REF,
num_retries=_DEFAULT_NUM_RETRIES,
rewind=True,
job_id=mock.ANY,
job_id_prefix=None,
location=self.LOCATION,
project=None,
job_config=mock.ANY,
)

sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert sent_config.source_format == job.SourceFormat.PARQUET
assert tuple(sent_config.schema) == (
SchemaField("x", "INT64", "NULLABLE", None),
)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_struct_fields_error(self):
Expand Down

0 comments on commit 5fd840e

Please sign in to comment.