Introduce schema evolution via the -S flag #164
Draft
rtyler wants to merge 11 commits into main from schema-evolution-from-outside-in
Conversation
rtyler force-pushed the schema-evolution-from-outside-in branch 3 times, most recently from e4a1ade to c1bcd37 on January 8, 2024 05:51
rtyler changed the title from "Implementing stricter schema conformance and evolution" to "Introduce schema evolution via the -S flag" on Jan 8, 2024
rtyler force-pushed the schema-evolution-from-outside-in branch from c1bcd37 to 820421f on January 8, 2024 22:53
I stumbled into this while pilfering code from kafka-delta-ingest for another project and discovered that the code in `write_values` which does `record_batch.schema() != arrow_schema` doesn't do what we think it does. Basically, if `Decoder` "works", the schema it returns is just the schema that was passed into it; it has no bearing on whether the JSON actually has the same schema. Don't ask me why. Using the reader's `infer_json_schema_*` functions can provide a schema that is useful for comparison:

```rust
let mut value_iter = json_buffer.iter().map(|j| Ok(j.to_owned()));
let json_schema =
    infer_json_schema_from_iterator(value_iter.clone()).expect("Failed to infer!");
let decoder = Decoder::new(Arc::new(json_schema), options);
if let Some(batch) = decoder
    .next_batch(&mut value_iter)
    .expect("Failed to create RecordBatch")
{
    assert_eq!(batch.schema(), arrow_schema_ref, "Schemas don't match!");
}
```

What's even more interesting is that after a certain number of fields are removed, the `Decoder` no longer pretends it can decode the JSON. I am baffled as to why.
…l refactor

The intention here is to enable more consistent schema handling within the writers

Sponsored-by: Raft LLC
…e writers

The DeserializedMessage carries optional inferred schema information along with the message itself. This is useful for understanding whether schema evolution should happen "later" in the message processing pipeline. The downside of this behavior is that there will be a performance impact, since arrow_json performs schema inference.

Sponsored-by: Raft LLC
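The idea of a message type that carries its own inferred schema can be sketched as below. This is a minimal std-only illustration, not the actual kafka-delta-ingest API: the names (`DeserializedMessage`, `InferredSchema`, `with_schema`) and the `(name, type)` pair representation of a schema are assumptions standing in for real Arrow schemas.

```rust
/// Simplified stand-in for an Arrow schema: (field name, type name) pairs.
#[derive(Debug, Clone, PartialEq)]
struct InferredSchema {
    fields: Vec<(String, String)>,
}

/// A deserialized message that optionally carries the schema inferred
/// from its payload, so downstream writers can decide later whether
/// schema evolution is needed. Illustrative only.
#[derive(Debug, Clone)]
struct DeserializedMessage {
    payload: String,
    inferred_schema: Option<InferredSchema>,
}

impl DeserializedMessage {
    fn new(payload: &str) -> Self {
        Self {
            payload: payload.to_string(),
            inferred_schema: None,
        }
    }

    /// Attach an inferred schema; in the real code this is where the
    /// arrow_json inference (and its performance cost) would occur.
    fn with_schema(mut self, fields: Vec<(String, String)>) -> Self {
        self.inferred_schema = Some(InferredSchema { fields });
        self
    }
}

fn main() {
    let msg = DeserializedMessage::new(r#"{"id": 1}"#)
        .with_schema(vec![("id".into(), "Int64".into())]);
    assert!(msg.inferred_schema.is_some());
    assert_eq!(msg.payload, r#"{"id": 1}"#);
    println!("{:?}", msg);
}
```

Carrying the schema as an `Option` keeps the non-evolution path cheap: messages that never need evolution simply leave it as `None`.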
Turning avro off drops about 50 crates from the default build, which is useful for development, but the code would need to be cleaned up before removing this from the default features list. See #163
Identified by `cargo +nightly udeps`
This change is a little wrapped up in the introduction of DeserializedMessage, but the trade-off for development targeting S3 is that I am linking 382 crates every cycle as opposed to 451. Fixes #163
I don't know why the impl was way down there 😄
This commit introduces some interplay between the IngestProcessor and DataWriter, the latter of which needs to keep track of whether or not it has a changed schema. What should be done with that changed schema must necessarily live in IngestProcessor, since that will perform the Delta transaction commits at the tail end of batch processing. There are some potential mismatches between the schema in storage and what the DataWriter has, so this change tries to run the runloop again if the current schema and the evolved schema are incompatible.

Closes #131

Sponsored-by: Raft LLC
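The compatibility-check-then-retry idea described above can be sketched as follows. This is a hedged, std-only illustration; `Schema`, `is_compatible_with`, and `commit` are hypothetical names, and "compatible" is simplified here to "the stored schema's fields are a subset of the evolved schema's" (i.e. only columns were added).

```rust
/// Field names stand in for a full Delta schema in this sketch.
#[derive(Debug, Clone, PartialEq)]
struct Schema(Vec<String>);

impl Schema {
    /// Evolution is treated as compatible when every stored field is
    /// still present in the evolved schema (only columns were added).
    fn is_compatible_with(&self, evolved: &Schema) -> bool {
        self.0.iter().all(|f| evolved.0.contains(f))
    }
}

/// Try to commit an evolved schema against the stored one, retrying a
/// bounded number of times; in the real processor a retry would reload
/// table state and run the loop again.
fn commit(stored: &mut Schema, evolved: &Schema, max_retries: usize) -> Result<(), String> {
    for _ in 0..=max_retries {
        if stored.is_compatible_with(evolved) {
            *stored = evolved.clone();
            return Ok(());
        }
    }
    Err("schema incompatible after retries".into())
}

fn main() {
    let mut stored = Schema(vec!["id".into()]);
    let evolved = Schema(vec!["id".into(), "name".into()]);
    // Adding a column is compatible, so the commit succeeds.
    assert!(commit(&mut stored, &evolved, 2).is_ok());
    assert_eq!(stored.0.len(), 2);

    // Dropping a column is not compatible under this simplified rule.
    let shrunk = Schema(vec!["id".into()]);
    assert!(commit(&mut stored, &shrunk, 2).is_err());
}
```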
This will ensure the non-evolution case remains relatively speedy!

Sponsored-by: Raft LLC
rtyler force-pushed the schema-evolution-from-outside-in branch from 820421f to 50e81da on January 9, 2024 19:13
This set of changes implements more nuanced handling of the Delta schema vs. the message schema with the introduction of the `-S` command line flag, which enables schema evolution.

In order for schema evolution to work, there is a necessary performance hit: kafka-delta-ingest must infer the schema of every message read from Kafka and, if necessary, add nullable columns to the Delta table. This is related to similar work in delta-rs for the `RecordBatchWriter`, but deviates because of the mechanism by which RecordBatches and schemas are handled in kafka-delta-ingest.

Sponsored-by: Raft LLC
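The "add nullable columns" step the description refers to can be illustrated with a minimal sketch. This is not the actual implementation: `Field` and `evolve` are hypothetical names, and the schema representation is deliberately simplified from real Arrow/Delta schemas.

```rust
/// Simplified schema field: just a name and nullability flag.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    nullable: bool,
}

/// Any field present in the message schema but missing from the table
/// schema is added as a nullable column. Returns whether the table
/// schema changed, so the caller knows a schema commit is needed.
fn evolve(table: &mut Vec<Field>, message: &[Field]) -> bool {
    let mut changed = false;
    for f in message {
        if !table.iter().any(|t| t.name == f.name) {
            table.push(Field {
                name: f.name.clone(),
                nullable: true, // new columns must be nullable for old rows
            });
            changed = true;
        }
    }
    changed
}

fn main() {
    let mut table = vec![Field { name: "id".into(), nullable: false }];
    let message = vec![
        Field { name: "id".into(), nullable: false },
        Field { name: "name".into(), nullable: false },
    ];
    // First pass adds the missing "name" column as nullable.
    assert!(evolve(&mut table, &message));
    assert_eq!(table.len(), 2);
    assert!(table[1].nullable);
    // A second pass with the same message schema changes nothing.
    assert!(!evolve(&mut table, &message));
}
```

Returning a `changed` flag is what lets the common case stay cheap: when no message introduces a new column, no Delta metadata transaction needs to be committed.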
NOTE: This pull request builds on #162