Delta table input connector #1743
Conversation
Force-pushed d4d2866 to e913e1c.
I looked at all of this and I did a little bit of follow-up reading in the deltalake crate, but I didn't study any of it enough to spot bugs in the details. If you want me to study it in detail, let me know and I'll put in the time to do that.
```rust
///
/// This option can be used to specify the range of event times to include in the snapshot,
/// e.g.: `ts BETWEEN '2005-01-01 00:00:00' AND '2010-12-31 23:59:59'`.
snapshot_filter: Option<String>,
```
have to teach the compiler to push this down
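To make the option concrete, here is a minimal sketch of setting it. The struct name, second field, and `Default` support are assumptions based on the diff above, not the PR's actual public API:

```rust
// Minimal sketch, assuming a config struct shaped like the diff above
// (`DeltaTableInputConfig` is a hypothetical name).
#[derive(Default)]
struct DeltaTableInputConfig {
    /// SQL predicate restricting the rows included in the snapshot.
    snapshot_filter: Option<String>,
    /// Column used to order the snapshot, if any (assumed field).
    timestamp_column: Option<String>,
}

fn main() {
    let config = DeltaTableInputConfig {
        snapshot_filter: Some(
            "ts BETWEEN '2005-01-01 00:00:00' AND '2010-12-31 23:59:59'".to_string(),
        ),
        ..Default::default()
    };
    assert!(config.snapshot_filter.is_some());
}
```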
```rust
let mut receiver_clone = receiver.clone();
select! {
    _ = Self::worker_task_inner(endpoint.clone(), input_stream, receiver, init_status_sender) => {
        debug!("delta_table {}: worker task terminated",
```
I only see logging, but no other actions here
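For context, a hedged sketch of the select-with-cancellation pattern the snippet uses, assuming a tokio `watch` channel carries the shutdown signal (all names here are illustrative, not the PR's actual types):

```rust
use tokio::sync::watch;

// Run the worker until it either finishes on its own or shutdown is signaled.
async fn worker(mut shutdown: watch::Receiver<bool>) {
    tokio::select! {
        _ = do_work() => {
            // The worker terminated by itself; as the comment above notes,
            // the real code currently only logs at this point.
        }
        _ = shutdown.changed() => {
            // Shutdown requested: clean up and return.
        }
    }
}

async fn do_work() {
    // Placeholder for the actual ingest loop.
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(false);
    let worker_handle = tokio::spawn(worker(rx));
    let _ = tx.send(true); // signal shutdown
    let _ = worker_handle.await;
}
```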
```rust
if let Some(timestamp_column) = &self.config.timestamp_column {
    // Parse expression only to validate it.
    let mut parser = Parser::new(&GenericDialect)
```
yet another SQL dialect
yes :(
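As a sketch of what the dialect is used for here: sqlparser's `GenericDialect` can validate a user-supplied predicate without ever executing it. The wrapper function below is illustrative, not the PR's actual code:

```rust
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;

// Validate a `snapshot_filter`-style predicate by parsing it and
// discarding the AST; only the Ok/Err outcome matters.
fn validate_filter(filter: &str) -> Result<(), String> {
    // Embed the predicate in a SELECT so it parses as a full statement.
    let sql = format!("SELECT * FROM t WHERE {filter}");
    Parser::parse_sql(&GenericDialect {}, &sql)
        .map(|_| ())
        .map_err(|e| format!("invalid filter expression: {e}"))
}

fn main() {
    assert!(validate_filter("ts BETWEEN '2005-01-01' AND '2010-12-31'").is_ok());
    assert!(validate_filter("ts BETWEEN AND").is_err());
}
```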
```rust
{
    let start = Instant::now();
    let json_file = NamedTempFile::new().unwrap();
    println!(
```
`info!`?
```rust
// to process `buffer_size` records.
let buffer_timeout_ms = 100;

println!("delta_table_output_test: preparing input file");
```
I guess this is allowed in test code?
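If the `info!` suggestion were taken outside test code, the change would be mechanical. A minimal sketch using the `log` facade (the backend choice, `env_logger` here, is an assumption):

```rust
use log::info;

fn main() {
    // Assumption: env_logger as the backend; any `log` backend works.
    env_logger::init();
    // Before: println!("delta_table_output_test: preparing input file");
    // After: goes through the logging facade, respecting levels and sinks.
    info!("delta_table_output_test: preparing input file");
}
```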
Cool to see the Delta table input counterpart to the existing output connector, great work :) It would be useful to have a demo to showcase its usage (e.g., how to specify the format).
```rust
    endpoint_config,
    endpoint.is_fault_tolerant(),
);
let reader = match endpoint {
```
Regarding a connector vs. an integrated connector, is the following understanding correct?

- Usually a connector is decoupled from the format. The connector inherently has some "unit" in which its data is separated, be it one unit (e.g., a single file as a whole) or many units (e.g., Kafka messages). This "unit" of data is considered a binary blob that the format turns into tuples the DBSP circuit can ingest.
- An integrated connector has a fixed format.

Would it not be simpler to have a format variant called `Integrated` or `None`, and upon matching a transport which is integrated and for which the format variant is not that, throw an error?
> Regarding a connector vs. an integrated connector, is the following understanding correct?
>
> - Usually a connector is decoupled from the format. The connector inherently has some "unit" in which its data is separated, be it one unit (e.g., a single file as a whole) or many units (e.g., Kafka messages). This "unit" of data is considered a binary blob that the format turns into tuples the DBSP circuit can ingest.
> - An integrated connector has a fixed format.

Yep, this is correct.

> Would it not be simpler to have a format variant called `Integrated` or `None`, and upon matching a transport which is integrated and for which the format variant is not that, throw an error?

We could do that, but I don't think this will simplify the code. We need two branches to build two different input pipelines: the "normal" one consisting of `transport -> input_probe -> format parser`, and an integrated one that only has one component (the integrated adapter). Maybe I didn't understand the suggestion. Can you clarify what you had in mind?
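A rough sketch of the two pipeline shapes described above; every name here is invented for illustration and does not match the actual feldera code:

```rust
// All names invented for illustration; not the actual feldera types.
trait Transport { fn next_chunk(&mut self) -> Option<Vec<u8>>; }
trait FormatParser { fn parse(&mut self, chunk: &[u8]); }
trait IntegratedAdapter { fn ingest(&mut self); }

// The two pipeline shapes the controller must build: a "normal" one where
// the transport yields opaque byte chunks (passed through an input probe)
// and a format parser turns them into records, and an integrated one where
// a single adapter both reads and parses.
#[allow(dead_code)]
enum InputPipeline {
    Regular {
        transport: Box<dyn Transport>,
        parser: Box<dyn FormatParser>,
    },
    Integrated {
        adapter: Box<dyn IntegratedAdapter>,
    },
}

fn main() {}
```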
I am not as intimately familiar with this particular part of the code base; conceptually I imagined the building to be a function which takes as parameters (1) a transport configuration and (2) a format configuration, and returns a built connector. What happens internally in terms of validation is up to the implementation of the connector transport (though it will probably make use of similar helper functions). If a combination of (transport, format) which is not valid is given, it returns an error.

I guess it does not matter, as the API does not differ with the implementation; whichever works best in the current code base sounds good. What happens currently if a format is provided even though the transport does not support it?
Ok, I see what you mean now. The advantage of the current approach is that non-integrated connectors don't even need to know that data formats exist. But I think we could still implement your suggestion by having a special integrated connector implementation act as a wrapper around "regular" connectors and do the check. That would be cleaner, but I probably won't do it in this PR.
> What happens currently if a format is provided even though the transport does not support it?
Good point, it will be ignored, I'll fix this.
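A minimal sketch of the fix promised above: reject the configuration when a format is supplied alongside an integrated transport. The function and its arguments are hypothetical:

```rust
// Fail fast instead of silently ignoring the `format` section.
fn check_format_config(
    transport_is_integrated: bool,
    format_specified: bool,
) -> Result<(), String> {
    if transport_is_integrated && format_specified {
        return Err(
            "transport has an integrated format; remove the `format` section".to_string(),
        );
    }
    Ok(())
}

fn main() {
    assert!(check_format_config(true, true).is_err());
    assert!(check_format_config(true, false).is_ok());
    assert!(check_format_config(false, true).is_ok());
}
```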
This is a small cleanup of the controller API.

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
This adapter supports ingesting a delta table in three modes:

- follow - incrementally ingest the transaction log starting from a specified (or the latest) version.
- snapshot - read a table snapshot, optionally sorted by a timestamp column and optionally filtered by a user-provided predicate, e.g., specifying a time range.
- snapshot-and-follow - combines the above two modes.

The connector was tested with the local FS and with S3, including with a managed delta table created by Databricks in S3.

There are a couple of todos left. The biggest one is that sorting by timestamp can be very expensive. A better solution is to implement something like this: https://docs.databricks.com/en/structured-streaming/delta-lake.html#process-initial-snapshot-without-data-being-dropped

The second issue is that in follow mode we currently read one file at a time from S3. We should look into using datafusion's `ListingTable`, which (hopefully) parallelizes the process.

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>

Is this a user-visible change (yes/no): yes
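The three modes map naturally onto an enum; the sketch below is only illustrative and is not the connector's actual configuration type:

```rust
// Illustrative shape only; not the connector's actual configuration type.
#[allow(dead_code)]
enum DeltaIngestMode {
    /// Read the table snapshot, optionally ordered by a timestamp column
    /// and filtered by a user-provided predicate.
    Snapshot,
    /// Incrementally follow the transaction log from a given
    /// (or the latest) version.
    Follow { start_version: Option<i64> },
    /// Read the snapshot first, then follow the log from that point on.
    SnapshotAndFollow,
}

fn main() {
    let _mode = DeltaIngestMode::Follow { start_version: None };
}
```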