-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Consider catalog file when populating a schema message #90
base: master
Are you sure you want to change the base?
Conversation
The provided catalog.json should be used for the schema message generation to mismatches between schema and extracted records.
Use provided catalog in schema message
Hi @agrandotech, thanks for your contribution! In order for us to evaluate and accept your PR, we ask that you sign a contribution license agreement. It's all electronic and will take just minutes. |
if "anyOf" not in property_schema and "type" not in property_schema: | ||
return None # Could not detect data type | ||
for property_type in property_schema.get("anyOf", [property_schema.get("type")]): | ||
if "object" in property_type or property_type == "object": | ||
return True | ||
return False | ||
|
||
|
||
def is_property_selected( | ||
stream_name, | ||
breadcrumb, | ||
): | ||
"""Return True if the property is selected for extract. | ||
Breadcrumb of `[]` or `None` indicates the stream itself. Otherwise, the | ||
breadcrumb is the path to a property within the stream. | ||
The code is based on https://github.com/meltano/sdk/blob/c9c0967b0caca51fe7c87082f9e7c5dd54fa5dfa/singer_sdk/helpers/_catalog.py#L63 | ||
""" | ||
breadcrumb = breadcrumb or tuple() | ||
if isinstance(breadcrumb, str): | ||
breadcrumb = tuple([breadcrumb]) | ||
|
||
if not Context.catalog: | ||
return True | ||
|
||
catalog_entry = Context.get_catalog_entry(stream_name).to_dict() | ||
if not catalog_entry: | ||
LOGGER.warning(f"Catalog entry missing for '{stream_name}'. Skipping.") | ||
return False | ||
|
||
if not catalog_entry.get('metadata'): | ||
return True | ||
|
||
md_map = metadata.to_map(catalog_entry['metadata']) | ||
md_entry = md_map.get(breadcrumb) | ||
parent_value = None | ||
if len(breadcrumb) > 0: | ||
parent_breadcrumb = tuple(list(breadcrumb)[:-2]) | ||
parent_value = is_property_selected( | ||
stream_name, parent_breadcrumb | ||
) | ||
if parent_value is False: | ||
return parent_value | ||
|
||
if not md_entry: | ||
LOGGER.warning( | ||
f"Catalog entry missing for '{stream_name}':'{breadcrumb}'. " | ||
f"Using parent value of selected={parent_value}." | ||
) | ||
return parent_value or False | ||
|
||
if md_entry.get("inclusion") == "unsupported": | ||
return False | ||
|
||
if md_entry.get("inclusion") == "automatic": | ||
if md_entry.get("selected") is False: | ||
LOGGER.warning( | ||
f"Property '{':'.join(breadcrumb)}' was deselected while also set" | ||
"for automatic inclusion. Ignoring selected==False input." | ||
) | ||
return True | ||
|
||
if "selected" in md_entry: | ||
return bool(md_entry['selected']) | ||
|
||
if md_entry.get('inclusion') == 'available': | ||
return True | ||
|
||
raise ValueError( | ||
f"Could not detect selection status for '{stream_name}' breadcrumb " | ||
f"'{breadcrumb}' using metadata: {md_map}" | ||
) | ||
|
||
|
||
def pop_deselected_schema( | ||
schema, | ||
stream_name, | ||
breadcrumb, | ||
): | ||
"""Remove anything from schema that is not selected. | ||
Walk through schema, starting at the index in breadcrumb, recursively updating in | ||
place. | ||
This code is based on https://github.com/meltano/sdk/blob/c9c0967b0caca51fe7c87082f9e7c5dd54fa5dfa/singer_sdk/helpers/_catalog.py#L146 | ||
""" | ||
for property_name, val in list(schema.get("properties", {}).items()): | ||
property_breadcrumb = tuple( | ||
list(breadcrumb) + ["properties", property_name] | ||
) | ||
selected = is_property_selected( | ||
stream_name, property_breadcrumb | ||
) | ||
if not selected: | ||
schema["properties"].pop(property_name) | ||
continue | ||
|
||
if is_object_type(val): | ||
# call recursively in case any subproperties are deselected. | ||
pop_deselected_schema( | ||
val, stream_name, property_breadcrumb | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewer note: Those helper functions are used to remove unselected fields from the schema message
schema = load_schema(stream.tap_stream_id) | ||
singer.write_schema(stream.tap_stream_id, schema, stream.pk_fields) | ||
stream_id = stream.tap_stream_id | ||
catalog_entry = Context.get_catalog_entry(stream_id).to_dict() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewer note: Using Context.get_catalog_entry(stream_id).to_dict()
ensures that the catalog file is used for the schema message creation in case one is provided. If not, it uses the default schema from load_schema
You did it @agrandotech! Thank you for signing the Singer Contribution License Agreement. |
I guess it's an old PR, but would the maintainers interested in getting this merged? |
Description of change
In the current implementation, the catalog.json being passed is not considered for the creation of the schema message.
The schema always matches the schema in the schema json files within the repository.
This leads to the following issues:
We encountered this issue when we synced to a BigQuery target. The table created did not match the data in the records.
Manual QA steps
"selected": false
-> those fields should not show up in the schema messageRisks
Rollback steps