diff --git a/google/cloud/bigquery_storage_v1/reader.py b/google/cloud/bigquery_storage_v1/reader.py
index 9b6e3e6b..09b0d99c 100644
--- a/google/cloud/bigquery_storage_v1/reader.py
+++ b/google/cloud/bigquery_storage_v1/reader.py
@@ -287,7 +287,13 @@ def to_arrow(self):
         record_batches = []
         for page in self.pages:
             record_batches.append(page.to_arrow())
-        return pyarrow.Table.from_batches(record_batches)
+
+        if record_batches:
+            return pyarrow.Table.from_batches(record_batches)
+
+        # No data, return an empty Table.
+        self._stream_parser._parse_arrow_schema()
+        return pyarrow.Table.from_batches([], schema=self._stream_parser._schema)
 
     def to_dataframe(self, dtypes=None):
         """Create a :class:`pandas.DataFrame` of all rows in the stream.
@@ -323,6 +329,7 @@ def to_dataframe(self, dtypes=None):
         # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
         # usually no-copy.
         schema_type = self._read_session.WhichOneof("schema")
+
         if schema_type == "arrow_schema":
             record_batch = self.to_arrow()
             df = record_batch.to_pandas()
@@ -330,10 +337,58 @@ def to_dataframe(self, dtypes=None):
                 df[column] = pandas.Series(df[column], dtype=dtypes[column])
             return df
 
-        frames = []
-        for page in self.pages:
-            frames.append(page.to_dataframe(dtypes=dtypes))
-        return pandas.concat(frames)
+        frames = [page.to_dataframe(dtypes=dtypes) for page in self.pages]
+
+        if frames:
+            return pandas.concat(frames)
+
+        # No data, construct an empty dataframe with columns matching the schema.
+        # The result should be consistent with what an empty ARROW stream would produce.
+        self._stream_parser._parse_avro_schema()
+        schema = self._stream_parser._avro_schema_json
+
+        column_dtypes = self._dtypes_from_avro(schema["fields"])
+        column_dtypes.update(dtypes)
+
+        df = pandas.DataFrame(columns=column_dtypes.keys())
+        for column in df:
+            df[column] = pandas.Series([], dtype=column_dtypes[column])
+
+        return df
+
+    def _dtypes_from_avro(self, avro_fields):
+        """Determine Pandas dtypes for columns in Avro schema.
+
+        Args:
+            avro_fields (Iterable[Mapping[str, Any]]):
+                Avro fields' metadata.
+
+        Returns:
+            collections.OrderedDict[str, str]:
+                Column names with their corresponding Pandas dtypes.
+        """
+        result = collections.OrderedDict()
+
+        type_map = {"long": "int64", "double": "float64", "boolean": "bool"}
+
+        for field_info in avro_fields:
+            # If a type is a union of multiple types, pick the first type
+            # that is not "null".
+            if isinstance(field_info["type"], list):
+                type_info = next(item for item in field_info["type"] if item != "null")
+
+            if isinstance(type_info, six.string_types):
+                field_dtype = type_map.get(type_info, "object")
+            else:
+                logical_type = type_info.get("logicalType")
+                if logical_type == "timestamp-micros":
+                    field_dtype = "datetime64[ns, UTC]"
+                else:
+                    field_dtype = "object"
+
+            result[field_info["name"]] = field_dtype
+
+        return result
 
 
 class ReadRowsPage(object):
diff --git a/tests/unit/test_reader_v1.py b/tests/unit/test_reader_v1.py
index 24e5dd4e..febc872d 100644
--- a/tests/unit/test_reader_v1.py
+++ b/tests/unit/test_reader_v1.py
@@ -645,6 +645,92 @@ def test_to_dataframe_w_dtypes_arrow(class_under_test):
     )
 
 
+def test_to_dataframe_empty_w_scalars_avro(class_under_test):
+    avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS)
+    read_session = _generate_avro_read_session(avro_schema)
+    avro_blocks = _bq_to_avro_blocks([], avro_schema)
+    reader = class_under_test(avro_blocks, mock_client, "", 0, {})
+
+    got = reader.to_dataframe(read_session)
+
+    expected = pandas.DataFrame(columns=SCALAR_COLUMN_NAMES)
+    expected["int_col"] = expected["int_col"].astype("int64")
+    expected["float_col"] = expected["float_col"].astype("float64")
+    expected["bool_col"] = expected["bool_col"].astype("bool")
+    expected["ts_col"] = expected["ts_col"].astype("datetime64[ns, UTC]")
+
+    pandas.testing.assert_frame_equal(
+        got.reset_index(drop=True),  # reset_index to ignore row labels
+        expected.reset_index(drop=True),
+    )
+
+
+def test_to_dataframe_empty_w_scalars_arrow(class_under_test):
+    arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS)
+    read_session = _generate_arrow_read_session(arrow_schema)
+    arrow_batches = _bq_to_arrow_batches([], arrow_schema)
+    reader = class_under_test(arrow_batches, mock_client, "", 0, {})
+
+    got = reader.to_dataframe(read_session)
+
+    expected = pandas.DataFrame([], columns=SCALAR_COLUMN_NAMES)
+    expected["int_col"] = expected["int_col"].astype("int64")
+    expected["float_col"] = expected["float_col"].astype("float64")
+    expected["bool_col"] = expected["bool_col"].astype("bool")
+    expected["ts_col"] = expected["ts_col"].astype("datetime64[ns, UTC]")
+
+    pandas.testing.assert_frame_equal(
+        got.reset_index(drop=True),  # reset_index to ignore row labels
+        expected.reset_index(drop=True),
+    )
+
+
+def test_to_dataframe_empty_w_dtypes_avro(class_under_test, mock_client):
+    avro_schema = _bq_to_avro_schema(
+        [
+            {"name": "bigfloat", "type": "float64"},
+            {"name": "lilfloat", "type": "float64"},
+        ]
+    )
+    read_session = _generate_avro_read_session(avro_schema)
+    avro_blocks = _bq_to_avro_blocks([], avro_schema)
+    reader = class_under_test(avro_blocks, mock_client, "", 0, {})
+
+    got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"})
+
+    expected = pandas.DataFrame([], columns=["bigfloat", "lilfloat"])
+    expected["bigfloat"] = expected["bigfloat"].astype("float64")
+    expected["lilfloat"] = expected["lilfloat"].astype("float16")
+
+    pandas.testing.assert_frame_equal(
+        got.reset_index(drop=True),  # reset_index to ignore row labels
+        expected.reset_index(drop=True),
+    )
+
+
+def test_to_dataframe_empty_w_dtypes_arrow(class_under_test, mock_client):
+    arrow_schema = _bq_to_arrow_schema(
+        [
+            {"name": "bigfloat", "type": "float64"},
+            {"name": "lilfloat", "type": "float64"},
+        ]
+    )
+    read_session = _generate_arrow_read_session(arrow_schema)
+    arrow_batches = _bq_to_arrow_batches([], arrow_schema)
+    reader = class_under_test(arrow_batches, mock_client, "", 0, {})
+
+    got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"})
+
+    expected = pandas.DataFrame([], columns=["bigfloat", "lilfloat"])
+    expected["bigfloat"] = expected["bigfloat"].astype("float64")
+    expected["lilfloat"] = expected["lilfloat"].astype("float16")
+
+    pandas.testing.assert_frame_equal(
+        got.reset_index(drop=True),  # reset_index to ignore row labels
+        expected.reset_index(drop=True),
+    )
+
+
 def test_to_dataframe_by_page(class_under_test, mock_client):
     bq_columns = [
         {"name": "int_col", "type": "int64"},