fix: handle consuming streams with no data #29

Merged
1 commit merged on Jun 4, 2020
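For context, a brief sketch (not part of the PR) of the behavior this change targets: consuming a stream that returns no rows should yield an empty, correctly typed result instead of failing. The `reader` and `read_session` objects below are hypothetical stand-ins for the ones the new unit tests construct.

# Hypothetical sketch: `reader` wraps a stream that yields no rows, and
# `read_session` describes the table schema (mirrors the unit tests below).
df = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"})

# With this fix, the result is an empty DataFrame whose columns and dtypes
# still reflect the session schema plus any per-column dtype overrides.
assert df.empty
print(df.dtypes)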
65 changes: 60 additions & 5 deletions google/cloud/bigquery_storage_v1/reader.py
@@ -287,7 +287,13 @@ def to_arrow(self):
        record_batches = []
        for page in self.pages:
            record_batches.append(page.to_arrow())

        if record_batches:
            return pyarrow.Table.from_batches(record_batches)

        # No data, return an empty Table.
        self._stream_parser._parse_arrow_schema()
        return pyarrow.Table.from_batches([], schema=self._stream_parser._schema)
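As a small usage sketch of the empty-stream branch above (hypothetical; `reader` and `read_session` mirror the Arrow fixtures in the tests below):

# Hypothetical sketch: an Arrow-format read session whose stream is empty.
table = reader.to_arrow(read_session)

# The branch above parses the session's Arrow schema, so the result is a
# zero-row pyarrow.Table that still carries the expected columns.
assert table.num_rows == 0
print(table.schema.names)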

    def to_dataframe(self, dtypes=None):
        """Create a :class:`pandas.DataFrame` of all rows in the stream.
@@ -323,17 +329,66 @@ def to_dataframe(self, dtypes=None):
        # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
        # usually no-copy.
        schema_type = self._read_session.WhichOneof("schema")

        if schema_type == "arrow_schema":
            record_batch = self.to_arrow()
            df = record_batch.to_pandas()
            for column in dtypes:
                df[column] = pandas.Series(df[column], dtype=dtypes[column])
            return df

        frames = [page.to_dataframe(dtypes=dtypes) for page in self.pages]

        if frames:
            return pandas.concat(frames)

        # No data, construct an empty dataframe with columns matching the schema.
        # The result should be consistent with what an empty ARROW stream would produce.
        self._stream_parser._parse_avro_schema()
        schema = self._stream_parser._avro_schema_json

        column_dtypes = self._dtypes_from_avro(schema["fields"])
        column_dtypes.update(dtypes)

        df = pandas.DataFrame(columns=column_dtypes.keys())
        for column in df:
            df[column] = pandas.Series([], dtype=column_dtypes[column])

        return df

    def _dtypes_from_avro(self, avro_fields):
        """Determine Pandas dtypes for columns in Avro schema.

        Args:
            avro_fields (Iterable[Mapping[str, Any]]):
                Avro fields' metadata.

        Returns:
            collections.OrderedDict[str, str]:
                Column names with their corresponding Pandas dtypes.
        """
        result = collections.OrderedDict()

        type_map = {"long": "int64", "double": "float64", "boolean": "bool"}
Contributor:
I think this is fine for this change, but I wonder if we should consider consolidating all our various type/schema conversion code in the storage library and bigquery. Is there demand outside of our own usages (e.g. in other storage APIs) that we should consider moving this into a more central dependency?

Contributor Author:
Maybe in some libs working closely with the BigQuery API? cc: @tswast

Contributor Author:
I'll try to have a look at it in the near future. It might be worth waiting for all the pending fixes and dependency version updates, though, to see how much of that "lipstick" logic for types is still needed.
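As a rough illustration of the consolidation idea discussed here (purely hypothetical; the module and function names below are invented, and no such shared helper exists in this PR), a centralized Avro-to-pandas dtype mapper might look like:

# Hypothetical sketch of a shared type-mapping helper; names are invented.
import collections

_AVRO_TO_PANDAS = {"long": "int64", "double": "float64", "boolean": "bool"}
_LOGICAL_TO_PANDAS = {"timestamp-micros": "datetime64[ns, UTC]"}


def avro_fields_to_dtypes(avro_fields):
    """Map Avro field metadata to pandas dtypes, defaulting to "object"."""
    result = collections.OrderedDict()
    for field in avro_fields:
        type_info = field["type"]
        if isinstance(type_info, list):  # union type, e.g. ["null", "long"]
            type_info = next(item for item in type_info if item != "null")
        if isinstance(type_info, str):
            dtype = _AVRO_TO_PANDAS.get(type_info, "object")
        else:  # nested type dict, possibly carrying a logicalType annotation
            dtype = _LOGICAL_TO_PANDAS.get(type_info.get("logicalType"), "object")
        result[field["name"]] = dtype
    return result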


        for field_info in avro_fields:
            type_info = field_info["type"]

            # If a type is a union of multiple types, pick the first type
            # that is not "null".
            if isinstance(type_info, list):
                type_info = next(item for item in type_info if item != "null")

            if isinstance(type_info, six.string_types):
                field_dtype = type_map.get(type_info, "object")
            else:
                logical_type = type_info.get("logicalType")
                if logical_type == "timestamp-micros":
                    field_dtype = "datetime64[ns, UTC]"
                else:
                    field_dtype = "object"

            result[field_info["name"]] = field_dtype

        return result


class ReadRowsPage(object):
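One note on the empty-DataFrame construction in `to_dataframe` above: the `pandas.DataFrame` constructor's `dtype` argument applies a single dtype to every column, so per-column dtypes for a zero-row frame are set by assigning an empty `pandas.Series` per column. A standalone sketch of that pattern (the column names here are illustrative):

import pandas

# Illustrative column-to-dtype mapping, like what _dtypes_from_avro returns.
column_dtypes = {
    "int_col": "int64",
    "float_col": "float64",
    "bool_col": "bool",
    "ts_col": "datetime64[ns, UTC]",
}

df = pandas.DataFrame(columns=column_dtypes.keys())
for column in df:
    df[column] = pandas.Series([], dtype=column_dtypes[column])

print(df.dtypes)  # each column reports its declared dtype despite having zero rows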
86 changes: 86 additions & 0 deletions tests/unit/test_reader_v1.py
@@ -645,6 +645,92 @@ def test_to_dataframe_w_dtypes_arrow(class_under_test):
    )


def test_to_dataframe_empty_w_scalars_avro(class_under_test):
    avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS)
    read_session = _generate_avro_read_session(avro_schema)
    avro_blocks = _bq_to_avro_blocks([], avro_schema)
    reader = class_under_test(avro_blocks, mock_client, "", 0, {})

    got = reader.to_dataframe(read_session)

    expected = pandas.DataFrame(columns=SCALAR_COLUMN_NAMES)
    expected["int_col"] = expected["int_col"].astype("int64")
    expected["float_col"] = expected["float_col"].astype("float64")
    expected["bool_col"] = expected["bool_col"].astype("bool")
    expected["ts_col"] = expected["ts_col"].astype("datetime64[ns, UTC]")

    pandas.testing.assert_frame_equal(
        got.reset_index(drop=True),  # reset_index to ignore row labels
        expected.reset_index(drop=True),
    )


def test_to_dataframe_empty_w_scalars_arrow(class_under_test):
    arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS)
    read_session = _generate_arrow_read_session(arrow_schema)
    arrow_batches = _bq_to_arrow_batches([], arrow_schema)
    reader = class_under_test(arrow_batches, mock_client, "", 0, {})

    got = reader.to_dataframe(read_session)

    expected = pandas.DataFrame([], columns=SCALAR_COLUMN_NAMES)
    expected["int_col"] = expected["int_col"].astype("int64")
    expected["float_col"] = expected["float_col"].astype("float64")
    expected["bool_col"] = expected["bool_col"].astype("bool")
    expected["ts_col"] = expected["ts_col"].astype("datetime64[ns, UTC]")

    pandas.testing.assert_frame_equal(
        got.reset_index(drop=True),  # reset_index to ignore row labels
        expected.reset_index(drop=True),
    )


def test_to_dataframe_empty_w_dtypes_avro(class_under_test, mock_client):
    avro_schema = _bq_to_avro_schema(
        [
            {"name": "bigfloat", "type": "float64"},
            {"name": "lilfloat", "type": "float64"},
        ]
    )
    read_session = _generate_avro_read_session(avro_schema)
    avro_blocks = _bq_to_avro_blocks([], avro_schema)
    reader = class_under_test(avro_blocks, mock_client, "", 0, {})

    got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"})

    expected = pandas.DataFrame([], columns=["bigfloat", "lilfloat"])
    expected["bigfloat"] = expected["bigfloat"].astype("float64")
    expected["lilfloat"] = expected["lilfloat"].astype("float16")

    pandas.testing.assert_frame_equal(
        got.reset_index(drop=True),  # reset_index to ignore row labels
        expected.reset_index(drop=True),
    )


def test_to_dataframe_empty_w_dtypes_arrow(class_under_test, mock_client):
    arrow_schema = _bq_to_arrow_schema(
        [
            {"name": "bigfloat", "type": "float64"},
            {"name": "lilfloat", "type": "float64"},
        ]
    )
    read_session = _generate_arrow_read_session(arrow_schema)
    arrow_batches = _bq_to_arrow_batches([], arrow_schema)
    reader = class_under_test(arrow_batches, mock_client, "", 0, {})

    got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"})

    expected = pandas.DataFrame([], columns=["bigfloat", "lilfloat"])
    expected["bigfloat"] = expected["bigfloat"].astype("float64")
    expected["lilfloat"] = expected["lilfloat"].astype("float16")

    pandas.testing.assert_frame_equal(
        got.reset_index(drop=True),  # reset_index to ignore row labels
        expected.reset_index(drop=True),
    )


def test_to_dataframe_by_page(class_under_test, mock_client):
    bq_columns = [
        {"name": "int_col", "type": "int64"},