From 801e4c0574b7e421aa3a28cafec6fd6bcce940dd Mon Sep 17 00:00:00 2001 From: Carlos de la Guardia Date: Mon, 12 Oct 2020 17:10:07 -0500 Subject: [PATCH] deps: require pyarrow for pandas support (#314) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [X] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [X] Ensure the tests and linter pass - [X] Code coverage does not decrease (if any source code was changed) - [X] Appropriate docs were updated (if necessary) Fixes #265 🦕 --- docs/snippets.py | 4 - google/cloud/bigquery/__init__.py | 3 - google/cloud/bigquery/client.py | 43 ++---- google/cloud/bigquery/exceptions.py | 17 -- google/cloud/bigquery/table.py | 92 ++++------- noxfile.py | 5 +- setup.py | 12 +- testing/constraints-3.6.txt | 1 - tests/unit/test__pandas_helpers.py | 8 + tests/unit/test_client.py | 144 +---------------- tests/unit/test_job.py | 39 ----- tests/unit/test_table.py | 232 ++++++---------------------- 12 files changed, 97 insertions(+), 503 deletions(-) delete mode 100644 google/cloud/bigquery/exceptions.py diff --git a/docs/snippets.py b/docs/snippets.py index bc6b58020..8c106e63d 100644 --- a/docs/snippets.py +++ b/docs/snippets.py @@ -26,10 +26,6 @@ import pytest -try: - import fastparquet -except (ImportError, AttributeError): - fastparquet = None try: import pandas except (ImportError, AttributeError): diff --git a/google/cloud/bigquery/__init__.py b/google/cloud/bigquery/__init__.py index e83e70084..b8d1cc4d7 100644 --- a/google/cloud/bigquery/__init__.py +++ b/google/cloud/bigquery/__init__.py @@ -38,7 +38,6 @@ from google.cloud.bigquery.dataset import DatasetReference from google.cloud.bigquery import enums from google.cloud.bigquery.enums import StandardSqlDataTypes -from google.cloud.bigquery.exceptions import PyarrowMissingWarning from google.cloud.bigquery.external_config import ExternalConfig from google.cloud.bigquery.external_config import BigtableOptions from google.cloud.bigquery.external_config import BigtableColumnFamily @@ -143,8 +142,6 @@ "WriteDisposition", # EncryptionConfiguration "EncryptionConfiguration", - # Errors and warnings - "PyarrowMissingWarning", ] diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index fcb18385d..2afffab80 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -58,7 +58,6 @@ from google.cloud.bigquery.dataset import Dataset from google.cloud.bigquery.dataset import DatasetListItem from google.cloud.bigquery.dataset import DatasetReference -from google.cloud.bigquery.exceptions import PyarrowMissingWarning from google.cloud.bigquery.opentelemetry_tracing import create_span from google.cloud.bigquery import job from google.cloud.bigquery.model import Model @@ -2135,29 +2134,31 @@ def load_table_from_dataframe( [Beta] The compression method to use if intermittently serializing ``dataframe`` to a parquet file. - If ``pyarrow`` and job config schema are used, the argument - is directly passed as the ``compression`` argument to the - underlying ``pyarrow.parquet.write_table()`` method (the - default value "snappy" gets converted to uppercase). 
+ The argument is directly passed as the ``compression`` + argument to the underlying ``pyarrow.parquet.write_table()`` + method (the default value "snappy" gets converted to uppercase). https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table - If either ``pyarrow`` or job config schema are missing, the - argument is directly passed as the ``compression`` argument - to the underlying ``DataFrame.to_parquet()`` method. + If the job config schema is missing, the argument is directly + passed as the ``compression`` argument to the underlying + ``DataFrame.to_parquet()`` method. https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet Returns: google.cloud.bigquery.job.LoadJob: A new load job. Raises: - ImportError: + ValueError: If a usable parquet engine cannot be found. This method - requires :mod:`pyarrow` or :mod:`fastparquet` to be - installed. + requires :mod:`pyarrow` to be installed. TypeError: If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` class. """ + if pyarrow is None: + # pyarrow is now the only supported parquet engine. + raise ValueError("This method requires pyarrow to be installed") + job_id = _make_job_id(job_id, job_id_prefix) if job_config: @@ -2222,7 +2223,7 @@ def load_table_from_dataframe( os.close(tmpfd) try: - if pyarrow and job_config.schema: + if job_config.schema: if parquet_compression == "snappy": # adjust the default value parquet_compression = parquet_compression.upper() @@ -2233,24 +2234,6 @@ def load_table_from_dataframe( parquet_compression=parquet_compression, ) else: - if not pyarrow: - warnings.warn( - "Loading dataframe data without pyarrow installed is " - "deprecated and will become unsupported in the future. " - "Please install the pyarrow package.", - PyarrowMissingWarning, - stacklevel=2, - ) - - if job_config.schema: - warnings.warn( - "job_config.schema is set, but not used to assist in " - "identifying correct types for data serialization. " - "Please install the pyarrow package.", - PendingDeprecationWarning, - stacklevel=2, - ) - dataframe.to_parquet(tmppath, compression=parquet_compression) with open(tmppath, "rb") as parquet_file: diff --git a/google/cloud/bigquery/exceptions.py b/google/cloud/bigquery/exceptions.py deleted file mode 100644 index 93490ef97..000000000 --- a/google/cloud/bigquery/exceptions.py +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- - -class PyarrowMissingWarning(DeprecationWarning): - pass diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index a72bacb74..01e8815da 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -50,7 +50,6 @@ from google.cloud.bigquery.schema import _build_schema_resource from google.cloud.bigquery.schema import _parse_schema_resource from google.cloud.bigquery.schema import _to_schema_fields -from google.cloud.bigquery.exceptions import PyarrowMissingWarning from google.cloud.bigquery.external_config import ExternalConfig from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration @@ -1679,75 +1678,38 @@ def to_dataframe( create_bqstorage_client = False bqstorage_client = None - if pyarrow is not None: - # If pyarrow is available, calling to_arrow, then converting to a - # pandas dataframe is about 2x faster. This is because pandas.concat is - # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is - # usually no-copy. - record_batch = self.to_arrow( - progress_bar_type=progress_bar_type, - bqstorage_client=bqstorage_client, - create_bqstorage_client=create_bqstorage_client, - ) + record_batch = self.to_arrow( + progress_bar_type=progress_bar_type, + bqstorage_client=bqstorage_client, + create_bqstorage_client=create_bqstorage_client, + ) + + # When converting timestamp values to nanosecond precision, the result + # can be out of pyarrow bounds. To avoid the error when converting to + # Pandas, we set the timestamp_as_object parameter to True, if necessary. + types_to_check = { + pyarrow.timestamp("us"), + pyarrow.timestamp("us", tz=pytz.UTC), + } - # When converting timestamp values to nanosecond precision, the result - # can be out of pyarrow bounds. To avoid the error when converting to - # Pandas, we set the timestamp_as_object parameter to True, if necessary. - types_to_check = { - pyarrow.timestamp("us"), - pyarrow.timestamp("us", tz=pytz.UTC), - } - - for column in record_batch: - if column.type in types_to_check: - try: - column.cast("timestamp[ns]") - except pyarrow.lib.ArrowInvalid: - timestamp_as_object = True - break - else: - timestamp_as_object = False - - extra_kwargs = {"timestamp_as_object": timestamp_as_object} - - df = record_batch.to_pandas(date_as_object=date_as_object, **extra_kwargs) - - for column in dtypes: - df[column] = pandas.Series(df[column], dtype=dtypes[column]) - return df + for column in record_batch: + if column.type in types_to_check: + try: + column.cast("timestamp[ns]") + except pyarrow.lib.ArrowInvalid: + timestamp_as_object = True + break else: - warnings.warn( - "Converting to a dataframe without pyarrow installed is " - "often slower and will become unsupported in the future. " - "Please install the pyarrow package.", - PyarrowMissingWarning, - stacklevel=2, - ) + timestamp_as_object = False - # The bqstorage_client is only used if pyarrow is available, so the - # rest of this method only needs to account for tabledata.list. - progress_bar = self._get_progress_bar(progress_bar_type) + extra_kwargs = {"timestamp_as_object": timestamp_as_object} - frames = [] - for frame in self.to_dataframe_iterable(dtypes=dtypes): - frames.append(frame) + df = record_batch.to_pandas(date_as_object=date_as_object, **extra_kwargs) - if progress_bar is not None: - # In some cases, the number of total rows is not populated - # until the first page of rows is fetched. Update the - # progress bar's total to keep an accurate count. 
- progress_bar.total = progress_bar.total or self.total_rows - progress_bar.update(len(frame)) - - if progress_bar is not None: - # Indicate that the download has finished. - progress_bar.close() - - # Avoid concatting an empty list. - if not frames: - column_names = [field.name for field in self._schema] - return pandas.DataFrame(columns=column_names) - return pandas.concat(frames, ignore_index=True) + for column in dtypes: + df[column] = pandas.Series(df[column], dtype=dtypes[column]) + + return df class _EmptyRowIterator(object): diff --git a/noxfile.py b/noxfile.py index 42d8f9356..db1dcffde 100644 --- a/noxfile.py +++ b/noxfile.py @@ -49,10 +49,7 @@ def default(session): constraints_path, ) - # fastparquet is not included in .[all] because, in general, it's - # redundant with pyarrow. We still want to run some unit tests with - # fastparquet serialization, though. - session.install("-e", ".[all,fastparquet]", "-c", constraints_path) + session.install("-e", ".[all]", "-c", constraints_path) session.install("ipython", "-c", constraints_path) diff --git a/setup.py b/setup.py index be7296081..abd5cef95 100644 --- a/setup.py +++ b/setup.py @@ -47,13 +47,12 @@ "grpcio >= 1.32.0, < 2.0dev", "pyarrow >= 1.0.0, < 2.0dev", ], - "pandas": ["pandas>=0.23.0"], - "pyarrow": [ + "pandas": [ + "pandas>=0.23.0", # pyarrow 1.0.0 is required for the use of timestamp_as_object keyword. "pyarrow >= 1.0.0, < 2.0dev", ], "tqdm": ["tqdm >= 4.7.4, <5.0.0dev"], - "fastparquet": ["fastparquet", "python-snappy", "llvmlite>=0.34.0"], "opentelemetry": [ "opentelemetry-api==0.9b0", "opentelemetry-sdk==0.9b0", @@ -64,13 +63,6 @@ all_extras = [] for extra in extras: - if extra in ( - # Skip fastparquet from "all" because it is redundant with pyarrow and - # creates a dependency on pre-release versions of numpy. 
See: - # https://github.com/googleapis/google-cloud-python/issues/8549 - "fastparquet", - ): - continue all_extras.extend(extras[extra]) extras["all"] = all_extras diff --git a/testing/constraints-3.6.txt b/testing/constraints-3.6.txt index a9f4faa92..798804941 100644 --- a/testing/constraints-3.6.txt +++ b/testing/constraints-3.6.txt @@ -1,4 +1,3 @@ -fastparquet==0.4.1 google-api-core==1.22.2 google-cloud-bigquery-storage==2.0.0 google-cloud-core==1.4.1 diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index c1073066d..bdb1c56ea 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -1329,3 +1329,11 @@ def test_download_dataframe_tabledata_list_dict_sequence_schema(module_under_tes ) ) assert result.equals(expected_result) + + with pytest.raises(StopIteration): + result = next(results_gen) + + +def test_table_data_listpage_to_dataframe_skips_stop_iteration(module_under_test): + dataframe = module_under_test._tabledata_list_page_to_dataframe([], [], {}) + assert isinstance(dataframe, pandas.DataFrame) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index f44201ab8..737c1aef7 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -32,10 +32,6 @@ import pytz import pkg_resources -try: - import fastparquet -except (ImportError, AttributeError): # pragma: NO COVER - fastparquet = None try: import pandas except (ImportError, AttributeError): # pragma: NO COVER @@ -7838,80 +7834,6 @@ def test_load_table_from_dataframe_unknown_table(self): job_config=mock.ANY, ) - @unittest.skipIf(pandas is None, "Requires `pandas`") - @unittest.skipIf(fastparquet is None, "Requires `fastparquet`") - def test_load_table_from_dataframe_no_pyarrow_warning(self): - from google.cloud.bigquery.client import PyarrowMissingWarning - - client = self._make_client() - - # Pick at least one column type that translates to Pandas dtype - # "object". A string column matches that. - records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}] - dataframe = pandas.DataFrame(records) - - get_table_patch = mock.patch( - "google.cloud.bigquery.client.Client.get_table", - autospec=True, - side_effect=google.api_core.exceptions.NotFound("Table not found"), - ) - load_patch = mock.patch( - "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True - ) - pyarrow_patch = mock.patch("google.cloud.bigquery.client.pyarrow", None) - pyarrow_patch_helpers = mock.patch( - "google.cloud.bigquery._pandas_helpers.pyarrow", None - ) - catch_warnings = warnings.catch_warnings(record=True) - - with get_table_patch, load_patch, pyarrow_patch, pyarrow_patch_helpers, catch_warnings as warned: - client.load_table_from_dataframe( - dataframe, self.TABLE_REF, location=self.LOCATION - ) - - matches = [ - warning for warning in warned if warning.category is PyarrowMissingWarning - ] - assert matches, "A missing pyarrow deprecation warning was not raised." - - @unittest.skipIf(pandas is None, "Requires `pandas`") - @unittest.skipIf(fastparquet is None, "Requires `fastparquet`") - def test_load_table_from_dataframe_no_schema_warning_wo_pyarrow(self): - client = self._make_client() - - # Pick at least one column type that translates to Pandas dtype - # "object". A string column matches that. 
- records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}] - dataframe = pandas.DataFrame(records) - - get_table_patch = mock.patch( - "google.cloud.bigquery.client.Client.get_table", - autospec=True, - side_effect=google.api_core.exceptions.NotFound("Table not found"), - ) - load_patch = mock.patch( - "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True - ) - pyarrow_patch = mock.patch("google.cloud.bigquery.client.pyarrow", None) - pyarrow_patch_helpers = mock.patch( - "google.cloud.bigquery._pandas_helpers.pyarrow", None - ) - catch_warnings = warnings.catch_warnings(record=True) - - with get_table_patch, load_patch, pyarrow_patch, pyarrow_patch_helpers, catch_warnings as warned: - client.load_table_from_dataframe( - dataframe, self.TABLE_REF, location=self.LOCATION - ) - - matches = [ - warning - for warning in warned - if warning.category in (DeprecationWarning, PendingDeprecationWarning) - and "could not be detected" in str(warning) - and "please provide a schema" in str(warning) - ] - assert matches, "A missing schema deprecation warning was not raised." - @unittest.skipIf( pandas is None or PANDAS_INSTALLED_VERSION < PANDAS_MINIUM_VERSION, "Only `pandas version >=1.0.0` supported", @@ -8182,7 +8104,6 @@ def test_load_table_from_dataframe_w_partial_schema_extra_types(self): assert "unknown_col" in message @unittest.skipIf(pandas is None, "Requires `pandas`") - @unittest.skipIf(fastparquet is None, "Requires `fastparquet`") def test_load_table_from_dataframe_w_partial_schema_missing_types(self): from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES from google.cloud.bigquery import job @@ -8236,55 +8157,6 @@ def test_load_table_from_dataframe_w_partial_schema_missing_types(self): assert sent_config.source_format == job.SourceFormat.PARQUET assert sent_config.schema is None - @unittest.skipIf(pandas is None, "Requires `pandas`") - @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") - def test_load_table_from_dataframe_w_schema_wo_pyarrow(self): - from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES - from google.cloud.bigquery import job - from google.cloud.bigquery.schema import SchemaField - - client = self._make_client() - records = [{"name": u"Monty", "age": 100}, {"name": u"Python", "age": 60}] - dataframe = pandas.DataFrame(records, columns=["name", "age"]) - schema = (SchemaField("name", "STRING"), SchemaField("age", "INTEGER")) - job_config = job.LoadJobConfig(schema=schema) - - load_patch = mock.patch( - "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True - ) - pyarrow_patch = mock.patch("google.cloud.bigquery.client.pyarrow", None) - - with load_patch as load_table_from_file, pyarrow_patch, warnings.catch_warnings( - record=True - ) as warned: - client.load_table_from_dataframe( - dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION - ) - - assert warned # there should be at least one warning - for warning in warned: - assert "pyarrow" in str(warning) - assert issubclass( - warning.category, (DeprecationWarning, PendingDeprecationWarning) - ) - - load_table_from_file.assert_called_once_with( - client, - mock.ANY, - self.TABLE_REF, - num_retries=_DEFAULT_NUM_RETRIES, - rewind=True, - job_id=mock.ANY, - job_id_prefix=None, - location=self.LOCATION, - project=None, - job_config=mock.ANY, - ) - - sent_config = load_table_from_file.mock_calls[0][2]["job_config"] - assert sent_config.source_format == job.SourceFormat.PARQUET - assert tuple(sent_config.schema) == schema - 
@unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_load_table_from_dataframe_w_schema_arrow_custom_compression(self): @@ -8320,7 +8192,7 @@ def test_load_table_from_dataframe_w_schema_arrow_custom_compression(self): @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") - def test_load_table_from_dataframe_wo_pyarrow_custom_compression(self): + def test_load_table_from_dataframe_wo_pyarrow_raises_error(self): client = self._make_client() records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}] dataframe = pandas.DataFrame(records) @@ -8338,8 +8210,8 @@ def test_load_table_from_dataframe_wo_pyarrow_custom_compression(self): dataframe, "to_parquet", wraps=dataframe.to_parquet ) - with load_patch, get_table_patch, pyarrow_patch, to_parquet_patch as to_parquet_spy: - with warnings.catch_warnings(record=True) as warned: + with load_patch, get_table_patch, pyarrow_patch, to_parquet_patch: + with pytest.raises(ValueError): client.load_table_from_dataframe( dataframe, self.TABLE_REF, @@ -8347,16 +8219,6 @@ def test_load_table_from_dataframe_wo_pyarrow_custom_compression(self): parquet_compression="gzip", ) - call_args = to_parquet_spy.call_args - assert call_args is not None - assert call_args.kwargs.get("compression") == "gzip" - - assert len(warned) == 2 - warning = warned[0] - assert "Loading dataframe data without pyarrow" in str(warning) - warning = warned[1] - assert "Please install the pyarrow package" in str(warning) - @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") def test_load_table_from_dataframe_w_nulls(self): diff --git a/tests/unit/test_job.py b/tests/unit/test_job.py index fb042e18c..d21489616 100644 --- a/tests/unit/test_job.py +++ b/tests/unit/test_job.py @@ -5802,45 +5802,6 @@ def test_to_dataframe_column_date_dtypes(self): self.assertEqual(df.date.dtype.name, "datetime64[ns]") - @unittest.skipIf(pandas is None, "Requires `pandas`") - def test_to_dataframe_column_date_dtypes_wo_pyarrow(self): - begun_resource = self._make_resource() - query_resource = { - "jobComplete": True, - "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, - "totalRows": "1", - "schema": {"fields": [{"name": "date", "type": "DATE"}]}, - } - row_data = [ - ["1999-12-01"], - ] - rows = [{"f": [{"v": field} for field in row]} for row in row_data] - query_resource["rows"] = rows - done_resource = copy.deepcopy(begun_resource) - done_resource["status"] = {"state": "DONE"} - connection = _make_connection( - begun_resource, query_resource, done_resource, query_resource - ) - client = _make_client(project=self.PROJECT, connection=connection) - job = self._make_one(self.JOB_ID, self.QUERY, client) - - with mock.patch("google.cloud.bigquery.table.pyarrow", None): - with warnings.catch_warnings(record=True) as warned: - df = job.to_dataframe( - date_as_object=False, create_bqstorage_client=False - ) - - self.assertIsInstance(df, pandas.DataFrame) - self.assertEqual(len(df), 1) # verify the number of rows - exp_columns = [field["name"] for field in query_resource["schema"]["fields"]] - self.assertEqual(list(df), exp_columns) # verify the column names - - self.assertEqual(df.date.dtype.name, "object") - - assert len(warned) == 1 - warning = warned[0] - assert "without pyarrow" in str(warning) - @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(tqdm is None, "Requires `tqdm`") @mock.patch("tqdm.tqdm") 
diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index 12169658e..fe17d2852 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -2148,6 +2148,49 @@ def test_to_dataframe_iterable(self): self.assertEqual(df_2["name"][0], "Sven") self.assertEqual(df_2["age"][0], 33) + @unittest.skipIf(pandas is None, "Requires `pandas`") + def test_to_dataframe_iterable_with_dtypes(self): + from google.cloud.bigquery.schema import SchemaField + import types + + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + + path = "/foo" + api_request = mock.Mock( + side_effect=[ + { + "rows": [{"f": [{"v": "Bengt"}, {"v": "32"}]}], + "pageToken": "NEXTPAGE", + }, + {"rows": [{"f": [{"v": "Sven"}, {"v": "33"}]}]}, + ] + ) + + row_iterator = self._make_one( + _mock_client(), api_request, path, schema, page_size=1, max_results=5 + ) + dfs = row_iterator.to_dataframe_iterable(dtypes={"age": "int32"}) + + self.assertIsInstance(dfs, types.GeneratorType) + + df_1 = next(dfs) + self.assertIsInstance(df_1, pandas.DataFrame) + self.assertEqual(df_1.name.dtype.name, "object") + self.assertEqual(df_1.age.dtype.name, "int32") + self.assertEqual(len(df_1), 1) # verify the number of rows + self.assertEqual( + df_1["name"][0], "Bengt" + ) # verify the first value of 'name' column + self.assertEqual(df_1["age"][0], 32) # verify the first value of 'age' column + + df_2 = next(dfs) + self.assertEqual(len(df_2), 1) # verify the number of rows + self.assertEqual(df_2["name"][0], "Sven") + self.assertEqual(df_2["age"][0], 33) + @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( bigquery_storage is None, "Requires `google-cloud-bigquery-storage`" @@ -2327,38 +2370,6 @@ def test_to_dataframe_datetime_out_of_pyarrow_bounds(self): [dt.datetime(4567, 1, 1), dt.datetime(9999, 12, 31)], ) - @unittest.skipIf(pandas is None, "Requires `pandas`") - def test_to_dataframe_warning_wo_pyarrow(self): - from google.cloud.bigquery.client import PyarrowMissingWarning - from google.cloud.bigquery.schema import SchemaField - - schema = [ - SchemaField("name", "STRING", mode="REQUIRED"), - SchemaField("age", "INTEGER", mode="REQUIRED"), - ] - rows = [ - {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, - {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, - ] - path = "/foo" - api_request = mock.Mock(return_value={"rows": rows}) - row_iterator = self._make_one(_mock_client(), api_request, path, schema) - - no_pyarrow_patch = mock.patch("google.cloud.bigquery.table.pyarrow", new=None) - catch_warnings = warnings.catch_warnings(record=True) - - with no_pyarrow_patch, catch_warnings as warned: - df = row_iterator.to_dataframe() - - self.assertIsInstance(df, pandas.DataFrame) - self.assertEqual(len(df), 2) - matches = [ - warning for warning in warned if warning.category is PyarrowMissingWarning - ] - self.assertTrue( - matches, msg="A missing pyarrow deprecation warning was not raised." 
- ) - @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf(tqdm is None, "Requires `tqdm`") @mock.patch("tqdm.tqdm_gui") @@ -2399,50 +2410,6 @@ def test_to_dataframe_progress_bar( progress_bar_mock().close.assert_called_once() self.assertEqual(len(df), 4) - @unittest.skipIf(pandas is None, "Requires `pandas`") - @unittest.skipIf(tqdm is None, "Requires `tqdm`") - @mock.patch("tqdm.tqdm_gui") - @mock.patch("tqdm.tqdm_notebook") - @mock.patch("tqdm.tqdm") - def test_to_dataframe_progress_bar_wo_pyarrow( - self, tqdm_mock, tqdm_notebook_mock, tqdm_gui_mock - ): - from google.cloud.bigquery.schema import SchemaField - - schema = [ - SchemaField("name", "STRING", mode="REQUIRED"), - SchemaField("age", "INTEGER", mode="REQUIRED"), - ] - rows = [ - {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, - {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, - {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, - {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]}, - ] - path = "/foo" - api_request = mock.Mock(return_value={"rows": rows}) - - progress_bars = ( - ("tqdm", tqdm_mock), - ("tqdm_notebook", tqdm_notebook_mock), - ("tqdm_gui", tqdm_gui_mock), - ) - - for progress_bar_type, progress_bar_mock in progress_bars: - row_iterator = self._make_one(_mock_client(), api_request, path, schema) - with mock.patch("google.cloud.bigquery.table.pyarrow", None): - with warnings.catch_warnings(record=True) as warned: - df = row_iterator.to_dataframe(progress_bar_type=progress_bar_type) - - progress_bar_mock.assert_called() - progress_bar_mock().update.assert_called() - progress_bar_mock().close.assert_called_once() - self.assertEqual(len(df), 4) - - self.assertEqual(len(warned), 1) - warning = warned[0] - self.assertTrue("without pyarrow" in str(warning)) - @unittest.skipIf(pandas is None, "Requires `pandas`") @mock.patch("google.cloud.bigquery.table.tqdm", new=None) def test_to_dataframe_no_tqdm_no_progress_bar(self): @@ -2557,57 +2524,6 @@ def test_to_dataframe_w_empty_results(self): self.assertEqual(len(df), 0) # verify the number of rows self.assertEqual(list(df), ["name", "age"]) # verify the column names - @unittest.skipIf(pandas is None, "Requires `pandas`") - def test_to_dataframe_w_empty_results_wo_pyarrow(self): - from google.cloud.bigquery.schema import SchemaField - - with mock.patch("google.cloud.bigquery.table.pyarrow", None): - schema = [ - SchemaField("name", "STRING", mode="REQUIRED"), - SchemaField("age", "INTEGER", mode="REQUIRED"), - ] - api_request = mock.Mock(return_value={"rows": []}) - row_iterator = self._make_one(_mock_client(), api_request, schema=schema) - - with warnings.catch_warnings(record=True) as warned: - df = row_iterator.to_dataframe() - - self.assertIsInstance(df, pandas.DataFrame) - self.assertEqual(len(df), 0) # verify the number of rows - self.assertEqual(list(df), ["name", "age"]) # verify the column names - - self.assertEqual(len(warned), 1) - warning = warned[0] - self.assertTrue("without pyarrow" in str(warning)) - - @unittest.skipIf(pandas is None, "Requires `pandas`") - def test_to_dataframe_w_no_results_wo_pyarrow(self): - from google.cloud.bigquery.schema import SchemaField - - with mock.patch("google.cloud.bigquery.table.pyarrow", None): - schema = [ - SchemaField("name", "STRING", mode="REQUIRED"), - SchemaField("age", "INTEGER", mode="REQUIRED"), - ] - api_request = mock.Mock(return_value={"rows": []}) - row_iterator = self._make_one(_mock_client(), api_request, schema=schema) - - def empty_iterable(dtypes=None): - return [] - - 
row_iterator.to_dataframe_iterable = empty_iterable - - with warnings.catch_warnings(record=True) as warned: - df = row_iterator.to_dataframe() - - self.assertIsInstance(df, pandas.DataFrame) - self.assertEqual(len(df), 0) # verify the number of rows - self.assertEqual(list(df), ["name", "age"]) # verify the column names - - self.assertEqual(len(warned), 1) - warning = warned[0] - self.assertTrue("without pyarrow" in str(warning)) - @unittest.skipIf(pandas is None, "Requires `pandas`") def test_to_dataframe_w_various_types_nullable(self): import datetime @@ -3424,68 +3340,6 @@ def test_to_dataframe_concat_categorical_dtype_w_pyarrow(self): # Don't close the client if it was passed in. bqstorage_client._transport.grpc_channel.close.assert_not_called() - @unittest.skipIf(pandas is None, "Requires `pandas`") - def test_to_dataframe_concat_categorical_dtype_wo_pyarrow(self): - from google.cloud.bigquery.schema import SchemaField - - schema = [ - SchemaField("col_str", "STRING"), - SchemaField("col_category", "STRING"), - ] - row_data = [ - [u"foo", u"low"], - [u"bar", u"medium"], - [u"baz", u"low"], - [u"foo_page2", u"medium"], - [u"bar_page2", u"high"], - [u"baz_page2", u"low"], - ] - path = "/foo" - - rows = [{"f": [{"v": field} for field in row]} for row in row_data[:3]] - rows_page2 = [{"f": [{"v": field} for field in row]} for row in row_data[3:]] - api_request = mock.Mock( - side_effect=[{"rows": rows, "pageToken": "NEXTPAGE"}, {"rows": rows_page2}] - ) - - row_iterator = self._make_one(_mock_client(), api_request, path, schema) - - mock_pyarrow = mock.patch("google.cloud.bigquery.table.pyarrow", None) - catch_warnings = warnings.catch_warnings(record=True) - - with mock_pyarrow, catch_warnings as warned: - got = row_iterator.to_dataframe( - dtypes={ - "col_category": pandas.core.dtypes.dtypes.CategoricalDtype( - categories=["low", "medium", "high"], ordered=False, - ), - }, - ) - - self.assertIsInstance(got, pandas.DataFrame) - self.assertEqual(len(got), 6) # verify the number of rows - expected_columns = [field.name for field in schema] - self.assertEqual(list(got), expected_columns) # verify the column names - - # Are column types correct? - expected_dtypes = [ - pandas.core.dtypes.dtypes.np.dtype("O"), # the default for string data - pandas.core.dtypes.dtypes.CategoricalDtype( - categories=["low", "medium", "high"], ordered=False, - ), - ] - self.assertEqual(list(got.dtypes), expected_dtypes) - - # And the data in the categorical column? - self.assertEqual( - list(got["col_category"]), - ["low", "medium", "low", "medium", "high", "low"], - ) - - self.assertEqual(len(warned), 1) - warning = warned[0] - self.assertTrue("without pyarrow" in str(warning)) - class TestPartitionRange(unittest.TestCase): def _get_target_class(self):
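
---

For reviewers, a minimal sketch (not part of the patch) of the caller-visible behavior this change introduces: `load_table_from_dataframe` now raises `ValueError` when `pyarrow` is missing instead of warning via the removed `PyarrowMissingWarning`, and the `pandas` extra in `setup.py` now pins `pyarrow >= 1.0.0, < 2.0dev`, so `pip install "google-cloud-bigquery[pandas]"` pulls it in. The project ID and table ID below are hypothetical placeholders.

```python
# Illustrative sketch only; assumes application default credentials and an
# existing dataset. Install with: pip install "google-cloud-bigquery[pandas]"
import pandas

from google.cloud import bigquery

client = bigquery.Client()

dataframe = pandas.DataFrame(
    [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
)
table_id = "my-project.my_dataset.my_table"  # hypothetical destination

try:
    # After this patch, serialization always goes through pyarrow: with a
    # job config schema the data is written via pyarrow.parquet.write_table(),
    # otherwise via DataFrame.to_parquet() (which itself requires pyarrow).
    load_job = client.load_table_from_dataframe(dataframe, table_id)
    load_job.result()  # wait for the load job to complete
except ValueError as exc:
    # Raised up front when pyarrow is not installed; previously this path
    # only emitted a deprecation warning and fell back to other engines.
    print(f"pyarrow is required for pandas support: {exc}")
```

With `fastparquet` dropped from `setup.py`, `noxfile.py`, and the constraints file, pyarrow is the only supported parquet engine, which is why the warning-based fallback branches in `client.py` and `table.py` (and their tests) are deleted outright rather than deprecated further.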