Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for Parquet options #679

Merged
merged 4 commits into from Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 26 additions & 1 deletion google/cloud/bigquery/external_config.py
Expand Up @@ -27,6 +27,7 @@
from google.cloud.bigquery._helpers import _bytes_to_json
from google.cloud.bigquery._helpers import _int_or_none
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery.format_options import ParquetOptions
from google.cloud.bigquery.schema import SchemaField


Expand All @@ -53,6 +54,12 @@ class ExternalSourceFormat(object):
DATASTORE_BACKUP = "DATASTORE_BACKUP"
"""Specifies datastore backup format"""

ORC = "ORC"
"""Specifies ORC format."""

PARQUET = "PARQUET"
"""Specifies Parquet format."""

BIGTABLE = "BIGTABLE"
"""Specifies Bigtable format."""

Expand Down Expand Up @@ -540,7 +547,7 @@ def from_api_repr(cls, resource: dict) -> "GoogleSheetsOptions":
return config


_OPTION_CLASSES = (BigtableOptions, CSVOptions, GoogleSheetsOptions)
_OPTION_CLASSES = (BigtableOptions, CSVOptions, GoogleSheetsOptions, ParquetOptions)


class HivePartitioningOptions(object):
Expand Down Expand Up @@ -784,6 +791,24 @@ def schema(self, value):
prop = {"fields": [field.to_api_repr() for field in value]}
self._properties["schema"] = prop

@property
def parquet_options(self):
    """Optional[google.cloud.bigquery.format_options.ParquetOptions]: Additional
    properties to set if ``sourceFormat`` is set to PARQUET.

    Reads the raw ``parquetOptions`` entry of this configuration and wraps it
    in a new ``ParquetOptions`` instance; ``None`` when the entry is missing
    or is itself ``None``.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ExternalDataConfiguration.FIELDS.parquet_options
    """
    raw_options = self._properties.get("parquetOptions")
    return None if raw_options is None else ParquetOptions.from_api_repr(raw_options)

@parquet_options.setter
def parquet_options(self, value):
    # Passing ``None`` keeps the key but stores an explicit ``None`` under it.
    if value is None:
        self._properties["parquetOptions"] = None
    else:
        self._properties["parquetOptions"] = value.to_api_repr()

