fix: handle consuming streams with no data (#29)

plamut committed Jun 4, 2020
1 parent 77b373b commit 56d1b1f
Showing 2 changed files with 146 additions and 5 deletions.
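For context, here is a minimal sketch of the behavior this commit fixes, assuming a read session and its stream already exist (`session` and `stream_name` below are placeholders, and their setup is omitted). Before the change, consuming a stream with no data failed inside pandas.concat() (no objects to concatenate) or pyarrow.Table.from_batches() (no batches to infer a schema from); after it, both calls return empty containers whose columns match the session's schema.

from google.cloud import bigquery_storage_v1

client = bigquery_storage_v1.BigQueryReadClient()

# `session` and `stream_name` are assumed to already exist; creating a
# read session is omitted here.
reader = client.read_rows(stream_name)

# Previously this raised for a stream that returned no rows; with this
# fix it yields a zero-row DataFrame with schema-derived column dtypes.
df = reader.to_dataframe(session)
assert df.empty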
65 changes: 60 additions & 5 deletions google/cloud/bigquery_storage_v1/reader.py
@@ -287,7 +287,13 @@ def to_arrow(self):
         record_batches = []
         for page in self.pages:
             record_batches.append(page.to_arrow())
-        return pyarrow.Table.from_batches(record_batches)
+
+        if record_batches:
+            return pyarrow.Table.from_batches(record_batches)
+
+        # No data, return an empty Table.
+        self._stream_parser._parse_arrow_schema()
+        return pyarrow.Table.from_batches([], schema=self._stream_parser._schema)
 
     def to_dataframe(self, dtypes=None):
         """Create a :class:`pandas.DataFrame` of all rows in the stream.
@@ -323,17 +329,66 @@ def to_dataframe(self, dtypes=None):
         # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
         # usually no-copy.
         schema_type = self._read_session.WhichOneof("schema")
+
         if schema_type == "arrow_schema":
             record_batch = self.to_arrow()
             df = record_batch.to_pandas()
             for column in dtypes:
                 df[column] = pandas.Series(df[column], dtype=dtypes[column])
             return df
 
-        frames = []
-        for page in self.pages:
-            frames.append(page.to_dataframe(dtypes=dtypes))
-        return pandas.concat(frames)
+        frames = [page.to_dataframe(dtypes=dtypes) for page in self.pages]
+
+        if frames:
+            return pandas.concat(frames)
+
+        # No data, construct an empty dataframe with columns matching the schema.
+        # The result should be consistent with what an empty ARROW stream would produce.
+        self._stream_parser._parse_avro_schema()
+        schema = self._stream_parser._avro_schema_json
+
+        column_dtypes = self._dtypes_from_avro(schema["fields"])
+        column_dtypes.update(dtypes)
+
+        df = pandas.DataFrame(columns=column_dtypes.keys())
+        for column in df:
+            df[column] = pandas.Series([], dtype=column_dtypes[column])
+
+        return df
+
+    def _dtypes_from_avro(self, avro_fields):
+        """Determine Pandas dtypes for columns in Avro schema.
+
+        Args:
+            avro_fields (Iterable[Mapping[str, Any]]):
+                Avro fields' metadata.
+
+        Returns:
+            collections.OrderedDict[str, str]:
+                Column names with their corresponding Pandas dtypes.
+        """
+        result = collections.OrderedDict()
+
+        type_map = {"long": "int64", "double": "float64", "boolean": "bool"}
+
+        for field_info in avro_fields:
+            # If a type is a union of multiple types, pick the first type
+            # that is not "null".
+            if isinstance(field_info["type"], list):
+                type_info = next(item for item in field_info["type"] if item != "null")
+
+            if isinstance(type_info, six.string_types):
+                field_dtype = type_map.get(type_info, "object")
+            else:
+                logical_type = type_info.get("logicalType")
+                if logical_type == "timestamp-micros":
+                    field_dtype = "datetime64[ns, UTC]"
+                else:
+                    field_dtype = "object"
+
+            result[field_info["name"]] = field_dtype
+
+        return result
+
 
 class ReadRowsPage(object):
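To make the new helper concrete, below is a standalone sketch of the same Avro-to-dtype mapping. The field shapes are assumptions about typical BigQuery Avro schemas, and unlike the committed helper this sketch adds an explicit branch for non-union field types, which `_dtypes_from_avro` assumes it never receives.

import collections

def dtypes_from_avro(avro_fields):
    # Mirrors _dtypes_from_avro above, without the six dependency.
    type_map = {"long": "int64", "double": "float64", "boolean": "bool"}
    result = collections.OrderedDict()
    for field in avro_fields:
        type_info = field["type"]
        if isinstance(type_info, list):
            # Nullable columns arrive as unions; drop the "null" member.
            type_info = next(item for item in type_info if item != "null")
        if isinstance(type_info, str):
            dtype = type_map.get(type_info, "object")
        elif type_info.get("logicalType") == "timestamp-micros":
            dtype = "datetime64[ns, UTC]"
        else:
            dtype = "object"
        result[field["name"]] = dtype
    return result

print(dtypes_from_avro([
    {"name": "int_col", "type": ["null", "long"]},
    {"name": "ts_col", "type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]},
    {"name": "str_col", "type": "string"},
]))
# OrderedDict([('int_col', 'int64'), ('ts_col', 'datetime64[ns, UTC]'), ('str_col', 'object')])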
86 changes: 86 additions & 0 deletions tests/unit/test_reader_v1.py
@@ -645,6 +645,92 @@ def test_to_dataframe_w_dtypes_arrow(class_under_test):
     )
 
 
+def test_to_dataframe_empty_w_scalars_avro(class_under_test, mock_client):
+    avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS)
+    read_session = _generate_avro_read_session(avro_schema)
+    avro_blocks = _bq_to_avro_blocks([], avro_schema)
+    reader = class_under_test(avro_blocks, mock_client, "", 0, {})
+
+    got = reader.to_dataframe(read_session)
+
+    expected = pandas.DataFrame(columns=SCALAR_COLUMN_NAMES)
+    expected["int_col"] = expected["int_col"].astype("int64")
+    expected["float_col"] = expected["float_col"].astype("float64")
+    expected["bool_col"] = expected["bool_col"].astype("bool")
+    expected["ts_col"] = expected["ts_col"].astype("datetime64[ns, UTC]")
+
+    pandas.testing.assert_frame_equal(
+        got.reset_index(drop=True),  # reset_index to ignore row labels
+        expected.reset_index(drop=True),
+    )
+
+
+def test_to_dataframe_empty_w_scalars_arrow(class_under_test, mock_client):
+    arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS)
+    read_session = _generate_arrow_read_session(arrow_schema)
+    arrow_batches = _bq_to_arrow_batches([], arrow_schema)
+    reader = class_under_test(arrow_batches, mock_client, "", 0, {})
+
+    got = reader.to_dataframe(read_session)
+
+    expected = pandas.DataFrame([], columns=SCALAR_COLUMN_NAMES)
+    expected["int_col"] = expected["int_col"].astype("int64")
+    expected["float_col"] = expected["float_col"].astype("float64")
+    expected["bool_col"] = expected["bool_col"].astype("bool")
+    expected["ts_col"] = expected["ts_col"].astype("datetime64[ns, UTC]")
+
+    pandas.testing.assert_frame_equal(
+        got.reset_index(drop=True),  # reset_index to ignore row labels
+        expected.reset_index(drop=True),
+    )
+
+
+def test_to_dataframe_empty_w_dtypes_avro(class_under_test, mock_client):
+    avro_schema = _bq_to_avro_schema(
+        [
+            {"name": "bigfloat", "type": "float64"},
+            {"name": "lilfloat", "type": "float64"},
+        ]
+    )
+    read_session = _generate_avro_read_session(avro_schema)
+    avro_blocks = _bq_to_avro_blocks([], avro_schema)
+    reader = class_under_test(avro_blocks, mock_client, "", 0, {})
+
+    got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"})
+
+    expected = pandas.DataFrame([], columns=["bigfloat", "lilfloat"])
+    expected["bigfloat"] = expected["bigfloat"].astype("float64")
+    expected["lilfloat"] = expected["lilfloat"].astype("float16")
+
+    pandas.testing.assert_frame_equal(
+        got.reset_index(drop=True),  # reset_index to ignore row labels
+        expected.reset_index(drop=True),
+    )
+
+
+def test_to_dataframe_empty_w_dtypes_arrow(class_under_test, mock_client):
+    arrow_schema = _bq_to_arrow_schema(
+        [
+            {"name": "bigfloat", "type": "float64"},
+            {"name": "lilfloat", "type": "float64"},
+        ]
+    )
+    read_session = _generate_arrow_read_session(arrow_schema)
+    arrow_batches = _bq_to_arrow_batches([], arrow_schema)
+    reader = class_under_test(arrow_batches, mock_client, "", 0, {})
+
+    got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"})
+
+    expected = pandas.DataFrame([], columns=["bigfloat", "lilfloat"])
+    expected["bigfloat"] = expected["bigfloat"].astype("float64")
+    expected["lilfloat"] = expected["lilfloat"].astype("float16")
+
+    pandas.testing.assert_frame_equal(
+        got.reset_index(drop=True),  # reset_index to ignore row labels
+        expected.reset_index(drop=True),
+    )
+
+
 def test_to_dataframe_by_page(class_under_test, mock_client):
     bq_columns = [
         {"name": "int_col", "type": "int64"},
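The expected frames in these tests follow the same construction pattern as the fix itself; here is a self-contained sketch of that pattern (the column names are illustrative):

import pandas

# An empty DataFrame's columns default to the "object" dtype, so each
# column is replaced with an empty Series of the desired dtype.
column_dtypes = {"int_col": "int64", "ts_col": "datetime64[ns, UTC]"}
df = pandas.DataFrame(columns=column_dtypes.keys())
for column in df:
    df[column] = pandas.Series([], dtype=column_dtypes[column])

print(df.dtypes)
# int_col                  int64
# ts_col     datetime64[ns, UTC]
# dtype: object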
