
Delta table input connector #1743

Merged

merged 2 commits into main from delta-input on May 15, 2024

Conversation

@ryzhyk (Contributor) commented on May 11, 2024

This adapter supports ingesting a delta table in three modes:

  • follow - incrementally ingest the transaction log, starting from a specified (or the latest) version.

  • snapshot - read table snapshot, optionally sorted by a timestamp column and optionally filtered by a user-provided predicate, e.g., specifying a time range.

  • snapshot-and-follow - combines the above two modes.
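
For orientation, here is a hypothetical serde-style sketch of how these modes and options could be modeled. The field and variant names are illustrative only (`timestamp_column` and `snapshot_filter` do appear in the review below), not the connector's actual configuration schema:

```rust
use serde::Deserialize;

/// Illustrative ingest modes; names follow the PR description, not the crate.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
enum DeltaTableIngestMode {
    /// Incrementally follow the transaction log from a specified (or the latest) version.
    Follow,
    /// Read a snapshot of the table and stop.
    Snapshot,
    /// Read a snapshot, then follow the transaction log.
    SnapshotAndFollow,
}

/// Illustrative connector configuration.
#[derive(Debug, Deserialize)]
struct DeltaTableInputConfig {
    /// Table location, e.g., an S3 URI or a local path.
    uri: String,
    mode: DeltaTableIngestMode,
    /// Optional column used to order the snapshot by event time.
    timestamp_column: Option<String>,
    /// Optional predicate applied to the snapshot, e.g.,
    /// `ts BETWEEN '2005-01-01 00:00:00' AND '2010-12-31 23:59:59'`.
    snapshot_filter: Option<String>,
    /// Transaction-log version to start following from; defaults to the latest.
    version: Option<i64>,
}
```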

The connector was tested with the local FS and with S3, including a managed delta table created by Databricks in S3.

There are a couple of todos left. The biggest one is that sorting by timestamp can be very expensive. A better solution is to implement something like this: https://docs.databricks.com/en/structured-streaming/delta-lake.html#process-initial-snapshot-without-data-being-dropped.

The second issue is that in follow mode we currently read one file at a time from S3. We should look into using datafusion's `ListingTable`, which (hopefully) parallelizes the process.
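
As a point of reference, the following is a minimal, self-contained datafusion sketch of scanning a directory of Parquet files through `ListingTable`. It deliberately ignores the Delta transaction log, the path, partition count, and file extension are placeholders, and API details vary across datafusion versions:

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Placeholder path; for S3, an object store would have to be registered
    // with the session context first.
    let table_url = ListingTableUrl::parse("/path/to/table/")?;

    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_file_extension(".parquet")
        // Split the scan across multiple partitions instead of reading
        // files one at a time.
        .with_target_partitions(8);

    let config = ListingTableConfig::new(table_url)
        .with_listing_options(options)
        .infer_schema(&ctx.state())
        .await?;

    let table = ListingTable::try_new(config)?;
    let df = ctx.read_table(Arc::new(table))?;
    df.show_limit(10).await?;
    Ok(())
}
```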

Is this a user-visible change (yes/no): yes

@ryzhyk requested review from blp and snkas on May 11, 2024 22:21
@ryzhyk force-pushed the delta-input branch 2 times, most recently from d4d2866 to e913e1c on May 11, 2024 23:58
@blp (Member) left a comment

I looked at all of this and I did a little bit of followup reading in the deltalake crate, but I didn't study any of it enough to spot bugs in the details. If you want me to study it in detail, let me know and I'll put in the time to do that.

crates/adapters/src/integrated/delta_table/input.rs (outdated, resolved)
///
/// This option can be used to specify the range of event times to include in the snapshot,
/// e.g.: `ts BETWEEN '2005-01-01 00:00:00' AND '2010-12-31 23:59:59'`.
snapshot_filter: Option<String>,
A collaborator commented:

have to teach the compiler to push this down

crates/pipeline-types/src/transport/delta_table.rs (outdated, resolved)
let mut receiver_clone = receiver.clone();
select! {
    _ = Self::worker_task_inner(endpoint.clone(), input_stream, receiver, init_status_sender) => {
        debug!("delta_table {}: worker task terminated",
A collaborator commented:

I only see logging, but no other actions here


if let Some(timestamp_column) = &self.config.timestamp_column {
    // Parse expression only to validate it.
    let mut parser = Parser::new(&GenericDialect)
A collaborator commented:

yet another SQL dialect

@ryzhyk (author) replied:

yes :(
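
For context, a minimal, self-contained example of what validation-only parsing with sqlparser's GenericDialect looks like (this is not the adapter's actual code; the function name and sample inputs are made up):

```rust
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::{Parser, ParserError};

/// Parse a user-supplied SQL expression without evaluating it, purely to
/// reject malformed input early.
fn validate_sql_expr(expr: &str) -> Result<(), ParserError> {
    Parser::new(&GenericDialect)
        .try_with_sql(expr)?
        .parse_expr()
        .map(|_| ())
}

fn main() {
    assert!(validate_sql_expr("ts BETWEEN '2005-01-01' AND '2010-12-31'").is_ok());
    assert!(validate_sql_expr("ts BETWEEN").is_err());
}
```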

{
    let start = Instant::now();
    let json_file = NamedTempFile::new().unwrap();
    println!(
A collaborator commented:

info?

crates/adapters/src/integrated/delta_table/test.rs (2 outdated review threads, resolved)
// to process `buffer_size` records.
let buffer_timeout_ms = 100;

println!("delta_table_output_test: preparing input file");
A collaborator commented:

I guess this is allowed in test code?

crates/adapters/src/integrated/delta_table/test.rs (2 outdated review threads, resolved)
@snkas (Contributor) left a comment

Cool to see the Delta table input counterpart to the existing output, great work :) It would be useful to have a demo to showcase its usage (e.g., how to specify format).

    endpoint_config,
    endpoint.is_fault_tolerant(),
);
let reader = match endpoint {
A contributor commented:

Regarding a connector vs. an integrated connector, is the following understanding correct?

  • Usually a connector is decoupled from the format. The connector inherently has some "unit" in which its data is separated, be it one unit (e.g., a single file as a whole) or many units (e.g., Kafka messages). This "unit" of data is treated as a binary blob that the format turns into tuples the DBSP circuit can ingest.
  • An integrated connector has a fixed format.

Would it not be simpler to have a format variant called Integrated or None, and to throw an error when the transport is integrated but the format variant is something else?

@ryzhyk (author) replied:

> Regarding a connector vs. an integrated connector, is the following understanding correct?
>
> * Usually a connector is decoupled from the format. The connector inherently has some "unit" in which its data is separated, be it one unit (e.g., a single file as a whole) or many units (e.g., Kafka messages). This "unit" of data is treated as a binary blob that the format turns into tuples the DBSP circuit can ingest.
>
> * An integrated connector has a fixed format.

Yep, this is correct.

> Would it not be simpler to have a format variant called Integrated or None, and to throw an error when the transport is integrated but the format variant is something else?

We could do that, but I don't think it would simplify the code. We need two branches to build two different input pipelines: the "normal" one consisting of transport -> input_probe -> format parser, and an integrated one that has only one component (the integrated adapter). Maybe I didn't understand the suggestion. Can you clarify what you had in mind?
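
To make the two branches concrete, here is a simplified sketch with hypothetical type names (none of them come from the actual crate); it also rejects a user-supplied format on an integrated endpoint rather than silently ignoring it, which is discussed further below:

```rust
// Hypothetical, simplified stand-ins for the real adapter traits.
trait Transport {}
trait Parser {}
trait IntegratedAdapter {}

enum Endpoint {
    Regular(Box<dyn Transport>),
    Integrated(Box<dyn IntegratedAdapter>),
}

enum Reader {
    // transport -> input probe -> format parser
    Regular {
        transport: Box<dyn Transport>,
        parser: Box<dyn Parser>,
    },
    // the integrated adapter is the only component
    Integrated {
        adapter: Box<dyn IntegratedAdapter>,
    },
}

fn build_reader(endpoint: Endpoint, format: Option<Box<dyn Parser>>) -> Result<Reader, String> {
    match endpoint {
        Endpoint::Regular(transport) => {
            let parser = format.ok_or_else(|| "a format is required".to_string())?;
            Ok(Reader::Regular { transport, parser })
        }
        Endpoint::Integrated(adapter) => {
            // Reject a format instead of silently ignoring it.
            if format.is_some() {
                return Err("integrated endpoints do not accept a format".to_string());
            }
            Ok(Reader::Integrated { adapter })
        }
    }
}
```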

The contributor replied:

I am not as intimately familiar with this particular part of the code base; conceptually, I imagined the connector construction to be a function that takes as parameters (1) a transport configuration and (2) a format configuration, and returns a built connector. What happens internally in terms of validation is up to the implementation of the connector transport (though it will probably make use of similar helper functions). If an invalid (transport, format) combination is given, it returns an error.

I guess it does not matter, as the API does not differ between implementations; whichever works best in the current code base sounds good. What happens currently if a format is provided even though the transport does not support it?

@ryzhyk (author) replied:

Ok, I see what you mean now. The advantage of the current approach is that non-integrated connectors don't even need to know that data formats exist. But I think we could still implement your suggestion by having a special integrated connector implementation act as a wrapper around "regular" connectors and do the check. That would be cleaner, but I probably won't do it in this PR.

> What happens currently if a format is provided even though the transport does not support it?

Good point. It will currently be ignored; I'll fix this.

This is a small cleanup of the controller API.

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
This adapter supports ingesting a delta table in three modes:

- follow - incrementally ingest transaction log starting from a
  specified (or the latest) version.

- snapshot - read table snapshot, optionally sorted by a timestamp
  column and optionally filtered by a user-provided predicate, e.g.,
  specifying a time range.

- snapshot-and-follow - combines the above two modes.

The connector was tested with local FS and with S3, including with a
managed delta table created by databricks in S3.

There are a couple of todos left.  The biggest one is that sorting by
timestamp can be very expensive.  A better solution is to implement
something like this:
https://docs.databricks.com/en/structured-streaming/delta-lake.html#process-initial-snapshot-without-data-being-dropped

The second issue is that in follow mode we currently read one file at a
time from S3.  We should look into using datafusion's `ListingTable`,
which (hopefully) parallelizes the process.

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
@ryzhyk merged commit ccefc5e into main on May 15, 2024
5 checks passed
@ryzhyk deleted the delta-input branch on May 15, 2024 18:37