From d792ce09388a6ee3706777915dd2818d4c854f79 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 2 Jun 2021 09:17:20 +0200 Subject: [PATCH] feat: add support for Parquet options (#679) * feat: add support for Parquet options For load jobs and external tables config. * Simplify ParquetOptions.to_api_repr() Co-authored by Tres Seaver. * Expose ParquetOptions in top level namespace * Parquet options should be reflected in options --- google/cloud/bigquery/__init__.py | 2 + google/cloud/bigquery/external_config.py | 28 ++++++- google/cloud/bigquery/format_options.py | 80 ++++++++++++++++++ google/cloud/bigquery/job/load.py | 21 +++++ tests/unit/job/test_load_config.py | 35 ++++++++ tests/unit/test_external_config.py | 100 +++++++++++++++++++++++ tests/unit/test_format_options.py | 41 ++++++++++ 7 files changed, 306 insertions(+), 1 deletion(-) create mode 100644 google/cloud/bigquery/format_options.py create mode 100644 tests/unit/test_format_options.py diff --git a/google/cloud/bigquery/__init__.py b/google/cloud/bigquery/__init__.py index ec08b2c84..f031cd81d 100644 --- a/google/cloud/bigquery/__init__.py +++ b/google/cloud/bigquery/__init__.py @@ -47,6 +47,7 @@ from google.cloud.bigquery.external_config import CSVOptions from google.cloud.bigquery.external_config import GoogleSheetsOptions from google.cloud.bigquery.external_config import ExternalSourceFormat +from google.cloud.bigquery.format_options import ParquetOptions from google.cloud.bigquery.job import Compression from google.cloud.bigquery.job import CopyJob from google.cloud.bigquery.job import CopyJobConfig @@ -136,6 +137,7 @@ "BigtableColumn", "CSVOptions", "GoogleSheetsOptions", + "ParquetOptions", "DEFAULT_RETRY", # Enum Constants "enums", diff --git a/google/cloud/bigquery/external_config.py b/google/cloud/bigquery/external_config.py index ef4d569fa..0c49d2d76 100644 --- a/google/cloud/bigquery/external_config.py +++ b/google/cloud/bigquery/external_config.py @@ -27,6 +27,7 @@ from google.cloud.bigquery._helpers import _bytes_to_json from google.cloud.bigquery._helpers import _int_or_none from google.cloud.bigquery._helpers import _str_or_none +from google.cloud.bigquery.format_options import ParquetOptions from google.cloud.bigquery.schema import SchemaField @@ -53,6 +54,12 @@ class ExternalSourceFormat(object): DATASTORE_BACKUP = "DATASTORE_BACKUP" """Specifies datastore backup format""" + ORC = "ORC" + """Specifies ORC format.""" + + PARQUET = "PARQUET" + """Specifies Parquet format.""" + BIGTABLE = "BIGTABLE" """Specifies Bigtable format.""" @@ -540,7 +547,7 @@ def from_api_repr(cls, resource: dict) -> "GoogleSheetsOptions": return config -_OPTION_CLASSES = (BigtableOptions, CSVOptions, GoogleSheetsOptions) +_OPTION_CLASSES = (BigtableOptions, CSVOptions, GoogleSheetsOptions, ParquetOptions) class HivePartitioningOptions(object): @@ -784,6 +791,25 @@ def schema(self, value): prop = {"fields": [field.to_api_repr() for field in value]} self._properties["schema"] = prop + @property + def parquet_options(self): + """Optional[google.cloud.bigquery.format_options.ParquetOptions]: Additional + properties to set if ``sourceFormat`` is set to PARQUET. + + See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ExternalDataConfiguration.FIELDS.parquet_options + """ + if self.source_format != ExternalSourceFormat.PARQUET: + return None + return self._options + + @parquet_options.setter + def parquet_options(self, value): + if self.source_format != ExternalSourceFormat.PARQUET: + msg = f"Cannot set Parquet options, source format is {self.source_format}" + raise TypeError(msg) + self._options = value + def to_api_repr(self) -> dict: """Build an API representation of this object. diff --git a/google/cloud/bigquery/format_options.py b/google/cloud/bigquery/format_options.py new file mode 100644 index 000000000..2c9a2ce20 --- /dev/null +++ b/google/cloud/bigquery/format_options.py @@ -0,0 +1,80 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +from typing import Dict + + +class ParquetOptions: + """Additional options if the PARQUET source format is used.""" + + _SOURCE_FORMAT = "PARQUET" + _RESOURCE_NAME = "parquetOptions" + + def __init__(self): + self._properties = {} + + @property + def enum_as_string(self) -> bool: + """Indicates whether to infer Parquet ENUM logical type as STRING instead of + BYTES by default. + + See + https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ParquetOptions.FIELDS.enum_as_string + """ + return self._properties.get("enumAsString") + + @enum_as_string.setter + def enum_as_string(self, value: bool) -> None: + self._properties["enumAsString"] = value + + @property + def enable_list_inference(self) -> bool: + """Indicates whether to use schema inference specifically for Parquet LIST + logical type. + + See + https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ParquetOptions.FIELDS.enable_list_inference + """ + return self._properties.get("enableListInference") + + @enable_list_inference.setter + def enable_list_inference(self, value: bool) -> None: + self._properties["enableListInference"] = value + + @classmethod + def from_api_repr(cls, resource: Dict[str, bool]) -> "ParquetOptions": + """Factory: construct an instance from a resource dict. + + Args: + resource (Dict[str, bool]): + Definition of a :class:`~.format_options.ParquetOptions` instance in + the same representation as is returned from the API. + + Returns: + :class:`~.format_options.ParquetOptions`: + Configuration parsed from ``resource``. + """ + config = cls() + config._properties = copy.deepcopy(resource) + return config + + def to_api_repr(self) -> dict: + """Build an API representation of this object. + + Returns: + Dict[str, bool]: + A dictionary in the format used by the BigQuery API. + """ + return copy.deepcopy(self._properties) diff --git a/google/cloud/bigquery/job/load.py b/google/cloud/bigquery/job/load.py index b8174af3e..41d38dd74 100644 --- a/google/cloud/bigquery/job/load.py +++ b/google/cloud/bigquery/job/load.py @@ -16,6 +16,7 @@ from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration from google.cloud.bigquery.external_config import HivePartitioningOptions +from google.cloud.bigquery.format_options import ParquetOptions from google.cloud.bigquery import _helpers from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery.schema import _to_schema_fields @@ -439,6 +440,26 @@ def write_disposition(self): def write_disposition(self, value): self._set_sub_prop("writeDisposition", value) + @property + def parquet_options(self): + """Optional[google.cloud.bigquery.format_options.ParquetOptions]: Additional + properties to set if ``sourceFormat`` is set to PARQUET. + + See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.parquet_options + """ + prop = self._get_sub_prop("parquetOptions") + if prop is not None: + prop = ParquetOptions.from_api_repr(prop) + return prop + + @parquet_options.setter + def parquet_options(self, value): + if value is not None: + self._set_sub_prop("parquetOptions", value.to_api_repr()) + else: + self._del_sub_prop("parquetOptions") + class LoadJob(_AsyncJob): """Asynchronous job for loading data into a table. diff --git a/tests/unit/job/test_load_config.py b/tests/unit/job/test_load_config.py index 63f15ec5a..b0729e428 100644 --- a/tests/unit/job/test_load_config.py +++ b/tests/unit/job/test_load_config.py @@ -700,3 +700,38 @@ def test_write_disposition_setter(self): self.assertEqual( config._properties["load"]["writeDisposition"], write_disposition ) + + def test_parquet_options_missing(self): + config = self._get_target_class()() + self.assertIsNone(config.parquet_options) + + def test_parquet_options_hit(self): + config = self._get_target_class()() + config._properties["load"]["parquetOptions"] = dict( + enumAsString=True, enableListInference=False + ) + self.assertTrue(config.parquet_options.enum_as_string) + self.assertFalse(config.parquet_options.enable_list_inference) + + def test_parquet_options_setter(self): + from google.cloud.bigquery.format_options import ParquetOptions + + parquet_options = ParquetOptions.from_api_repr( + dict(enumAsString=False, enableListInference=True) + ) + config = self._get_target_class()() + + config.parquet_options = parquet_options + self.assertEqual( + config._properties["load"]["parquetOptions"], + {"enumAsString": False, "enableListInference": True}, + ) + + def test_parquet_options_setter_clearing(self): + config = self._get_target_class()() + config._properties["load"]["parquetOptions"] = dict( + enumAsString=False, enableListInference=True + ) + + config.parquet_options = None + self.assertNotIn("parquetOptions", config._properties["load"]) diff --git a/tests/unit/test_external_config.py b/tests/unit/test_external_config.py index 648a8717e..7178367ea 100644 --- a/tests/unit/test_external_config.py +++ b/tests/unit/test_external_config.py @@ -425,6 +425,106 @@ def test_to_api_repr_bigtable(self): self.assertEqual(got_resource, exp_resource) + def test_parquet_options_getter(self): + from google.cloud.bigquery.format_options import ParquetOptions + + parquet_options = ParquetOptions.from_api_repr( + {"enumAsString": True, "enableListInference": False} + ) + ec = external_config.ExternalConfig( + external_config.ExternalSourceFormat.PARQUET + ) + + self.assertIsNone(ec.parquet_options.enum_as_string) + self.assertIsNone(ec.parquet_options.enable_list_inference) + + ec._options = parquet_options + + self.assertTrue(ec.parquet_options.enum_as_string) + self.assertFalse(ec.parquet_options.enable_list_inference) + + self.assertIs(ec.parquet_options, ec.options) + + def test_parquet_options_getter_non_parquet_format(self): + ec = external_config.ExternalConfig(external_config.ExternalSourceFormat.CSV) + self.assertIsNone(ec.parquet_options) + + def test_parquet_options_setter(self): + from google.cloud.bigquery.format_options import ParquetOptions + + parquet_options = ParquetOptions.from_api_repr( + {"enumAsString": False, "enableListInference": True} + ) + ec = external_config.ExternalConfig( + external_config.ExternalSourceFormat.PARQUET + ) + + ec.parquet_options = parquet_options + + # Setting Parquet options should be reflected in the generic options attribute. + self.assertFalse(ec.options.enum_as_string) + self.assertTrue(ec.options.enable_list_inference) + + def test_parquet_options_setter_non_parquet_format(self): + from google.cloud.bigquery.format_options import ParquetOptions + + parquet_options = ParquetOptions.from_api_repr( + {"enumAsString": False, "enableListInference": True} + ) + ec = external_config.ExternalConfig(external_config.ExternalSourceFormat.CSV) + + with self.assertRaisesRegex(TypeError, "Cannot set.*source format is CSV"): + ec.parquet_options = parquet_options + + def test_from_api_repr_parquet(self): + from google.cloud.bigquery.format_options import ParquetOptions + + resource = _copy_and_update( + self.BASE_RESOURCE, + { + "sourceFormat": "PARQUET", + "parquetOptions": {"enumAsString": True, "enableListInference": False}, + }, + ) + + ec = external_config.ExternalConfig.from_api_repr(resource) + + self._verify_base(ec) + self.assertEqual(ec.source_format, external_config.ExternalSourceFormat.PARQUET) + self.assertIsInstance(ec.options, ParquetOptions) + self.assertTrue(ec.parquet_options.enum_as_string) + self.assertFalse(ec.parquet_options.enable_list_inference) + + got_resource = ec.to_api_repr() + + self.assertEqual(got_resource, resource) + + del resource["parquetOptions"]["enableListInference"] + ec = external_config.ExternalConfig.from_api_repr(resource) + self.assertIsNone(ec.options.enable_list_inference) + got_resource = ec.to_api_repr() + self.assertEqual(got_resource, resource) + + def test_to_api_repr_parquet(self): + from google.cloud.bigquery.format_options import ParquetOptions + + ec = external_config.ExternalConfig( + external_config.ExternalSourceFormat.PARQUET + ) + options = ParquetOptions.from_api_repr( + dict(enumAsString=False, enableListInference=True) + ) + ec._options = options + + exp_resource = { + "sourceFormat": external_config.ExternalSourceFormat.PARQUET, + "parquetOptions": {"enumAsString": False, "enableListInference": True}, + } + + got_resource = ec.to_api_repr() + + self.assertEqual(got_resource, exp_resource) + def _copy_and_update(d, u): d = copy.deepcopy(d) diff --git a/tests/unit/test_format_options.py b/tests/unit/test_format_options.py new file mode 100644 index 000000000..ab5f9e05c --- /dev/null +++ b/tests/unit/test_format_options.py @@ -0,0 +1,41 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class TestParquetOptions: + @staticmethod + def _get_target_class(): + from google.cloud.bigquery.format_options import ParquetOptions + + return ParquetOptions + + def test_ctor(self): + config = self._get_target_class()() + assert config.enum_as_string is None + assert config.enable_list_inference is None + + def test_from_api_repr(self): + config = self._get_target_class().from_api_repr( + {"enumAsString": False, "enableListInference": True} + ) + assert not config.enum_as_string + assert config.enable_list_inference + + def test_to_api_repr(self): + config = self._get_target_class()() + config.enum_as_string = True + config.enable_list_inference = False + + result = config.to_api_repr() + assert result == {"enumAsString": True, "enableListInference": False}