fix: support ARRAY data type when loading from DataFrame with Parquet #980

Merged: 22 commits merged into main from dataframe-arrays on Oct 7, 2021
Changes from 21 commits

Commits (22)
62fd565
fix: use compliant Parquet by default
judahrand Sep 21, 2021
f44a6ae
chore: bump minimum `pyarrow` version
judahrand Sep 21, 2021
8179ded
fix: default to `ParquetOptions.enable_list_inference is True`
judahrand Sep 22, 2021
bbdad5d
feat: detect `pyarrow.ListType` as `REPEATED`
judahrand Sep 22, 2021
839004c
fix: add tests for arrays in DataFrames
judahrand Sep 22, 2021
3cb2439
fix: add to system test to test `REPEATED` schema
judahrand Sep 22, 2021
16b9fc0
fix: only use arg when `pyarrow>=4.0.0`
judahrand Sep 22, 2021
ba1b321
Revert "chore: bump minimum `pyarrow` version"
judahrand Sep 22, 2021
5f915cd
chore: tidy up use of `_helpers.PYARROW_VERSIONS`
judahrand Sep 22, 2021
1c52bb4
Add TODOs for move to V3
judahrand Sep 24, 2021
a452b31
Use `pyarrow` type testing function
judahrand Sep 24, 2021
ffb34a6
Add unit tests for `ParquetOptions`
judahrand Sep 24, 2021
d3828b1
Update docstring
judahrand Sep 24, 2021
62137d8
Remove unused import
judahrand Sep 24, 2021
ba3f145
Remove user facing argument
judahrand Sep 27, 2021
ea54491
Fix doctring typo
judahrand Sep 27, 2021
e43c6fc
Merge branch 'main' into dataframe-arrays
judahrand Sep 27, 2021
d9c508c
Merge branch 'main' into dataframe-arrays
judahrand Sep 30, 2021
e8be400
Merge branch 'main' into dataframe-arrays
tswast Oct 6, 2021
9ec8c67
Update google/cloud/bigquery/client.py
tswast Oct 6, 2021
4fa8665
Update google/cloud/bigquery/client.py
tswast Oct 7, 2021
4714b6c
Merge branch 'main' into dataframe-arrays
tswast Oct 7, 2021
11 changes: 11 additions & 0 deletions google/cloud/bigquery/_helpers.py
@@ -107,6 +107,9 @@ def verify_version(self):
class PyarrowVersions:
"""Version comparisons for pyarrow package."""

# https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414
_PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])

def __init__(self):
self._installed_version = None

Expand All @@ -126,6 +129,14 @@ def installed_version(self) -> packaging.version.Version:

return self._installed_version

@property
def is_bad_version(self) -> bool:
return self.installed_version in self._PYARROW_BAD_VERSIONS

@property
def use_compliant_nested_type(self) -> bool:
return self.installed_version.major >= 4

def try_import(self, raise_if_error: bool = False) -> Any:
"""Verify that a recent enough version of pyarrow extra is
installed.
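For context (not part of the diff): the two new properties are what the rest of this PR uses to gate Parquet keyword arguments on the installed `pyarrow` version. A rough, illustrative sketch of how they are consumed (the helper name below is hypothetical):

```python
# Illustrative sketch only: how `PYARROW_VERSIONS.is_bad_version` and
# `.use_compliant_nested_type` are intended to be consumed elsewhere in this PR.
import warnings

from google.cloud.bigquery import _helpers


def _parquet_write_kwargs() -> dict:  # hypothetical helper, for illustration
    versions = _helpers.PYARROW_VERSIONS
    if versions.is_bad_version:
        # pyarrow 2.0.0 is known to corrupt data when round-tripping through Parquet.
        warnings.warn(
            f"pyarrow {versions.installed_version} may corrupt Parquet output",
            RuntimeWarning,
        )
    if versions.use_compliant_nested_type:
        # `use_compliant_nested_type` only exists in pyarrow >= 4.0.0,
        # so the keyword is only passed when it is understood.
        return {"use_compliant_nested_type": True}
    return {}
```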
50 changes: 39 additions & 11 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -79,8 +79,8 @@ def _to_wkb(v):
_PANDAS_DTYPE_TO_BQ = {
"bool": "BOOLEAN",
"datetime64[ns, UTC]": "TIMESTAMP",
# BigQuery does not support uploading DATETIME values from Parquet files.
# See: https://github.com/googleapis/google-cloud-python/issues/9996
# TODO: Update to DATETIME in V3
# https://github.com/googleapis/python-bigquery/issues/985
"datetime64[ns]": "TIMESTAMP",
"float32": "FLOAT",
"float64": "FLOAT",
@@ -396,7 +396,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
# column, but it was not found.
if bq_schema_unused:
raise ValueError(
u"bq_schema contains fields not present in dataframe: {}".format(
"bq_schema contains fields not present in dataframe: {}".format(
bq_schema_unused
)
)
@@ -405,7 +405,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
# pyarrow, if available.
if unknown_type_fields:
if not pyarrow:
msg = u"Could not determine the type of columns: {}".format(
msg = "Could not determine the type of columns: {}".format(
", ".join(field.name for field in unknown_type_fields)
)
warnings.warn(msg)
@@ -444,7 +444,14 @@ def augment_schema(dataframe, current_bq_schema):
continue

arrow_table = pyarrow.array(dataframe[field.name])
detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.type.id)

if pyarrow.types.is_list(arrow_table.type):
# `pyarrow.ListType`
detected_mode = "REPEATED"
detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.values.type.id)
else:
detected_mode = field.mode
detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.type.id)

if detected_type is None:
unknown_type_fields.append(field)
@@ -453,15 +460,15 @@ def augment_schema(dataframe, current_bq_schema):
new_field = schema.SchemaField(
name=field.name,
field_type=detected_type,
mode=field.mode,
mode=detected_mode,
description=field.description,
fields=field.fields,
)
augmented_schema.append(new_field)

if unknown_type_fields:
warnings.warn(
u"Pyarrow could not determine the type of columns: {}.".format(
"Pyarrow could not determine the type of columns: {}.".format(
", ".join(field.name for field in unknown_type_fields)
)
)
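As a quick aside for reviewers, here is a standalone sketch (illustrative column name, not from the diff) of why `pyarrow.types.is_list` plus the child values array is enough to map a list column onto a REPEATED BigQuery field:

```python
# Illustrative only: a pandas column of Python lists becomes a pyarrow ListArray,
# so the element type has to be read from the child (values) array.
import pandas as pd
import pyarrow

df = pd.DataFrame({"tags": [[1, 2], [3], []]})
arr = pyarrow.array(df["tags"])

assert pyarrow.types.is_list(arr.type)  # ListType -> BigQuery mode "REPEATED"
print(arr.type)         # list<item: int64>
print(arr.values.type)  # int64 -> looked up in ARROW_SCALAR_IDS_TO_BQ for the field type
```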
@@ -500,7 +507,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
extra_fields = bq_field_names - column_and_index_names
if extra_fields:
raise ValueError(
u"bq_schema contains fields not present in dataframe: {}".format(
"bq_schema contains fields not present in dataframe: {}".format(
extra_fields
)
)
@@ -510,7 +517,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
missing_fields = column_names - bq_field_names
if missing_fields:
raise ValueError(
u"bq_schema is missing fields from dataframe: {}".format(missing_fields)
"bq_schema is missing fields from dataframe: {}".format(missing_fields)
)

arrow_arrays = []
@@ -530,7 +537,13 @@ def dataframe_to_arrow(dataframe, bq_schema):
return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)


def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SNAPPY"):
def dataframe_to_parquet(
dataframe,
bq_schema,
filepath,
parquet_compression="SNAPPY",
parquet_use_compliant_nested_type=True,
):
"""Write dataframe as a Parquet file, according to the desired BQ schema.
This function requires the :mod:`pyarrow` package. Arrow is used as an
@@ -551,14 +564,29 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
The compression codec to use by the the ``pyarrow.parquet.write_table``
serializing method. Defaults to "SNAPPY".
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
parquet_use_compliant_nested_type (bool):
Whether the ``pyarrow.parquet.write_table`` serializing method should write
compliant Parquet nested type (lists). Defaults to ``True``.
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``.
"""
pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)

import pyarrow.parquet

kwargs = (
{"use_compliant_nested_type": parquet_use_compliant_nested_type}
if _helpers.PYARROW_VERSIONS.use_compliant_nested_type
else {}
)

bq_schema = schema._to_schema_fields(bq_schema)
arrow_table = dataframe_to_arrow(dataframe, bq_schema)
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)
pyarrow.parquet.write_table(
arrow_table, filepath, compression=parquet_compression, **kwargs,
)


def _row_iterator_page_to_arrow(page, column_names, arrow_types):
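Also for context: the reason this PR defaults to compliant nested types is the Parquet group naming used for lists, which is what BigQuery's list inference keys off. A minimal sketch of the difference, assuming `pyarrow >= 4.0.0` and writable `/tmp` paths:

```python
# Illustrative only: compare the Parquet schema produced with and without
# `use_compliant_nested_type` (keyword available in pyarrow >= 4.0.0).
import pyarrow
import pyarrow.parquet

table = pyarrow.Table.from_pydict({"tags": [[1, 2], [3]]})

pyarrow.parquet.write_table(table, "/tmp/compliant.parquet", use_compliant_nested_type=True)
pyarrow.parquet.write_table(table, "/tmp/legacy.parquet", use_compliant_nested_type=False)

# The compliant file follows the Parquet LogicalTypes spec for LIST (inner field
# named "element"); the legacy layout uses pyarrow's historical "item" naming,
# which, per the linked issues, BigQuery does not reliably infer as a REPEATED column.
print(pyarrow.parquet.ParquetFile("/tmp/compliant.parquet").schema)
print(pyarrow.parquet.ParquetFile("/tmp/legacy.parquet").schema)
```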
72 changes: 43 additions & 29 deletions google/cloud/bigquery/client.py
@@ -27,19 +27,11 @@
import json
import math
import os
import packaging.version
import tempfile
from typing import Any, BinaryIO, Dict, Iterable, Optional, Sequence, Tuple, Union
import uuid
import warnings

try:
import pyarrow

_PYARROW_VERSION = packaging.version.parse(pyarrow.__version__)
except ImportError: # pragma: NO COVER
pyarrow = None

from google import resumable_media # type: ignore
from google.resumable_media.requests import MultipartUpload
from google.resumable_media.requests import ResumableUpload
@@ -103,6 +95,10 @@
from google.cloud.bigquery.table import TableListItem
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import RowIterator
from google.cloud.bigquery.format_options import ParquetOptions
from google.cloud.bigquery import _helpers

pyarrow = _helpers.PYARROW_VERSIONS.try_import()


_DEFAULT_CHUNKSIZE = 100 * 1024 * 1024 # 100 MB
@@ -128,8 +124,6 @@
# https://github.com/googleapis/python-bigquery/issues/438
_MIN_GET_QUERY_RESULTS_TIMEOUT = 120

# https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414
_PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])

TIMEOUT_HEADER = "X-Server-Timeout"

@@ -2469,10 +2463,10 @@ def load_table_from_dataframe(
They are supported when using the PARQUET source format, but
due to the way they are encoded in the ``parquet`` file,
a mismatch with the existing table schema can occur, so
100% compatibility cannot be guaranteed for REPEATED fields when
REPEATED fields are not properly supported when using ``pyarrow<4.0.0``
using the parquet format.
https://github.com/googleapis/python-bigquery/issues/17
https://github.com/googleapis/python-bigquery/issues/19
Args:
dataframe (pandas.DataFrame):
@@ -2519,18 +2513,18 @@ def load_table_from_dataframe(
:attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
supported.
parquet_compression (Optional[str]):
[Beta] The compression method to use if intermittently
serializing ``dataframe`` to a parquet file.
The argument is directly passed as the ``compression``
argument to the underlying ``pyarrow.parquet.write_table()``
method (the default value "snappy" gets converted to uppercase).
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
If the job config schema is missing, the argument is directly
passed as the ``compression`` argument to the underlying
``DataFrame.to_parquet()`` method.
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
@@ -2562,6 +2556,16 @@ def load_table_from_dataframe(
if job_config.source_format is None:
# default value
job_config.source_format = job.SourceFormat.PARQUET

if (
job_config.source_format == job.SourceFormat.PARQUET
and job_config.parquet_options is None
):
parquet_options = ParquetOptions()
# default value
parquet_options.enable_list_inference = True
job_config.parquet_options = parquet_options

if job_config.source_format not in supported_formats:
raise ValueError(
"Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
@@ -2628,12 +2632,12 @@ def load_table_from_dataframe(
try:

if job_config.source_format == job.SourceFormat.PARQUET:
if _PYARROW_VERSION in _PYARROW_BAD_VERSIONS:
if _helpers.PYARROW_VERSIONS.is_bad_version:
msg = (
"Loading dataframe data in PARQUET format with pyarrow "
f"{_PYARROW_VERSION} can result in data corruption. It is "
"therefore *strongly* advised to use a different pyarrow "
"version or a different source format. "
f"{_helpers.PYARROW_VERSIONS.installed_version} can result in data "
"corruption. It is therefore *strongly* advised to use a "
"different pyarrow version or a different source format. "
"See: https://github.com/googleapis/python-bigquery/issues/781"
)
warnings.warn(msg, category=RuntimeWarning)
@@ -2647,9 +2651,19 @@ def load_table_from_dataframe(
job_config.schema,
tmppath,
parquet_compression=parquet_compression,
parquet_use_compliant_nested_type=True,
)
else:
dataframe.to_parquet(tmppath, compression=parquet_compression)
dataframe.to_parquet(
tmppath,
engine="pyarrow",
compression=parquet_compression,
**(
{"use_compliant_nested_type": True}
if _helpers.PYARROW_VERSIONS.use_compliant_nested_type
else {}
),
)

else:

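Putting it together, a minimal end-to-end sketch of what this PR enables (placeholder table and column names; assumes default application credentials and an existing dataset):

```python
# Minimal sketch only: load a DataFrame column of Python lists into a BigQuery
# REPEATED field, relying on the new defaults added by this PR.
import pandas as pd
from google.cloud import bigquery

client = bigquery.Client()

df = pd.DataFrame({"name": ["a", "b"], "tags": [[1, 2, 3], [4]]})

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("tags", "INT64", mode="REPEATED"),
    ],
    source_format=bigquery.SourceFormat.PARQUET,
)

# With this change, the DataFrame is serialized with compliant Parquet lists and
# `ParquetOptions.enable_list_inference` defaults to True, so `tags` should load
# as an ARRAY<INT64> column rather than a nested record.
load_job = client.load_table_from_dataframe(
    df, "my_dataset.my_table", job_config=job_config  # placeholder table ID
)
load_job.result()  # wait for the load job to complete
```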