def to_api_repr(self) -> dict:
"""Build an API representation of this object.

Expand Down
87 changes: 87 additions & 0 deletions google/cloud/bigquery/format_options.py
@@ -0,0 +1,87 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
from typing import Dict, Optional


class ParquetOptions:
    """Additional options if the PARQUET source format is used.

    Option values are kept in ``_properties`` under the camelCase keys used by
    the BigQuery REST API. An option that has not been set reads back as
    ``None``.
    """

    # NOTE(review): presumably read by the code that dispatches on the entries
    # of ``_OPTION_CLASSES`` in external_config.py; confirm against that module.
    _SOURCE_FORMAT = "PARQUET"
    _RESOURCE_NAME = "parquetOptions"

    def __init__(self):
        self._properties = {}

    @property
    def enum_as_string(self) -> Optional[bool]:
        """Indicates whether to infer Parquet ENUM logical type as STRING instead of
        BYTES by default.

        ``None`` if the option has not been set.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ParquetOptions.FIELDS.enum_as_string
        """
        return self._properties.get("enumAsString")

    @enum_as_string.setter
    def enum_as_string(self, value: Optional[bool]) -> None:
        self._properties["enumAsString"] = value

    @property
    def enable_list_inference(self) -> Optional[bool]:
        """Indicates whether to use schema inference specifically for Parquet LIST
        logical type.

        ``None`` if the option has not been set.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ParquetOptions.FIELDS.enable_list_inference
        """
        return self._properties.get("enableListInference")

    @enable_list_inference.setter
    def enable_list_inference(self, value: Optional[bool]) -> None:
        self._properties["enableListInference"] = value

    @classmethod
    def from_api_repr(cls, resource: Dict[str, bool]) -> "ParquetOptions":
        """Factory: construct an instance from a resource dict.

        Args:
            resource (Dict[str, bool]):
                Definition of a :class:`~.format_options.ParquetOptions` instance in
                the same representation as is returned from the API.

        Returns:
            :class:`~.format_options.ParquetOptions`:
                Configuration parsed from ``resource``.
        """
        config = cls()
        # Deep copy so that the new instance never shares state with the
        # caller's dictionary.
        config._properties = copy.deepcopy(resource)
        return config

    def to_api_repr(self) -> dict:
        """Build an API representation of this object.

        Every stored property whose value is not ``None`` is included, not only
        the options this class exposes as attributes, so that a resource parsed
        with :meth:`from_api_repr` keeps any additional fields when it is
        serialised again.

        Returns:
            Dict[str, bool]:
                A dictionary in the format used by the BigQuery API.
        """
        # Values are deep-copied so callers cannot mutate internal state
        # through the returned dictionary.
        return {
            key: copy.deepcopy(value)
            for key, value in self._properties.items()
            if value is not None
        }
plamut marked this conversation as resolved.
Show resolved Hide resolved
21 changes: 21 additions & 0 deletions google/cloud/bigquery/job/load.py
Expand Up @@ -16,6 +16,7 @@

from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
from google.cloud.bigquery.external_config import HivePartitioningOptions
from google.cloud.bigquery.format_options import ParquetOptions
from google.cloud.bigquery import _helpers
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import _to_schema_fields
Expand Down Expand Up @@ -439,6 +440,26 @@ def write_disposition(self):
def write_disposition(self, value):
self._set_sub_prop("writeDisposition", value)

@property
def parquet_options(self):
    """Optional[google.cloud.bigquery.format_options.ParquetOptions]: Additional
    properties to set if ``sourceFormat`` is set to PARQUET.

    Looks up the ``parquetOptions`` sub-property and wraps it in a new
    ``ParquetOptions`` instance; ``None`` when the lookup yields ``None``.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.parquet_options
    """
    raw_options = self._get_sub_prop("parquetOptions")
    if raw_options is None:
        return None
    return ParquetOptions.from_api_repr(raw_options)

@parquet_options.setter
def parquet_options(self, value):
    # ``None`` clears the option: the sub-property is deleted, not nulled.
    if value is None:
        self._del_sub_prop("parquetOptions")
        return
    self._set_sub_prop("parquetOptions", value.to_api_repr())


class LoadJob(_AsyncJob):
"""Asynchronous job for loading data into a table.
Expand Down
35 changes: 35 additions & 0 deletions tests/unit/job/test_load_config.py
Expand Up @@ -700,3 +700,38 @@ def test_write_disposition_setter(self):
self.assertEqual(
config._properties["load"]["writeDisposition"], write_disposition
)

def test_parquet_options_missing(self):
    # A freshly constructed config exposes no Parquet options.
    target_class = self._get_target_class()
    fresh_config = target_class()
    self.assertIsNone(fresh_config.parquet_options)

def test_parquet_options_hit(self):
    # Options written straight into the raw resource must be readable
    # through the ``parquet_options`` property.
    config = self._get_target_class()()
    config._properties["load"]["parquetOptions"] = {
        "enumAsString": True,
        "enableListInference": False,
    }

    enum_flag = config.parquet_options.enum_as_string
    self.assertTrue(enum_flag)
    list_flag = config.parquet_options.enable_list_inference
    self.assertFalse(list_flag)

def test_parquet_options_setter(self):
    # Assigning a ParquetOptions object stores its API representation under
    # the "load" section of the config resource.
    from google.cloud.bigquery.format_options import ParquetOptions

    api_repr = {"enumAsString": False, "enableListInference": True}
    parquet_options = ParquetOptions.from_api_repr(api_repr)
    config = self._get_target_class()()

    config.parquet_options = parquet_options

    stored = config._properties["load"]["parquetOptions"]
    self.assertEqual(stored, {"enumAsString": False, "enableListInference": True})

