Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deps: require pyarrow for pandas support #314

Merged
merged 4 commits into from Oct 12, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 0 additions & 4 deletions docs/snippets.py
Expand Up @@ -26,10 +26,6 @@

import pytest

try:
import fastparquet
except (ImportError, AttributeError):
fastparquet = None
try:
import pandas
except (ImportError, AttributeError):
Expand Down
3 changes: 0 additions & 3 deletions google/cloud/bigquery/__init__.py
Expand Up @@ -38,7 +38,6 @@
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery import enums
from google.cloud.bigquery.enums import StandardSqlDataTypes
from google.cloud.bigquery.exceptions import PyarrowMissingWarning
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.external_config import BigtableOptions
from google.cloud.bigquery.external_config import BigtableColumnFamily
Expand Down Expand Up @@ -143,8 +142,6 @@
"WriteDisposition",
# EncryptionConfiguration
"EncryptionConfiguration",
# Errors and warnings
"PyarrowMissingWarning",
]


Expand Down
43 changes: 13 additions & 30 deletions google/cloud/bigquery/client.py
Expand Up @@ -58,7 +58,6 @@
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.exceptions import PyarrowMissingWarning
from google.cloud.bigquery.opentelemetry_tracing import create_span
from google.cloud.bigquery import job
from google.cloud.bigquery.model import Model
Expand Down Expand Up @@ -2135,29 +2134,31 @@ def load_table_from_dataframe(
[Beta] The compression method to use if intermittently
serializing ``dataframe`` to a parquet file.

If ``pyarrow`` and job config schema are used, the argument
is directly passed as the ``compression`` argument to the
underlying ``pyarrow.parquet.write_table()`` method (the
default value "snappy" gets converted to uppercase).
The argument is directly passed as the ``compression``
argument to the underlying ``pyarrow.parquet.write_table()``
method (the default value "snappy" gets converted to uppercase).
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table

If either ``pyarrow`` or job config schema are missing, the
argument is directly passed as the ``compression`` argument
to the underlying ``DataFrame.to_parquet()`` method.
If the job config schema is missing, the argument is directly
passed as the ``compression`` argument to the underlying
``DataFrame.to_parquet()`` method.
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet

Returns:
google.cloud.bigquery.job.LoadJob: A new load job.

Raises:
ImportError:
ValueError:
If a usable parquet engine cannot be found. This method
requires :mod:`pyarrow` or :mod:`fastparquet` to be
installed.
requires :mod:`pyarrow` to be installed.
TypeError:
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
class.
"""
if pyarrow is None:
# pyarrow is now the only supported parquet engine.
raise ValueError("This method requires pyarrow to be installed")

job_id = _make_job_id(job_id, job_id_prefix)

if job_config:
Expand Down Expand Up @@ -2222,7 +2223,7 @@ def load_table_from_dataframe(
os.close(tmpfd)

try:
if pyarrow and job_config.schema:
if job_config.schema:
if parquet_compression == "snappy": # adjust the default value
parquet_compression = parquet_compression.upper()

Expand All @@ -2233,24 +2234,6 @@ def load_table_from_dataframe(
parquet_compression=parquet_compression,
)
else:
if not pyarrow:
warnings.warn(
"Loading dataframe data without pyarrow installed is "
"deprecated and will become unsupported in the future. "
"Please install the pyarrow package.",
PyarrowMissingWarning,
stacklevel=2,
)

if job_config.schema:
warnings.warn(
"job_config.schema is set, but not used to assist in "
"identifying correct types for data serialization. "
"Please install the pyarrow package.",
PendingDeprecationWarning,
stacklevel=2,
)

dataframe.to_parquet(tmppath, compression=parquet_compression)

with open(tmppath, "rb") as parquet_file:
Expand Down
17 changes: 0 additions & 17 deletions google/cloud/bigquery/exceptions.py

This file was deleted.

92 changes: 27 additions & 65 deletions google/cloud/bigquery/table.py
Expand Up @@ -50,7 +50,6 @@
from google.cloud.bigquery.schema import _build_schema_resource
from google.cloud.bigquery.schema import _parse_schema_resource
from google.cloud.bigquery.schema import _to_schema_fields
from google.cloud.bigquery.exceptions import PyarrowMissingWarning
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration

Expand Down Expand Up @@ -1679,75 +1678,38 @@ def to_dataframe(
create_bqstorage_client = False
bqstorage_client = None

if pyarrow is not None:
# If pyarrow is available, calling to_arrow, then converting to a
# pandas dataframe is about 2x faster. This is because pandas.concat is
# rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
# usually no-copy.
record_batch = self.to_arrow(
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
)
record_batch = self.to_arrow(
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
)

# When converting timestamp values to nanosecond precision, the result
# can be out of pyarrow bounds. To avoid the error when converting to
# Pandas, we set the timestamp_as_object parameter to True, if necessary.
types_to_check = {
pyarrow.timestamp("us"),
pyarrow.timestamp("us", tz=pytz.UTC),
}

# When converting timestamp values to nanosecond precision, the result
# can be out of pyarrow bounds. To avoid the error when converting to
# Pandas, we set the timestamp_as_object parameter to True, if necessary.
types_to_check = {
pyarrow.timestamp("us"),
pyarrow.timestamp("us", tz=pytz.UTC),
}

for column in record_batch:
if column.type in types_to_check:
try:
column.cast("timestamp[ns]")
except pyarrow.lib.ArrowInvalid:
timestamp_as_object = True
break
else:
timestamp_as_object = False

extra_kwargs = {"timestamp_as_object": timestamp_as_object}

df = record_batch.to_pandas(date_as_object=date_as_object, **extra_kwargs)

for column in dtypes:
df[column] = pandas.Series(df[column], dtype=dtypes[column])
return df
for column in record_batch:
if column.type in types_to_check:
try:
column.cast("timestamp[ns]")
except pyarrow.lib.ArrowInvalid:
timestamp_as_object = True
break
else:
warnings.warn(
"Converting to a dataframe without pyarrow installed is "
"often slower and will become unsupported in the future. "
"Please install the pyarrow package.",
PyarrowMissingWarning,
stacklevel=2,
)
timestamp_as_object = False

# The bqstorage_client is only used if pyarrow is available, so the
# rest of this method only needs to account for tabledata.list.
progress_bar = self._get_progress_bar(progress_bar_type)
extra_kwargs = {"timestamp_as_object": timestamp_as_object}

frames = []
for frame in self.to_dataframe_iterable(dtypes=dtypes):
frames.append(frame)
df = record_batch.to_pandas(date_as_object=date_as_object, **extra_kwargs)

if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(len(frame))

if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()

# Avoid concatting an empty list.
if not frames:
column_names = [field.name for field in self._schema]
return pandas.DataFrame(columns=column_names)
return pandas.concat(frames, ignore_index=True)
for column in dtypes:
df[column] = pandas.Series(df[column], dtype=dtypes[column])

return df


class _EmptyRowIterator(object):
Expand Down
5 changes: 1 addition & 4 deletions noxfile.py
Expand Up @@ -49,10 +49,7 @@ def default(session):
constraints_path,
)

# fastparquet is not included in .[all] because, in general, it's
# redundant with pyarrow. We still want to run some unit tests with
# fastparquet serialization, though.
session.install("-e", ".[all,fastparquet]", "-c", constraints_path)
session.install("-e", ".[all]", "-c", constraints_path)

session.install("ipython", "-c", constraints_path)

Expand Down
12 changes: 2 additions & 10 deletions setup.py
Expand Up @@ -47,13 +47,12 @@
"grpcio >= 1.32.0, < 2.0dev",
"pyarrow >= 1.0.0, < 2.0dev",
],
"pandas": ["pandas>=0.23.0"],
"pyarrow": [
"pandas": [
"pandas>=0.23.0",
# pyarrow 1.0.0 is required for the use of timestamp_as_object keyword.
"pyarrow >= 1.0.0, < 2.0dev",
],
"tqdm": ["tqdm >= 4.7.4, <5.0.0dev"],
"fastparquet": ["fastparquet", "python-snappy", "llvmlite>=0.34.0"],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see us add "pyarrow" to the "pandas" extras now, since it's needed for both uploads and downloads to dataframe.

We can maybe refactor the pyarrow >=1.0.0,<2.0dev string into a variable since it's going to appear 3 times in setup.py now too

"opentelemetry": [
"opentelemetry-api==0.9b0",
"opentelemetry-sdk==0.9b0",
Expand All @@ -64,13 +63,6 @@
all_extras = []

for extra in extras:
if extra in (
# Skip fastparquet from "all" because it is redundant with pyarrow and
# creates a dependency on pre-release versions of numpy. See:
# https://github.com/googleapis/google-cloud-python/issues/8549
"fastparquet",
):
continue
all_extras.extend(extras[extra])

extras["all"] = all_extras
Expand Down
1 change: 0 additions & 1 deletion testing/constraints-3.6.txt
@@ -1,4 +1,3 @@
fastparquet==0.4.1
google-api-core==1.22.2
google-cloud-bigquery-storage==2.0.0
google-cloud-core==1.4.1
Expand Down
8 changes: 8 additions & 0 deletions tests/unit/test__pandas_helpers.py
Expand Up @@ -1329,3 +1329,11 @@ def test_download_dataframe_tabledata_list_dict_sequence_schema(module_under_tes
)
)
assert result.equals(expected_result)

with pytest.raises(StopIteration):
result = next(results_gen)


def test_table_data_listpage_to_dataframe_skips_stop_iteration(module_under_test):
dataframe = module_under_test._tabledata_list_page_to_dataframe([], [], {})
assert isinstance(dataframe, pandas.DataFrame)