Skip to content

Commit

Permalink
feat: add AvroOptions to configure AVRO external data (#994)
Browse files Browse the repository at this point in the history
* feat: add `AvroOptions` to configure AVRO external data

Also:

* Unify `ExternalConfig` class to use `_properties` for everything. This does
  result in more code, but it should make maintenance easier as it aligns with
  our other mutable resource classes.
* Add `bigtable_options`, `csv_options`, and `google_sheets_options`
  properties. This aligns with `parquet_options`.

* remove unnecessary check for options in to_api_repr

* add missing tests for to_api_repr

* remove redundant type identifiers
  • Loading branch information
tswast committed Sep 30, 2021
1 parent d9a03b4 commit 1a9431d
Show file tree
Hide file tree
Showing 7 changed files with 518 additions and 48 deletions.
6 changes: 6 additions & 0 deletions docs/format_options.rst
@@ -0,0 +1,6 @@
BigQuery Format Options
=======================

.. automodule:: google.cloud.bigquery.format_options
:members:
:undoc-members:
5 changes: 5 additions & 0 deletions docs/reference.rst
Expand Up @@ -167,6 +167,11 @@ External Configuration
external_config.CSVOptions
external_config.GoogleSheetsOptions

.. toctree::
:maxdepth: 2

format_options


Magics
======
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/bigquery/__init__.py
Expand Up @@ -50,6 +50,7 @@
from google.cloud.bigquery.external_config import CSVOptions
from google.cloud.bigquery.external_config import GoogleSheetsOptions
from google.cloud.bigquery.external_config import ExternalSourceFormat
from google.cloud.bigquery.format_options import AvroOptions
from google.cloud.bigquery.format_options import ParquetOptions
from google.cloud.bigquery.job import Compression
from google.cloud.bigquery.job import CopyJob
Expand Down Expand Up @@ -144,6 +145,7 @@
"PolicyTagList",
"UDFResource",
"ExternalConfig",
"AvroOptions",
"BigtableOptions",
"BigtableColumnFamily",
"BigtableColumn",
Expand Down
149 changes: 123 additions & 26 deletions google/cloud/bigquery/external_config.py
Expand Up @@ -22,13 +22,13 @@

import base64
import copy
from typing import FrozenSet, Iterable, Optional
from typing import FrozenSet, Iterable, Optional, Union

from google.cloud.bigquery._helpers import _to_bytes
from google.cloud.bigquery._helpers import _bytes_to_json
from google.cloud.bigquery._helpers import _int_or_none
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery.format_options import ParquetOptions
from google.cloud.bigquery.format_options import AvroOptions, ParquetOptions
from google.cloud.bigquery.schema import SchemaField


Expand Down Expand Up @@ -548,7 +548,13 @@ def from_api_repr(cls, resource: dict) -> "GoogleSheetsOptions":
return config


_OPTION_CLASSES = (BigtableOptions, CSVOptions, GoogleSheetsOptions, ParquetOptions)
# Option wrapper classes, one per source format that has format-specific
# settings. ``ExternalConfig.options`` scans this tuple, comparing each class's
# ``_SOURCE_FORMAT`` with the config's source format and using the class's
# ``_RESOURCE_NAME`` as the key of the nested options resource.
_OPTION_CLASSES = (
AvroOptions,
BigtableOptions,
CSVOptions,
GoogleSheetsOptions,
ParquetOptions,
)


class HivePartitioningOptions(object):
Expand Down Expand Up @@ -646,11 +652,6 @@ class ExternalConfig(object):

def __init__(self, source_format):
# Seed the resource dict with the source format under the "sourceFormat" key.
self._properties = {"sourceFormat": source_format}
# NOTE(review): the lines below look like the pre-change body as shown by the
# diff view (the hunk header reports 11 old lines vs. 6 new) -- confirm they
# are deletions before relying on ``_options`` being populated here.
self._options = None
for optcls in _OPTION_CLASSES:
if source_format == optcls._SOURCE_FORMAT:
self._options = optcls()
break

@property
def source_format(self):
Expand All @@ -663,9 +664,17 @@ def source_format(self):
return self._properties["sourceFormat"]

@property
def options(self):
"""Optional[Dict[str, Any]]: Source-specific options."""
return self._options
def options(self) -> Optional[Union[_OPTION_CLASSES]]:
    """Source-specific options.

    Returns a wrapper of the option class whose ``_SOURCE_FORMAT`` equals
    this config's source format, or ``None`` when no class matches. The
    wrapper shares the nested dict stored in ``_properties`` under the
    class's ``_RESOURCE_NAME`` (created empty if absent), so changes made
    through the wrapper are written to this config.
    """
    matching_cls = next(
        (
            candidate
            for candidate in _OPTION_CLASSES
            if self.source_format == candidate._SOURCE_FORMAT
        ),
        None,
    )
    if matching_cls is None:
        # No option class is registered for this source format.
        return None

    wrapper = matching_cls()
    # setdefault() hands back the stored sub-resource, inserting {} first
    # when the key is missing.
    wrapper._properties = self._properties.setdefault(
        matching_cls._RESOURCE_NAME, {}
    )
    return wrapper

@property
def autodetect(self):
Expand Down Expand Up @@ -815,23 +824,120 @@ def schema(self, value):
self._properties["schema"] = prop

@property
def parquet_options(self):
"""Optional[google.cloud.bigquery.format_options.ParquetOptions]: Additional
properties to set if ``sourceFormat`` is set to PARQUET.
def avro_options(self) -> Optional[AvroOptions]:
    """Avro-specific settings, used when ``sourceFormat`` is AVRO.

    For an AVRO config, an empty options resource is created on first
    access. The returned wrapper shares the stored dict, so edits made
    through it are reflected in this config.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ExternalDataConfiguration.FIELDS.avro_options

    Returns:
        An ``AvroOptions`` backed by the stored resource, or ``None`` when
        no Avro options resource is present.
    """
    resource_key = AvroOptions._RESOURCE_NAME
    if self.source_format == ExternalSourceFormat.AVRO:
        # Make sure there is a dict for the wrapper to write into.
        self._properties.setdefault(resource_key, {})

    stored = self._properties.get(resource_key)
    if stored is not None:
        wrapper = AvroOptions()
        wrapper._properties = stored
        return wrapper
    return None

@avro_options.setter
def avro_options(self, value):
    """Store ``value``'s property dict as this config's Avro options.

    Raises:
        TypeError: If the source format is not AVRO.
    """
    if self.source_format == ExternalSourceFormat.AVRO:
        # The dict is stored by reference, not copied.
        self._properties[AvroOptions._RESOURCE_NAME] = value._properties
        return
    raise TypeError(
        f"Cannot set Avro options, source format is {self.source_format}"
    )

@property
def bigtable_options(self) -> Optional[BigtableOptions]:
    """Bigtable-specific settings, used when ``sourceFormat`` is BIGTABLE.

    For a BIGTABLE config, an empty options resource is created on first
    access. The returned wrapper shares the stored dict, so edits made
    through it are reflected in this config.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ExternalDataConfiguration.FIELDS.bigtable_options

    Returns:
        A ``BigtableOptions`` backed by the stored resource, or ``None``
        when no Bigtable options resource is present.
    """
    resource_key = BigtableOptions._RESOURCE_NAME
    if self.source_format == ExternalSourceFormat.BIGTABLE:
        # Make sure there is a dict for the wrapper to write into.
        self._properties.setdefault(resource_key, {})

    stored = self._properties.get(resource_key)
    if stored is not None:
        wrapper = BigtableOptions()
        wrapper._properties = stored
        return wrapper
    return None

@bigtable_options.setter
def bigtable_options(self, value):
    """Store ``value``'s property dict as this config's Bigtable options.

    Raises:
        TypeError: If the source format is not BIGTABLE.
    """
    if self.source_format == ExternalSourceFormat.BIGTABLE:
        # The dict is stored by reference, not copied.
        self._properties[BigtableOptions._RESOURCE_NAME] = value._properties
        return
    raise TypeError(
        f"Cannot set Bigtable options, source format is {self.source_format}"
    )

@property
def csv_options(self) -> Optional[CSVOptions]:
    """CSV-specific settings, used when ``sourceFormat`` is CSV.

    For a CSV config, an empty options resource is created on first
    access. The returned wrapper shares the stored dict, so edits made
    through it are reflected in this config.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ExternalDataConfiguration.FIELDS.csv_options

    Returns:
        A ``CSVOptions`` backed by the stored resource, or ``None`` when no
        CSV options resource is present.
    """
    resource_key = CSVOptions._RESOURCE_NAME
    if self.source_format == ExternalSourceFormat.CSV:
        # Make sure there is a dict for the wrapper to write into.
        self._properties.setdefault(resource_key, {})

    stored = self._properties.get(resource_key)
    if stored is not None:
        wrapper = CSVOptions()
        wrapper._properties = stored
        return wrapper
    return None

@csv_options.setter
def csv_options(self, value):
    """Store ``value``'s property dict as this config's CSV options.

    Raises:
        TypeError: If the source format is not CSV.
    """
    if self.source_format == ExternalSourceFormat.CSV:
        # The dict is stored by reference, not copied.
        self._properties[CSVOptions._RESOURCE_NAME] = value._properties
        return
    raise TypeError(
        f"Cannot set CSV options, source format is {self.source_format}"
    )

@property
def google_sheets_options(self) -> Optional[GoogleSheetsOptions]:
    """Sheets-specific settings, used when ``sourceFormat`` is GOOGLE_SHEETS.

    For a GOOGLE_SHEETS config, an empty options resource is created on
    first access. The returned wrapper shares the stored dict, so edits
    made through it are reflected in this config.

    See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ExternalDataConfiguration.FIELDS.google_sheets_options

    Returns:
        A ``GoogleSheetsOptions`` backed by the stored resource, or
        ``None`` when no Google Sheets options resource is present.
    """
    resource_key = GoogleSheetsOptions._RESOURCE_NAME
    if self.source_format == ExternalSourceFormat.GOOGLE_SHEETS:
        # Make sure there is a dict for the wrapper to write into.
        self._properties.setdefault(resource_key, {})

    stored = self._properties.get(resource_key)
    if stored is not None:
        wrapper = GoogleSheetsOptions()
        wrapper._properties = stored
        return wrapper
    return None

@google_sheets_options.setter
def google_sheets_options(self, value):
    """Store ``value``'s property dict as this config's Google Sheets options.

    Raises:
        TypeError: If the source format is not GOOGLE_SHEETS.
    """
    if self.source_format == ExternalSourceFormat.GOOGLE_SHEETS:
        # The dict is stored by reference, not copied.
        self._properties[GoogleSheetsOptions._RESOURCE_NAME] = value._properties
        return
    raise TypeError(
        f"Cannot set Google Sheets options, source format is {self.source_format}"
    )

@property
def parquet_options(self) -> Optional[ParquetOptions]:
"""Additional properties to set if ``sourceFormat`` is set to PARQUET.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ExternalDataConfiguration.FIELDS.parquet_options
"""
# NOTE(review): this hunk appears to interleave pre- and post-change lines.
# The ``!=`` guard and ``return self._options`` look like the old body; the
# ``==`` / setdefault / wrapper path mirrors the sibling ``*_options`` getters.
# Confirm against the commit before treating this as one function body.
if self.source_format != ExternalSourceFormat.PARQUET:
if self.source_format == ExternalSourceFormat.PARQUET:
self._properties.setdefault(ParquetOptions._RESOURCE_NAME, {})
resource = self._properties.get(ParquetOptions._RESOURCE_NAME)
if resource is None:
return None
return self._options
# The wrapper is pointed at the stored dict itself rather than a copy.
options = ParquetOptions()
options._properties = resource
return options

@parquet_options.setter
def parquet_options(self, value):
# Reject the assignment unless this config's source format is PARQUET.
if self.source_format != ExternalSourceFormat.PARQUET:
msg = f"Cannot set Parquet options, source format is {self.source_format}"
raise TypeError(msg)
# NOTE(review): the two assignments below look like the old and new versions
# of the same line as rendered by the diff view; the ``_properties`` form
# matches the sibling ``*_options`` setters -- confirm which one survives.
self._options = value
self._properties[ParquetOptions._RESOURCE_NAME] = value._properties

def to_api_repr(self) -> dict:
"""Build an API representation of this object.
Expand All @@ -841,10 +947,6 @@ def to_api_repr(self) -> dict:
A dictionary in the format used by the BigQuery API.
"""
config = copy.deepcopy(self._properties)
if self.options is not None:
r = self.options.to_api_repr()
if r != {}:
config[self.options._RESOURCE_NAME] = r
return config

@classmethod
Expand All @@ -862,10 +964,5 @@ def from_api_repr(cls, resource: dict) -> "ExternalConfig":
ExternalConfig: Configuration parsed from ``resource``.
"""
config = cls(resource["sourceFormat"])
for optcls in _OPTION_CLASSES:
opts = resource.get(optcls._RESOURCE_NAME)
if opts is not None:
config._options = optcls.from_api_repr(opts)
break
config._properties = copy.deepcopy(resource)
return config
54 changes: 53 additions & 1 deletion google/cloud/bigquery/format_options.py
Expand Up @@ -13,7 +13,59 @@
# limitations under the License.

import copy
from typing import Dict
from typing import Dict, Optional


class AvroOptions:
    """Options if source format is set to AVRO.

    Settings are kept in ``_properties`` using the API's camelCase field
    names, so the dict can be sent to or loaded from the API unchanged.
    """

    # Source format this option class applies to.
    _SOURCE_FORMAT = "AVRO"
    # Key under which these options are nested in the parent resource.
    _RESOURCE_NAME = "avroOptions"

    def __init__(self):
        self._properties = {}

    @property
    def use_avro_logical_types(self) -> Optional[bool]:
        """[Optional] If sourceFormat is set to 'AVRO', indicates whether to
        interpret logical types as the corresponding BigQuery data type (for
        example, TIMESTAMP), instead of using the raw type (for example,
        INTEGER).

        Returns ``None`` when the field has not been set.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#AvroOptions.FIELDS.use_avro_logical_types
        """
        return self._properties.get("useAvroLogicalTypes")

    @use_avro_logical_types.setter
    def use_avro_logical_types(self, value):
        self._properties["useAvroLogicalTypes"] = value

    @classmethod
    def from_api_repr(cls, resource: Dict[str, bool]) -> "AvroOptions":
        """Factory: construct an instance from a resource dict.

        Args:
            resource (Dict[str, bool]):
                Definition of a :class:`~.format_options.AvroOptions` instance in
                the same representation as is returned from the API.

        Returns:
            :class:`~.format_options.AvroOptions`:
                Configuration parsed from ``resource``.
        """
        config = cls()
        # Deep-copy so later changes to ``resource`` do not affect the instance.
        config._properties = copy.deepcopy(resource)
        return config

    def to_api_repr(self) -> Dict[str, bool]:
        """Build an API representation of this object.

        Returns:
            Dict[str, bool]:
                A dictionary in the format used by the BigQuery API. It is a
                deep copy, so mutating it does not affect this object.
        """
        return copy.deepcopy(self._properties)


class ParquetOptions:
Expand Down

0 comments on commit 1a9431d

Please sign in to comment.