Skip to content

Commit

Permalink
feat: add support for Parquet options (#679)
Browse files Browse the repository at this point in the history
* feat: add support for Parquet options

For load jobs and external tables config.

* Simplify ParquetOptions.to_api_repr()

Co-authored by Tres Seaver.

* Expose ParquetOptions in top level namespace

* Parquet options should be reflected in options
  • Loading branch information
plamut committed Jun 2, 2021
1 parent a0a9fa2 commit d792ce0
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 1 deletion.
2 changes: 2 additions & 0 deletions google/cloud/bigquery/__init__.py
Expand Up @@ -47,6 +47,7 @@
from google.cloud.bigquery.external_config import CSVOptions
from google.cloud.bigquery.external_config import GoogleSheetsOptions
from google.cloud.bigquery.external_config import ExternalSourceFormat
from google.cloud.bigquery.format_options import ParquetOptions
from google.cloud.bigquery.job import Compression
from google.cloud.bigquery.job import CopyJob
from google.cloud.bigquery.job import CopyJobConfig
Expand Down Expand Up @@ -136,6 +137,7 @@
"BigtableColumn",
"CSVOptions",
"GoogleSheetsOptions",
"ParquetOptions",
"DEFAULT_RETRY",
# Enum Constants
"enums",
Expand Down
28 changes: 27 additions & 1 deletion google/cloud/bigquery/external_config.py
Expand Up @@ -27,6 +27,7 @@
from google.cloud.bigquery._helpers import _bytes_to_json
from google.cloud.bigquery._helpers import _int_or_none
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery.format_options import ParquetOptions
from google.cloud.bigquery.schema import SchemaField


Expand All @@ -53,6 +54,12 @@ class ExternalSourceFormat(object):
DATASTORE_BACKUP = "DATASTORE_BACKUP"
"""Specifies datastore backup format"""

ORC = "ORC"
"""Specifies ORC format."""

PARQUET = "PARQUET"
"""Specifies Parquet format."""

BIGTABLE = "BIGTABLE"
"""Specifies Bigtable format."""

Expand Down Expand Up @@ -540,7 +547,7 @@ def from_api_repr(cls, resource: dict) -> "GoogleSheetsOptions":
return config


_OPTION_CLASSES = (BigtableOptions, CSVOptions, GoogleSheetsOptions)
_OPTION_CLASSES = (BigtableOptions, CSVOptions, GoogleSheetsOptions, ParquetOptions)


class HivePartitioningOptions(object):
Expand Down Expand Up @@ -784,6 +791,25 @@ def schema(self, value):
prop = {"fields": [field.to_api_repr() for field in value]}
self._properties["schema"] = prop

@property
def parquet_options(self):
    """Optional[google.cloud.bigquery.format_options.ParquetOptions]: Additional
    properties to set if ``sourceFormat`` is set to PARQUET.

    Evaluates to ``None`` whenever the configured source format is not
    PARQUET; otherwise the object stored in ``self._options`` is returned.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ExternalDataConfiguration.FIELDS.parquet_options
    """
    # Parquet-specific options are only exposed for the PARQUET source format.
    return (
        None
        if self.source_format != ExternalSourceFormat.PARQUET
        else self._options
    )

@parquet_options.setter
def parquet_options(self, value):
    # Refuse to attach Parquet options to a config of a different format.
    if self.source_format != ExternalSourceFormat.PARQUET:
        raise TypeError(
            f"Cannot set Parquet options, source format is {self.source_format}"
        )
    self._options = value

def to_api_repr(self) -> dict:
"""Build an API representation of this object.
Expand Down
80 changes: 80 additions & 0 deletions google/cloud/bigquery/format_options.py
@@ -0,0 +1,80 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
from typing import Dict, Optional


class ParquetOptions:
    """Additional options if the PARQUET source format is used.

    Option values are kept in a plain dict keyed by the API (camelCase) field
    names; a key is only present once the corresponding option has been set.
    """

    # NOTE(review): presumably consumed by ExternalConfig (this class is listed in
    # its _OPTION_CLASSES) to match the source format and to key these options in
    # the API resource -- confirm against external_config.py.
    _SOURCE_FORMAT = "PARQUET"
    _RESOURCE_NAME = "parquetOptions"

    def __init__(self):
        self._properties = {}

    @property
    def enum_as_string(self) -> Optional[bool]:
        """Indicates whether to infer Parquet ENUM logical type as STRING instead of
        BYTES by default.

        ``None`` if the option has not been set.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ParquetOptions.FIELDS.enum_as_string
        """
        return self._properties.get("enumAsString")

    @enum_as_string.setter
    def enum_as_string(self, value: bool) -> None:
        self._properties["enumAsString"] = value

    @property
    def enable_list_inference(self) -> Optional[bool]:
        """Indicates whether to use schema inference specifically for Parquet LIST
        logical type.

        ``None`` if the option has not been set.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ParquetOptions.FIELDS.enable_list_inference
        """
        return self._properties.get("enableListInference")

    @enable_list_inference.setter
    def enable_list_inference(self, value: bool) -> None:
        self._properties["enableListInference"] = value

    @classmethod
    def from_api_repr(cls, resource: Dict[str, bool]) -> "ParquetOptions":
        """Factory: construct an instance from a resource dict.

        Args:
            resource (Dict[str, bool]):
                Definition of a :class:`~.format_options.ParquetOptions` instance in
                the same representation as is returned from the API.

        Returns:
            :class:`~.format_options.ParquetOptions`:
                Configuration parsed from ``resource``.
        """
        config = cls()
        # Deep copy so later changes to ``resource`` do not leak into the instance.
        config._properties = copy.deepcopy(resource)
        return config

    def to_api_repr(self) -> Dict[str, bool]:
        """Build an API representation of this object.

        Returns:
            Dict[str, bool]:
                A dictionary in the format used by the BigQuery API.
        """
        # Hand out a copy so callers cannot mutate the internal state.
        return copy.deepcopy(self._properties)
21 changes: 21 additions & 0 deletions google/cloud/bigquery/job/load.py
Expand Up @@ -16,6 +16,7 @@

from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
from google.cloud.bigquery.external_config import HivePartitioningOptions
from google.cloud.bigquery.format_options import ParquetOptions
from google.cloud.bigquery import _helpers
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import _to_schema_fields
Expand Down Expand Up @@ -439,6 +440,26 @@ def write_disposition(self):
def write_disposition(self, value):
self._set_sub_prop("writeDisposition", value)

@property
def parquet_options(self):
    """Optional[google.cloud.bigquery.format_options.ParquetOptions]: Additional
    properties to set if ``sourceFormat`` is set to PARQUET.

    ``None`` when no ``parquetOptions`` sub-property is present. Assigning
    ``None`` removes the sub-property.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.parquet_options
    """
    resource = self._get_sub_prop("parquetOptions")
    if resource is None:
        return None
    return ParquetOptions.from_api_repr(resource)

@parquet_options.setter
def parquet_options(self, value):
    if value is None:
        # Clearing the option drops the key instead of storing a null.
        self._del_sub_prop("parquetOptions")
        return
    self._set_sub_prop("parquetOptions", value.to_api_repr())


class LoadJob(_AsyncJob):
"""Asynchronous job for loading data into a table.
Expand Down
35 changes: 35 additions & 0 deletions tests/unit/job/test_load_config.py
Expand Up @@ -700,3 +700,38 @@ def test_write_disposition_setter(self):
self.assertEqual(
config._properties["load"]["writeDisposition"], write_disposition
)

def test_parquet_options_missing(self):
    """A newly constructed config reports no Parquet options."""
    fresh_config = self._get_target_class()()
    self.assertIsNone(fresh_config.parquet_options)

def test_parquet_options_hit(self):
    """Values stored under ``parquetOptions`` are readable via the property."""
    job_config = self._get_target_class()()
    job_config._properties["load"]["parquetOptions"] = {
        "enumAsString": True,
        "enableListInference": False,
    }
    self.assertTrue(job_config.parquet_options.enum_as_string)
    self.assertFalse(job_config.parquet_options.enable_list_inference)

def test_parquet_options_setter(self):
    """Assigning options writes their API representation into the load config."""
    from google.cloud.bigquery.format_options import ParquetOptions

    options = ParquetOptions.from_api_repr(
        {"enumAsString": False, "enableListInference": True}
    )
    job_config = self._get_target_class()()

    job_config.parquet_options = options

    stored = job_config._properties["load"]["parquetOptions"]
    self.assertEqual(
        stored,
        dict(enumAsString=False, enableListInference=True),
    )

def test_parquet_options_setter_clearing(self):
    """Assigning ``None`` removes the ``parquetOptions`` key from the load config."""
    job_config = self._get_target_class()()
    job_config._properties["load"]["parquetOptions"] = {
        "enumAsString": False,
        "enableListInference": True,
    }

    job_config.parquet_options = None

    load_properties = job_config._properties["load"]
    self.assertNotIn("parquetOptions", load_properties)
100 changes: 100 additions & 0 deletions tests/unit/test_external_config.py
Expand Up @@ -425,6 +425,106 @@ def test_to_api_repr_bigtable(self):

self.assertEqual(got_resource, exp_resource)

def test_parquet_options_getter(self):
    """The getter exposes whatever options object the PARQUET config holds."""
    from google.cloud.bigquery.format_options import ParquetOptions

    replacement = ParquetOptions.from_api_repr(
        dict(enumAsString=True, enableListInference=False)
    )
    source_format = external_config.ExternalSourceFormat.PARQUET
    ec = external_config.ExternalConfig(source_format)

    # Before any options are assigned, both values read as unset.
    self.assertIsNone(ec.parquet_options.enum_as_string)
    self.assertIsNone(ec.parquet_options.enable_list_inference)

    ec._options = replacement

    self.assertTrue(ec.parquet_options.enum_as_string)
    self.assertFalse(ec.parquet_options.enable_list_inference)

    # The format-specific accessor and the generic one return the same object.
    self.assertIs(ec.parquet_options, ec.options)

def test_parquet_options_getter_non_parquet_format(self):
    """A CSV config has no Parquet options to report."""
    csv_format = external_config.ExternalSourceFormat.CSV
    ec = external_config.ExternalConfig(csv_format)
    self.assertIsNone(ec.parquet_options)

def test_parquet_options_setter(self):
    """Options assigned through the Parquet setter show up in ``options``."""
    from google.cloud.bigquery.format_options import ParquetOptions

    new_options = ParquetOptions.from_api_repr(
        dict(enumAsString=False, enableListInference=True)
    )
    source_format = external_config.ExternalSourceFormat.PARQUET
    ec = external_config.ExternalConfig(source_format)

    ec.parquet_options = new_options

    # Setting Parquet options should be reflected in the generic options attribute.
    self.assertFalse(ec.options.enum_as_string)
    self.assertTrue(ec.options.enable_list_inference)

def test_parquet_options_setter_non_parquet_format(self):
    """Assigning Parquet options to a CSV config raises ``TypeError``."""
    from google.cloud.bigquery.format_options import ParquetOptions

    rejected_options = ParquetOptions.from_api_repr(
        dict(enumAsString=False, enableListInference=True)
    )
    csv_format = external_config.ExternalSourceFormat.CSV
    ec = external_config.ExternalConfig(csv_format)

    expected_message = "Cannot set.*source format is CSV"
    with self.assertRaisesRegex(TypeError, expected_message):
        ec.parquet_options = rejected_options

def test_from_api_repr_parquet(self):
    """A PARQUET resource parses into ParquetOptions and round-trips unchanged."""
    from google.cloud.bigquery.format_options import ParquetOptions

    parquet_fields = {
        "sourceFormat": "PARQUET",
        "parquetOptions": {"enumAsString": True, "enableListInference": False},
    }
    resource = _copy_and_update(self.BASE_RESOURCE, parquet_fields)

    ec = external_config.ExternalConfig.from_api_repr(resource)

    self._verify_base(ec)
    self.assertEqual(ec.source_format, external_config.ExternalSourceFormat.PARQUET)
    self.assertIsInstance(ec.options, ParquetOptions)
    self.assertTrue(ec.parquet_options.enum_as_string)
    self.assertFalse(ec.parquet_options.enable_list_inference)

    round_tripped = ec.to_api_repr()

    self.assertEqual(round_tripped, resource)

    # With one option removed, it reads as unset and is not re-added on export.
    del resource["parquetOptions"]["enableListInference"]
    ec = external_config.ExternalConfig.from_api_repr(resource)
    self.assertIsNone(ec.options.enable_list_inference)
    round_tripped = ec.to_api_repr()
    self.assertEqual(round_tripped, resource)

def test_to_api_repr_parquet(self):
    """A PARQUET config serializes its options under ``parquetOptions``."""
    from google.cloud.bigquery.format_options import ParquetOptions

    parquet_format = external_config.ExternalSourceFormat.PARQUET
    ec = external_config.ExternalConfig(parquet_format)
    ec._options = ParquetOptions.from_api_repr(
        {"enumAsString": False, "enableListInference": True}
    )

    expected = {
        "sourceFormat": external_config.ExternalSourceFormat.PARQUET,
        "parquetOptions": {"enumAsString": False, "enableListInference": True},
    }

    actual = ec.to_api_repr()

    self.assertEqual(actual, expected)


def _copy_and_update(d, u):
d = copy.deepcopy(d)
Expand Down
41 changes: 41 additions & 0 deletions tests/unit/test_format_options.py
@@ -0,0 +1,41 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class TestParquetOptions:
    """Unit tests for ``ParquetOptions`` defaults and API-resource conversion."""

    @staticmethod
    def _get_target_class():
        # Imported inside the helper rather than at module import time.
        from google.cloud.bigquery import format_options

        return format_options.ParquetOptions

    def test_ctor(self):
        """A new instance has both options unset."""
        options = self._get_target_class()()
        assert options.enum_as_string is None
        assert options.enable_list_inference is None

    def test_from_api_repr(self):
        """Values in the API resource are exposed through the properties."""
        resource = {"enumAsString": False, "enableListInference": True}
        options = self._get_target_class().from_api_repr(resource)
        assert not options.enum_as_string
        assert options.enable_list_inference

    def test_to_api_repr(self):
        """Values set through the properties appear in the API resource."""
        options = self._get_target_class()()
        options.enum_as_string = True
        options.enable_list_inference = False

        expected = {"enumAsString": True, "enableListInference": False}
        assert options.to_api_repr() == expected

0 comments on commit d792ce0

Please sign in to comment.