def test_parquet_options_setter_clearing(self):
    # Assigning None must remove the key altogether, not leave a null behind.
    config = self._get_target_class()()
    load_section = config._properties["load"]
    load_section["parquetOptions"] = {
        "enumAsString": False,
        "enableListInference": True,
    }

    config.parquet_options = None

    self.assertNotIn("parquetOptions", config._properties["load"])
49 changes: 49 additions & 0 deletions tests/unit/test_external_config.py
Expand Up @@ -425,6 +425,55 @@ def test_to_api_repr_bigtable(self):

self.assertEqual(got_resource, exp_resource)

def test_from_api_repr_parquet(self):
    # Parse a PARQUET resource, check the parsed options, and verify that
    # serialising it again reproduces the input.
    from google.cloud.bigquery.format_options import ParquetOptions

    parquet_overrides = {
        "sourceFormat": "PARQUET",
        "parquetOptions": {"enumAsString": True, "enableListInference": False},
    }
    resource = _copy_and_update(self.BASE_RESOURCE, parquet_overrides)

    ec = external_config.ExternalConfig.from_api_repr(resource)

    self._verify_base(ec)
    self.assertEqual(ec.source_format, external_config.ExternalSourceFormat.PARQUET)
    self.assertIsInstance(ec.options, ParquetOptions)
    self.assertTrue(ec.parquet_options.enum_as_string)
    self.assertFalse(ec.parquet_options.enable_list_inference)
    self.assertEqual(ec.to_api_repr(), resource)

    # An option missing from the resource reads back as None and is still
    # missing after a second round trip.
    del resource["parquetOptions"]["enableListInference"]
    ec = external_config.ExternalConfig.from_api_repr(resource)
    self.assertIsNone(ec.options.enable_list_inference)
    self.assertEqual(ec.to_api_repr(), resource)

def test_to_api_repr_parquet(self):
    # Options attached to an ExternalConfig must appear under
    # "parquetOptions" in the serialised resource.
    from google.cloud.bigquery.format_options import ParquetOptions

    parquet_format = external_config.ExternalSourceFormat.PARQUET
    ec = external_config.ExternalConfig(parquet_format)
    ec._options = ParquetOptions.from_api_repr(
        {"enumAsString": False, "enableListInference": True}
    )

    got_resource = ec.to_api_repr()

    exp_resource = {
        "sourceFormat": external_config.ExternalSourceFormat.PARQUET,
        "parquetOptions": {"enumAsString": False, "enableListInference": True},
    }
    self.assertEqual(got_resource, exp_resource)


def _copy_and_update(d, u):
d = copy.deepcopy(d)
Expand Down
41 changes: 41 additions & 0 deletions tests/unit/test_format_options.py
@@ -0,0 +1,41 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class TestParquetOptions:
    """Unit tests for ``google.cloud.bigquery.format_options.ParquetOptions``."""

    @staticmethod
    def _get_target_class():
        # Imported lazily, inside the helper, rather than at module import time.
        from google.cloud.bigquery.format_options import ParquetOptions

        return ParquetOptions

    def test_ctor(self):
        # A new instance has neither option set.
        klass = self._get_target_class()
        config = klass()
        assert config.enum_as_string is None
        assert config.enable_list_inference is None

    def test_from_api_repr(self):
        # Values from the API resource are exposed through the properties.
        resource = {"enumAsString": False, "enableListInference": True}
        config = self._get_target_class().from_api_repr(resource)
        assert not config.enum_as_string
        assert config.enable_list_inference

    def test_to_api_repr(self):
        # Values set through the properties appear under their API keys.
        config = self._get_target_class()()
        config.enum_as_string = True
        config.enable_list_inference = False

        expected = {"enumAsString": True, "enableListInference": False}
        assert config.to_api_repr() == expected