Introduce schema evolution via the -S flag #164

Draft
wants to merge 11 commits into main from schema-evolution-from-outside-in

Conversation

@rtyler (Member) commented Jan 7, 2024

This set of changes implements more nuanced handling of the Delta table schema vs. the message schema with the introduction of the -S command line flag, which enables schema evolution.

For schema evolution to work, there is a necessary performance hit: kafka-delta-ingest must infer the schema of every message read from Kafka and, if necessary, add nullable columns to the Delta table (sketched below). This is related to similar work on the RecordBatchWriter in delta-rs, but deviates because of the mechanism by which RecordBatches and schemas are handled in kafka-delta-ingest.

Sponsored-by: Raft LLC

NOTE: This pull request builds on #162
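As a rough sketch of the "add nullable columns" step described above (assuming a recent arrow-rs; `new_nullable_fields` is a hypothetical helper, not the PR's actual code):

        use arrow::datatypes::{Field, Schema};

        /// Fields present in the schema inferred from a message but absent from the
        /// table's Arrow schema, forced to nullable so existing rows remain valid.
        fn new_nullable_fields(table_schema: &Schema, inferred: &Schema) -> Vec<Field> {
            inferred
                .fields()
                .iter()
                .filter(|field| table_schema.field_with_name(field.name()).is_err())
                .map(|field| field.as_ref().clone().with_nullable(true))
                .collect()
        }

Appending only nullable columns is what keeps the evolution backwards compatible: rows written before the evolution simply read as null for the new columns.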

@rtyler force-pushed the schema-evolution-from-outside-in branch 3 times, most recently from e4a1ade to c1bcd37 on January 8, 2024 05:51
@rtyler changed the title from "Implementing stricter schema conformance and evolution" to "Introduce schema evolution via the -S flag" on Jan 8, 2024
@rtyler force-pushed the schema-evolution-from-outside-in branch from c1bcd37 to 820421f on January 8, 2024 22:53
I stumbled into this while pilfering code from kafka-delta-ingest for another
project and discovered that the code in `write_values`, which does
`record_batch.schema() != arrow_schema`, doesn't do what we think it does.

Basically, if `Decoder` "works", the schema it returns is just the schema
passed into it. It has no bearing on whether the JSON actually has that
schema. Don't ask me why.

The reader's `infer_json_schema_*` functions can provide a Schema that is
useful for comparison:

        use std::sync::Arc;
        use arrow::json::reader::{infer_json_schema_from_iterator, Decoder, DecoderOptions};

        // `json_buffer`: Vec<serde_json::Value> of messages read from Kafka
        let mut value_iter = json_buffer.iter().map(|j| Ok(j.to_owned()));
        // Infer a schema from the messages themselves rather than trusting Decoder
        let json_schema = infer_json_schema_from_iterator(value_iter.clone()).expect("Failed to infer!");
        let decoder = Decoder::new(Arc::new(json_schema), DecoderOptions::new());
        if let Some(batch) = decoder.next_batch(&mut value_iter).expect("Failed to create RecordBatch") {
            // `arrow_schema_ref` is the SchemaRef the writer expects for the table
            assert_eq!(batch.schema(), arrow_schema_ref, "Schemas don't match!");
        }

What's even more interesting is that after a certain number of fields are
removed, the `Decoder` no longer pretends it can decode the JSON. I am baffled
as to why.
…l refactor

The intention here is to enable more consistent schema handling within
the writers

Sponsored-by: Raft LLC
…e writers

The DeserializedMessage carries optional inferred schema information
along with the message itself. This is useful for understanding whether
schema evolution should happen "later" in the message processing
pipeline.

The downside of this behavior is that there will be a performance impact,
since arrow_json performs schema inference on every message.

Sponsored-by: Raft LLC
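A rough sketch of the shape implied here (field names are illustrative; the PR's actual struct may differ):

        use arrow::datatypes::Schema;
        use serde_json::Value;

        /// A message deserialized from Kafka, carrying the Arrow schema inferred
        /// from its JSON body so that later pipeline stages can decide whether
        /// schema evolution is required.
        pub struct DeserializedMessage {
            pub message: Value,
            pub inferred_schema: Option<Schema>,
        }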
Turning avro off drops about 50 crates from the default build, which is
useful for development, but the code would need to be cleaned up before
removing it from the default features list.

See #163
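Purely as an illustration of the gating pattern (none of these names are from the PR), avro-specific code paths can be fenced off behind a cargo feature so that builds without it skip the extra crates:

        // Hypothetical sketch: compiled only when built with `--features avro`
        #[cfg(feature = "avro")]
        mod avro_support {
            pub fn enabled() -> bool { true }
        }

        // Stub used when avro is left out of the default feature list
        #[cfg(not(feature = "avro"))]
        mod avro_support {
            pub fn enabled() -> bool { false }
        }

        fn main() {
            println!("avro support compiled in: {}", avro_support::enabled());
        }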
Identified by `cargo +nightly udeps`
This change is a little wrapped up in the introduction of
DeserializedMessage, but the trade-off for development targeting S3 is
that I am linking 382 crates every cycle as opposed to 451.

Fixes #163
I don't know why the impl was way down there 😄
This commit introduces some interplay between the IngestProcessor and
DataWriter, the latter of which needs to keep track of whether or not it
has a changed schema.

What should be done with that changed schema must necessarily live in
IngestProcessor since that will perform the Delta transaction commits at
the tail end of batch processing.

There are some potential mismatches between the schema in storage and
the schema the DataWriter has, so this change tries to run the runloop
again if the current schema and the evolved schema are incompatible.

Closes #131

Sponsored-by: Raft LLC
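A rough, self-contained sketch of that division of responsibility (every name below is a hypothetical stand-in, not the PR's actual types):

        /// Stand-in for an Arrow/Delta schema
        #[derive(Clone, PartialEq, Debug)]
        struct SchemaState(Vec<String>);

        struct DataWriter {
            schema: SchemaState,
            schema_changed: bool,
        }

        impl DataWriter {
            /// Writing a batch may evolve the writer's schema; the writer only
            /// records that fact, it never commits it.
            fn write(&mut self, incoming: &SchemaState) {
                if *incoming != self.schema {
                    self.schema = incoming.clone();
                    self.schema_changed = true;
                }
            }
        }

        struct IngestProcessor {
            committed_schema: SchemaState,
        }

        impl IngestProcessor {
            /// Commits live here, at the tail end of batch processing. If the
            /// writer evolved its schema, commit the metadata change and tell
            /// the caller to run the runloop again.
            fn finish_batch(&mut self, writer: &mut DataWriter) -> bool {
                if writer.schema_changed {
                    self.committed_schema = writer.schema.clone();
                    writer.schema_changed = false;
                    return true; // schemas diverged: rerun the runloop
                }
                false
            }
        }

        fn main() {
            let mut writer = DataWriter { schema: SchemaState(vec!["id".into()]), schema_changed: false };
            let mut processor = IngestProcessor { committed_schema: writer.schema.clone() };

            writer.write(&SchemaState(vec!["id".into(), "color".into()]));
            assert!(processor.finish_batch(&mut writer));  // evolved: loop again
            assert!(!processor.finish_batch(&mut writer)); // stable now
        }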
This will ensure the non-evolution case stays relatively speedy!

Sponsored-by: Raft LLC
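A sketch of the fast path implied here, assuming a hypothetical `schema_evolution` flag derived from -S (the real wiring in the PR may differ):

        use arrow::datatypes::Schema;
        use arrow::json::reader::infer_json_schema_from_iterator;
        use serde_json::Value;

        /// Only pay for per-message schema inference when -S enabled evolution,
        /// so the default (non-evolution) path skips inference entirely.
        fn maybe_infer(message: &Value, schema_evolution: bool) -> Option<Schema> {
            if !schema_evolution {
                return None; // fast path: no inference cost
            }
            infer_json_schema_from_iterator(std::iter::once(Ok(message.clone()))).ok()
        }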