fix: support ARRAY data type when loading from DataFrame with Parquet (#980)

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)

Fixes #19 🦕
judahrand committed Oct 7, 2021
1 parent aacc521 commit 1e59083
Showing 5 changed files with 483 additions and 45 deletions.
11 changes: 11 additions & 0 deletions google/cloud/bigquery/_helpers.py
@@ -107,6 +107,9 @@ def verify_version(self):
class PyarrowVersions:
"""Version comparisons for pyarrow package."""

# https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414
_PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])

def __init__(self):
self._installed_version = None

@@ -126,6 +129,14 @@ def installed_version(self) -> packaging.version.Version:

return self._installed_version

@property
def is_bad_version(self) -> bool:
return self.installed_version in self._PYARROW_BAD_VERSIONS

@property
def use_compliant_nested_type(self) -> bool:
return self.installed_version.major >= 4

def try_import(self, raise_if_error: bool = False) -> Any:
"""Verify that a recent enough version of pyarrow extra is
installed.
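The two new properties centralize the pyarrow version checks that `client.py` and `_pandas_helpers.py` need. A minimal sketch of how they are meant to be consumed (assumes pyarrow is installed; the conditional-kwargs pattern mirrors the one added to `dataframe_to_parquet` below):

```python
# Sketch only, not part of the diff: consuming the new PyarrowVersions properties.
from google.cloud.bigquery import _helpers

versions = _helpers.PYARROW_VERSIONS

if versions.is_bad_version:
    # pyarrow 2.0.0 is known to corrupt data when serializing to Parquet.
    print(f"pyarrow {versions.installed_version} may corrupt Parquet output")

# `use_compliant_nested_type` is only understood by pyarrow >= 4.0.0, so the
# keyword is only passed when the installed version supports it.
kwargs = (
    {"use_compliant_nested_type": True}
    if versions.use_compliant_nested_type
    else {}
)
```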
50 changes: 39 additions & 11 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -79,8 +79,8 @@ def _to_wkb(v):
_PANDAS_DTYPE_TO_BQ = {
"bool": "BOOLEAN",
"datetime64[ns, UTC]": "TIMESTAMP",
# BigQuery does not support uploading DATETIME values from Parquet files.
# See: https://github.com/googleapis/google-cloud-python/issues/9996
# TODO: Update to DATETIME in V3
# https://github.com/googleapis/python-bigquery/issues/985
"datetime64[ns]": "TIMESTAMP",
"float32": "FLOAT",
"float64": "FLOAT",
@@ -396,7 +396,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
# column, but it was not found.
if bq_schema_unused:
raise ValueError(
u"bq_schema contains fields not present in dataframe: {}".format(
"bq_schema contains fields not present in dataframe: {}".format(
bq_schema_unused
)
)
@@ -405,7 +405,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
# pyarrow, if available.
if unknown_type_fields:
if not pyarrow:
msg = u"Could not determine the type of columns: {}".format(
msg = "Could not determine the type of columns: {}".format(
", ".join(field.name for field in unknown_type_fields)
)
warnings.warn(msg)
@@ -444,7 +444,14 @@ def augment_schema(dataframe, current_bq_schema):
continue

arrow_table = pyarrow.array(dataframe[field.name])
detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.type.id)

if pyarrow.types.is_list(arrow_table.type):
# `pyarrow.ListType`
detected_mode = "REPEATED"
detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.values.type.id)
else:
detected_mode = field.mode
detected_type = ARROW_SCALAR_IDS_TO_BQ.get(arrow_table.type.id)

if detected_type is None:
unknown_type_fields.append(field)
@@ -453,15 +460,15 @@ def augment_schema(dataframe, current_bq_schema):
new_field = schema.SchemaField(
name=field.name,
field_type=detected_type,
mode=field.mode,
mode=detected_mode,
description=field.description,
fields=field.fields,
)
augmented_schema.append(new_field)

if unknown_type_fields:
warnings.warn(
u"Pyarrow could not determine the type of columns: {}.".format(
"Pyarrow could not determine the type of columns: {}.".format(
", ".join(field.name for field in unknown_type_fields)
)
)
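The new branch in `augment_schema()` treats a pyarrow list column as a BigQuery `REPEATED` field and derives the element type from the list's value type. A short illustration of what the detection sees (the sample column is hypothetical):

```python
# Illustration of the list detection added to augment_schema().
import pandas
import pyarrow

series = pandas.Series([[1, 2], [3]])
arrow_array = pyarrow.array(series)

assert pyarrow.types.is_list(arrow_array.type)  # list column -> mode "REPEATED"
print(arrow_array.values.type)                  # int64 -> element type INTEGER
```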
@@ -500,7 +507,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
extra_fields = bq_field_names - column_and_index_names
if extra_fields:
raise ValueError(
u"bq_schema contains fields not present in dataframe: {}".format(
"bq_schema contains fields not present in dataframe: {}".format(
extra_fields
)
)
@@ -510,7 +517,7 @@ def dataframe_to_arrow(dataframe, bq_schema):
missing_fields = column_names - bq_field_names
if missing_fields:
raise ValueError(
u"bq_schema is missing fields from dataframe: {}".format(missing_fields)
"bq_schema is missing fields from dataframe: {}".format(missing_fields)
)

arrow_arrays = []
@@ -530,7 +537,13 @@ def dataframe_to_arrow(dataframe, bq_schema):
return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)


def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SNAPPY"):
def dataframe_to_parquet(
dataframe,
bq_schema,
filepath,
parquet_compression="SNAPPY",
parquet_use_compliant_nested_type=True,
):
"""Write dataframe as a Parquet file, according to the desired BQ schema.
This function requires the :mod:`pyarrow` package. Arrow is used as an
@@ -551,14 +564,29 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
The compression codec to use by the ``pyarrow.parquet.write_table``
serializing method. Defaults to "SNAPPY".
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
parquet_use_compliant_nested_type (bool):
Whether the ``pyarrow.parquet.write_table`` serializing method should write
compliant Parquet nested type (lists). Defaults to ``True``.
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
This argument is ignored for ``pyarrow`` versions earlier than ``4.0.0``.
"""
pyarrow = _helpers.PYARROW_VERSIONS.try_import(raise_if_error=True)

import pyarrow.parquet

kwargs = (
{"use_compliant_nested_type": parquet_use_compliant_nested_type}
if _helpers.PYARROW_VERSIONS.use_compliant_nested_type
else {}
)

bq_schema = schema._to_schema_fields(bq_schema)
arrow_table = dataframe_to_arrow(dataframe, bq_schema)
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)
pyarrow.parquet.write_table(
arrow_table, filepath, compression=parquet_compression, **kwargs,
)


def _row_iterator_page_to_arrow(page, column_names, arrow_types):
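Taken together, `dataframe_to_parquet` now writes list columns using the compliant Parquet `LIST` layout whenever the installed pyarrow supports it. A hedged usage sketch (file path, column, and schema are placeholders):

```python
# Usage sketch of the updated helper; data and destination path are made up.
import pandas
from google.cloud import bigquery
from google.cloud.bigquery import _pandas_helpers

df = pandas.DataFrame({"tags": [["a", "b"], ["c"]]})
bq_schema = [bigquery.SchemaField("tags", "STRING", mode="REPEATED")]

_pandas_helpers.dataframe_to_parquet(
    df,
    bq_schema,
    "/tmp/tags.parquet",
    parquet_compression="SNAPPY",
    # Ignored on pyarrow < 4.0.0; otherwise list columns are written with the
    # standard Parquet LIST logical type.
    parquet_use_compliant_nested_type=True,
)
```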
72 changes: 43 additions & 29 deletions google/cloud/bigquery/client.py
@@ -27,19 +27,11 @@
import json
import math
import os
import packaging.version
import tempfile
from typing import Any, BinaryIO, Dict, Iterable, Optional, Sequence, Tuple, Union
import uuid
import warnings

try:
import pyarrow

_PYARROW_VERSION = packaging.version.parse(pyarrow.__version__)
except ImportError: # pragma: NO COVER
pyarrow = None

from google import resumable_media # type: ignore
from google.resumable_media.requests import MultipartUpload
from google.resumable_media.requests import ResumableUpload
@@ -103,6 +95,10 @@
from google.cloud.bigquery.table import TableListItem
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import RowIterator
from google.cloud.bigquery.format_options import ParquetOptions
from google.cloud.bigquery import _helpers

pyarrow = _helpers.PYARROW_VERSIONS.try_import()


_DEFAULT_CHUNKSIZE = 100 * 1024 * 1024 # 100 MB
@@ -128,8 +124,6 @@
# https://github.com/googleapis/python-bigquery/issues/438
_MIN_GET_QUERY_RESULTS_TIMEOUT = 120

# https://github.com/googleapis/python-bigquery/issues/781#issuecomment-883497414
_PYARROW_BAD_VERSIONS = frozenset([packaging.version.Version("2.0.0")])

TIMEOUT_HEADER = "X-Server-Timeout"

@@ -2469,10 +2463,10 @@ def load_table_from_dataframe(
They are supported when using the PARQUET source format, but
due to the way they are encoded in the ``parquet`` file,
a mismatch with the existing table schema can occur, so
100% compatibility cannot be guaranteed for REPEATED fields when
REPEATED fields are not properly supported when using ``pyarrow<4.0.0``
using the parquet format.
https://github.com/googleapis/python-bigquery/issues/17
https://github.com/googleapis/python-bigquery/issues/19
Args:
dataframe (pandas.DataFrame):
@@ -2519,18 +2513,18 @@ def load_table_from_dataframe(
:attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
supported.
parquet_compression (Optional[str]):
[Beta] The compression method to use if intermittently
serializing ``dataframe`` to a parquet file.
The argument is directly passed as the ``compression``
argument to the underlying ``pyarrow.parquet.write_table()``
method (the default value "snappy" gets converted to uppercase).
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html#pyarrow-parquet-write-table
If the job config schema is missing, the argument is directly
passed as the ``compression`` argument to the underlying
``DataFrame.to_parquet()`` method.
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
@@ -2562,6 +2556,16 @@ def load_table_from_dataframe(
if job_config.source_format is None:
# default value
job_config.source_format = job.SourceFormat.PARQUET

if (
job_config.source_format == job.SourceFormat.PARQUET
and job_config.parquet_options is None
):
parquet_options = ParquetOptions()
# default value
parquet_options.enable_list_inference = True
job_config.parquet_options = parquet_options

if job_config.source_format not in supported_formats:
raise ValueError(
"Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
@@ -2628,12 +2632,12 @@ def load_table_from_dataframe(
try:

if job_config.source_format == job.SourceFormat.PARQUET:
if _PYARROW_VERSION in _PYARROW_BAD_VERSIONS:
if _helpers.PYARROW_VERSIONS.is_bad_version:
msg = (
"Loading dataframe data in PARQUET format with pyarrow "
f"{_PYARROW_VERSION} can result in data corruption. It is "
"therefore *strongly* advised to use a different pyarrow "
"version or a different source format. "
f"{_helpers.PYARROW_VERSIONS.installed_version} can result in data "
"corruption. It is therefore *strongly* advised to use a "
"different pyarrow version or a different source format. "
"See: https://github.com/googleapis/python-bigquery/issues/781"
)
warnings.warn(msg, category=RuntimeWarning)
@@ -2647,9 +2651,19 @@ def load_table_from_dataframe(
job_config.schema,
tmppath,
parquet_compression=parquet_compression,
parquet_use_compliant_nested_type=True,
)
else:
dataframe.to_parquet(tmppath, compression=parquet_compression)
dataframe.to_parquet(
tmppath,
engine="pyarrow",
compression=parquet_compression,
**(
{"use_compliant_nested_type": True}
if _helpers.PYARROW_VERSIONS.use_compliant_nested_type
else {}
),
)

else:

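End to end, the fix means a list-valued DataFrame column can be loaded as an ARRAY (REPEATED) column without supplying an explicit schema. A hedged example (project, dataset, and table IDs are placeholders):

```python
# End-to-end sketch of what this change enables; destination IDs are hypothetical.
import pandas
from google.cloud import bigquery

client = bigquery.Client()
df = pandas.DataFrame({"name": ["alice", "bob"], "scores": [[90, 75], [60]]})

# "scores" is detected as INTEGER with mode REPEATED and loaded as an ARRAY column.
job = client.load_table_from_dataframe(df, "my-project.my_dataset.scores")
job.result()
```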
