Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement transaction identifiers #2327

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open

Conversation

Blajda
Copy link
Collaborator

@Blajda Blajda commented Mar 24, 2024

Description

Enable users to read transactions identifiers on table and allows writing of transactions identifiers for all operations.

Related Issue(s)

@github-actions github-actions bot added the binding/rust Issues for the Rust crate label Mar 24, 2024
@Blajda
Copy link
Collaborator Author

Blajda commented Mar 24, 2024

@roeap Please provide some feedback on how I'm interacting with the log replay.
Transaction Identifiers are at the DeltaTableState level which encapsulates a EagerSnapshot however EagerSnapshot is responsible for using LogReplayStream for reading checkpoint and commit files. Either I move the application transcription into the eager snapshot, read the same files twice, or implement some hook for when the replay is being performed.

I implemented the hook with a visitor trait called ReplayVisitor which can be used whenever a new EagerSnapshot is being created or whenever states are being updated merged. This will allow information from checkpoints & commits to flow upwards depending on what feature are being used.

Copy link
Member

@rtyler rtyler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like exposing Txn 😄 kafka-delta-ingest is the only application I've seen in real life that really makes heavy use of them 😄

@@ -337,6 +339,20 @@ impl Snapshot {
}
}

/// TODO!
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

struct PrintVistor {}
impl ReplayVistor for PrintVistor {
fn visit_batch(&mut self, _batch: &RecordBatch) -> DeltaResult<()> {
println!("Hello world!");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

@Blajda Blajda Mar 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's still WIP / draft. Opened this get some feedback on the new mechanism of implementing a visitor pattern for hooking into the log replay. Currently implementation is only tested when a brand new table is created. I will need to be tested with reading from existing tables.

@Blajda Blajda marked this pull request as ready for review April 2, 2024 01:18
Nekit2217 added a commit to Nekit2217/delta-rs that referenced this pull request Apr 16, 2024
@roeap
Copy link
Collaborator

roeap commented May 10, 2024

@Blajda - very sorry for the long delay! Will do a review now.

Copy link
Collaborator

@roeap roeap left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good, and I do like the visitor pattern to extract more actions during replay.

There are some thoughts I had during review that I am keen to ghet your opinion on.


/// Create a new application transactions. See [`Txn`] for details.
pub fn new_with_last_update(
app_id: &dyn ToString,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
app_id: &dyn ToString,
app_id: impl Into<String>,

personally I usually use Into<String>, since ToString usually gests implemented by the Debug trait which would often not really make much of difference. Also just preference, but I somehow prefer impl .... then again its just preference :)

Comment on lines +33 to +37
ActionType::Txn.schema_field().clone(),
]);
pub(super) static ref CHECKPOINT_SCHEMA: StructType = StructType::new(vec![
ActionType::Add.schema_field().clone(),
ActionType::Txn.schema_field().clone(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it maybe make sense to make this configurable? The goal is to be as minimal as possible when reading the log and eventuially update the parquet reader to leverage push-down as best we can.

We will always require add / remove but can maybe read txns and others based on config. Another alternative would maybe be to just read the transactions separately and in another PR add a simple caching layer for reading especially the json files?

What do you think?

Comment on lines +50 to +72
fn visit_batch(&mut self, batch: &arrow_array::RecordBatch) -> DeltaResult<()> {
if batch.column_by_name("txn").is_none() {
return Ok(());
}

let txn_col = ex::extract_and_cast::<StructArray>(batch, "txn")?;
let filter = is_not_null(txn_col)?;

let filtered = filter_record_batch(batch, &filter)?;
let arr = ex::extract_and_cast::<StructArray>(&filtered, "txn")?;

let id = ex::extract_and_cast::<StringArray>(arr, "appId")?;
let version = ex::extract_and_cast::<Int64Array>(arr, "version")?;

for idx in 0..id.len() {
let app = ex::read_str(id, idx)?;
let version = ex::read_primitive(version, idx)?;

self.app_transaction_version.insert(app.to_owned(), version);
}

Ok(())
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never actually worked with the transaction IDs, but I think we may have to insert transactrion Ids only when its not yet inserted. Reason being that we replay the log from highes commit to lowest.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
binding/rust Issues for the Rust crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Transaction Identifiers
3 participants