Support schema refs by file (#25)
* Allow for providing references by file name

* minor bump

* fix case where null is not first in union

* black

* Address review changes

* black

* Review changes

* Eliminate duplicate enums

* Switch to type 'List' for lists in typeddict

* Fix list and optional detection

* corrected bad testcase

* review changes
wcn00 committed Apr 26, 2021
1 parent e0220b6 commit 9ac2823
Showing 7 changed files with 231 additions and 270 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,4 +1,9 @@
# Changelog
## 0.10.0
### Changed
- Added the ability to provide schema references by their file names. Useful when the
  schema type names don't match the file names and fastavro can't automatically load them.

## 0.9.0
### Changed
- Added array type support
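
A minimal usage sketch of the new option (the .avsc file names are hypothetical; per the typed_dict_from_schema.py change below, the referenced files are handed to fastavro's load_schema_ordered with the top-level schema appended last):

    from avro_to_python_types import typed_dict_from_schema_file

    # "address.avsc" defines a record that "customer.avsc" references, but under a
    # file name that does not match the record's fully qualified type name, so
    # fastavro's load_schema cannot resolve it on its own.
    generated = typed_dict_from_schema_file(
        "customer.avsc", referenced_schema_files=["address.avsc"]
    )
    print(generated)  # the generated TypedDict source
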
2 changes: 1 addition & 1 deletion avro_to_python_types/__init__.py
@@ -1,4 +1,4 @@
__version__ = "0.9.0"
__version__ = "0.10.0"
from .typed_dict_from_schema import (
typed_dict_from_schema_file,
typed_dict_from_schema_string,
1 change: 1 addition & 0 deletions avro_to_python_types/constants.py
@@ -7,3 +7,4 @@
SYMBOLS = "symbols"
TYPE = "type"
ITEMS = "items"
LIST = "List"
257 changes: 179 additions & 78 deletions avro_to_python_types/typed_dict_from_schema.py
@@ -2,7 +2,13 @@
from .generate_typed_dict import GenerateTypedDict
from .schema_mapping import prim_to_type, logical_to_python_type
from enum import Enum
from fastavro.schema import load_schema, expand_schema, parse_schema
from fastavro.schema import (
expand_schema,
fullname,
load_schema,
load_schema_ordered,
parse_schema,
)
import ast
import astunparse
import black
@@ -17,6 +23,7 @@
SYMBOLS,
TYPE,
ITEMS,
LIST,
)


@@ -57,6 +64,14 @@ def dict_type_is_of_type(dict_type, type_name):
return False


def get_union_type(union_list: list):
"""Get the non null type from the union list"""
for field in union_list:
if isinstance(field, str) and field == NULL:
continue
return field


def get_type(types):
if not isinstance(types, list) and not isinstance(types, dict):
return types
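
A quick illustration of the get_union_type helper added above (a sketch, assuming NULL is the string "null" from constants.py): it returns the first non-null entry regardless of where "null" sits in the union, which is what the "fix case where null is not first in union" bullet refers to.

    get_union_type(["null", "string"])   # -> "string"
    get_union_type(["string", "null"])   # -> "string"  (null is not first)
    get_union_type(["null", {"type": "record", "name": "Child", "fields": []}])  # -> the record dict
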
@@ -129,8 +144,8 @@ def get_logical_type(types):
raise ValueError(f"unexpected error in logical type: {types}")


def resolve_enum_str(enums: list):
return "\n\n".join(enums) if len(enums) > 0 else ""
def resolve_enum_str(enums: dict):
return "\n\n".join(enums.values()) if len(enums) > 0 else ""


def _dedupe_ast(tree):
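
The switch of enums from a list to a dict backs the "Eliminate duplicate enums" bullet: the generated class source doubles as the dict key, so registering an identical enum a second time collapses into a single entry. A small sketch with a hypothetical enum:

    enums = {}
    enum_src = "class MyColorEnum(Enum):\n    RED = 'RED'\n\n\n"
    enums[enum_src] = enum_src
    enums[enum_src] = enum_src          # identical enum encountered again: same key, no duplicate
    print(resolve_enum_str(enums))      # emits the class exactly once
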
@@ -158,103 +173,185 @@ def _dedupe_ast(tree):

def types_for_schema(schema):
"""
This is the main function for the module. It will parse a schema and return a concrete type
which extends the TypedDict class. It currently supports all primitive types as well as
logical types except for the microsecond precision time types.
This is the main function for the module. It will parse a schema and return a
concrete type which extends the TypedDict class. It currently supports most
primitive, logical, union and array types. It does not support microsecond
precision time types and durations.
"""
body = []
tree = ast.Module(body)
body = tree.body

def type_for_schema_record(record_schema, imports, enums):
def type_for_schema_record(
record_schema, imports, enums, complex_types, import_flags
):
type_name = "".join(
word[0].upper() + word[1:] for word in record_schema["name"].split(".")
)
our_type = GenerateTypedDict(type_name)
for field in record_schema[FIELDS]:
name = field[NAME]
# nested
if field_type_is_of_type(field[TYPE], AvroSubType.RECORD.value):
nested = type_for_schema_record(field[TYPE], imports, enums)
body.append(nested.tree)
if is_nullable(field):
our_type.add_optional_element(name, nested.name)
else:
our_type.add_required_element(name, nested.name)
continue
# logical
if field_type_is_of_type(field[TYPE], LOGICAL_TYPE):
logical_type = logical_to_python_type[get_logical_type(field[TYPE])]
imports.append(
"from {} import {}\n".format(
logical_type.split(".")[0], logical_type.split(".")[1]
try:
our_type = GenerateTypedDict(type_name)
for field in record_schema[FIELDS]:
name = field[NAME]
if field_type_is_of_type(
field[TYPE], AvroSubType.RECORD.value
) and isinstance(field[TYPE], list):
"""union with complex type - This section processes the type from a
union containing an expanded type, recursively. Fastavro will expand
the types for the union the first time it encounters them and use a
reference thereafter. So the first time it will be processed here and
subsequently in the primitives section.
"""
union_field = get_union_type(field[TYPE])
nested = type_for_schema_record(
union_field, imports, enums, complex_types, import_flags
)
)
if is_nullable(field):
our_type.add_optional_element(name, logical_type.split(".")[1])
else:
our_type.add_required_element(name, logical_type.split(".")[1])
# enum
elif field_type_is_of_type(field[TYPE], AvroSubType.ENUM.value):
imports.append(
"from {} import {}\n".format(AvroSubType.ENUM.value, ENUM_CLASS)
)
"""
The enum class name is composed the same way as the typedict
name is
"""
enum_class_name = "".join(
word[0].upper() + word[1:]
for word in get_enum_class(field[TYPE]).split(".")
)
enum_class = f"class {enum_class_name}(Enum):\n"
for e in get_enum_symbols(field[TYPE]):
enum_class += f" {e} = '{e}'\n"
enum_class += "\n\n"
enums.append(enum_class)
if is_nullable(field):
our_type.add_optional_element(name, enum_class_name)
else:
our_type.add_required_element(name, enum_class_name)
# array
elif field_type_is_of_type(field[TYPE], AvroSubType.ARRAY.value):
"""
Arrays are either primitive types or nested records
"""
items_type = get_array_items(field[TYPE])
if field_type_is_of_type(items_type, AvroSubType.RECORD.value):
nested = type_for_schema_record(items_type, imports, enums)
body.append(nested.tree)
if is_nullable(field):
our_type.add_optional_element(name, f"list({nested.name})")
our_type.add_optional_element(name, nested.name)
import_flags[OPTIONAL] = True
else:
our_type.add_required_element(name, f"list({nested.name})")
else:
our_type.add_required_element(name, nested.name)
complex_types.append(nested.name)
elif field_type_is_of_type(field[TYPE], AvroSubType.RECORD.value):
"""nested - This processes an expanded nested type recursively."""
nested = type_for_schema_record(
field[TYPE], imports, enums, complex_types, import_flags
)
body.append(nested.tree)
if is_nullable(field):
our_type.add_optional_element(
name, f"list({prim_to_type[items_type]})"
our_type.add_optional_element(name, nested.name)
import_flags[OPTIONAL] = True
else:
our_type.add_required_element(name, nested.name)
complex_types.append(nested.name)
elif field_type_is_of_type(field[TYPE], LOGICAL_TYPE):
"""logical - This section processes logical types. This necessitates
importing packages like date, datetime, uuid and decimal, hence the
imports collection"""
logical_type = logical_to_python_type[get_logical_type(field[TYPE])]
imports.append(
"from {} import {}\n".format(
logical_type.split(".")[0], logical_type.split(".")[1]
)
)
if is_nullable(field):
our_type.add_optional_element(name, logical_type.split(".")[1])
import_flags[OPTIONAL] = True
else:
our_type.add_required_element(
name, f"list({prim_to_type[items_type]})"
our_type.add_required_element(name, logical_type.split(".")[1])
elif field_type_is_of_type(field[TYPE], AvroSubType.ENUM.value):
"""Enumerations are processed by adding an enum class to the
python file. At the moment that means that the same enum used in
different schemas will result in that enum being duplicated, but
with a different name. We can revisit that if necessary.
"""
imports.append(
"from {} import {}\n".format(AvroSubType.ENUM.value, ENUM_CLASS)
)
""" The enum class name is composed the same way as the typedict
name is """
enum_class_name = "".join(
word[0].upper() + word[1:]
for word in get_enum_class(field[TYPE]).split(".")
)
enum_class = f"class {enum_class_name}(Enum):\n"
if not enum_class in enums.keys():
for e in get_enum_symbols(field[TYPE]):
enum_class += f" {e} = '{e}'\n"
enum_class += "\n\n"
enums[enum_class] = enum_class
if is_nullable(field):
our_type.add_optional_element(name, enum_class_name)
import_flags[OPTIONAL] = True
else:
our_type.add_required_element(name, enum_class_name)
complex_types.append(enum_class_name)
# array
elif field_type_is_of_type(field[TYPE], AvroSubType.ARRAY.value):
"""Array types are either primitive or complex. Note that the
element added to the ast tree is a list of some type element"""
items_type = get_array_items(field[TYPE])
if field_type_is_of_type(items_type, AvroSubType.RECORD.value):
"""Arrays is for a complex nested type"""
nested = type_for_schema_record(
items_type, imports, enums, complex_types, import_flags
)
# primitive
else:
_type = get_type(field[TYPE])
if is_nullable(field):
our_type.add_optional_element(name, prim_to_type[_type])
body.append(nested.tree)
if is_nullable(field):
our_type.add_optional_element(name, f"List[{nested.name}]")
import_flags[OPTIONAL] = True
else:
our_type.add_required_element(name, f"List[{nested.name}]")
complex_types.append(nested.name)
else:
"""Array is of a prmitive type"""
if not items_type in prim_to_type.keys():
items_type_name = "".join(
word[0].upper() + word[1:]
for word in items_type.split(".")
)
array_type = (
items_type_name
if items_type_name in complex_types
else prim_to_type[items_type]
)
else:
array_type = prim_to_type[items_type]
if is_nullable(field):
our_type.add_optional_element(name, f"List[{array_type}]")
import_flags[OPTIONAL] = True
else:
our_type.add_required_element(name, f"List[{array_type}]")
import_flags[LIST] = True
# primitive
else:
our_type.add_required_element(name, prim_to_type[_type])
"""Ths section process a primitive type or a named complex type."""
if isinstance(field[TYPE], list):
for fld in field[TYPE]:
if fld != NULL:
field_type = fld
else:
field_type = get_type(field[TYPE])
if not field_type in prim_to_type.keys():
field_type_name = "".join(
word[0].upper() + word[1:] for word in field_type.split(".")
)
reference_type = (
field_type_name
if field_type_name in complex_types
else prim_to_type[field_type]
)
else:
reference_type = prim_to_type[field_type]
if is_nullable(field):
our_type.add_optional_element(name, reference_type)
import_flags[OPTIONAL] = True
else:
our_type.add_required_element(name, reference_type)
except Exception as e:
"""If an error occurs while processing a field provide an error message
containing the schema and field where the problem is so that they
have a fighting chance to fix the prblem"""
raise Exception(
f"Failed to transform schema: [{fullname(schema)}] field: "
+ f"[{record_schema[NAME]}.{name}] for reason {e}"
)
return our_type

imports = []
enums = []
main_type = type_for_schema_record(schema, imports, enums)
enums = {}
complex_types = []
import_flags = {OPTIONAL: False, LIST: False}
main_type = type_for_schema_record(
schema, imports, enums, complex_types, import_flags
)

additional_types = []
# import the Optional type only if required
if OPTIONAL in ast.dump(main_type.tree):
if import_flags[OPTIONAL]:
additional_types.append(OPTIONAL)
if import_flags[LIST]:
additional_types.append(LIST)
additional_types.append("TypedDict")
additional_types_as_str = ", ".join(additional_types)
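
What the import_flags bookkeeping produces is roughly the following typing import (the exact template lives in code collapsed out of this diff, so the header line is an assumption):

    # a schema with both nullable fields and arrays sets both flags, so
    # additional_types ends up as ["Optional", "List", "TypedDict"], which
    # presumably renders to a header like:
    from typing import Optional, List, TypedDict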

@@ -276,6 +373,10 @@ def typed_dict_from_schema_string(schema_string):
return types_for_schema(schema)


def typed_dict_from_schema_file(schema_path):
schema = expand_schema(load_schema(schema_path))
def typed_dict_from_schema_file(schema_path, referenced_schema_files=None):
if referenced_schema_files:
referenced_schema_files.append(schema_path)
schema = load_schema_ordered(referenced_schema_files)
else:
schema = expand_schema(load_schema(schema_path))
return types_for_schema(schema)
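
The two branches of typed_dict_from_schema_file map onto these fastavro calls (paths are hypothetical; load_schema_ordered expects referenced schemas to be listed before the schemas that use them, which is why schema_path is appended last):

    from fastavro.schema import expand_schema, load_schema, load_schema_ordered

    # with referenced_schema_files: dependencies first, top-level schema last
    schema = load_schema_ordered(["address.avsc", "customer.avsc"])

    # without referenced_schema_files: the original behaviour is unchanged
    schema = expand_schema(load_schema("customer.avsc"))
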
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "avro-to-python-types"
version = "0.9.0"
version = "0.10.0"
description = "A library for converting avro schemas to python types."
readme = "README.md"
authors = ["Dan Green-Leipciger"]
