
Implement append only task writer. #74

Open
Tracked by #10641
liurenjie1024 opened this issue Jul 5, 2023 · 7 comments

Comments

@liurenjie1024
Contributor

An append-only task writer accepts an optional partitioner and a file appender factory as arguments. When it receives records, it dispatches each record to a different file writer according to the partition key (generated by the partitioner) and writes it. When it finishes, it returns the generated data file structs.

Note that this will be the API used directly by compute engines such as RisingWave and Ballista. We can refer to the following implementation as an example:

https://github.com/apache/iceberg/blob/e340ad5be04e902398c576f431810c3dfa4fe717/core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java#L28
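To make the fanout dispatch concrete, here is a minimal std-only sketch of the idea in Rust. `Record`, `DataFile`, and `FileWriter` are hypothetical stand-ins for types the crate will define; a real writer would append to a Parquet file rather than a `Vec`.

```rust
use std::collections::HashMap;

// Hypothetical record type; the real crate's record type will differ.
#[derive(Clone)]
struct Record {
    partition_value: String,
    payload: String,
}

// Placeholder for the metadata struct returned when a file writer is closed.
#[derive(Debug, PartialEq)]
struct DataFile {
    partition: String,
    record_count: usize,
}

// One in-memory "file writer" per partition; a real one would write Parquet.
struct FileWriter {
    partition: String,
    records: Vec<Record>,
}

impl FileWriter {
    fn write(&mut self, record: Record) {
        self.records.push(record);
    }
    fn close(self) -> DataFile {
        DataFile { partition: self.partition, record_count: self.records.len() }
    }
}

// Append-only task writer: dispatches each record to the writer for its
// partition key, creating writers lazily, like PartitionedFanoutWriter.
struct AppendOnlyTaskWriter {
    writers: HashMap<String, FileWriter>,
}

impl AppendOnlyTaskWriter {
    fn new() -> Self {
        Self { writers: HashMap::new() }
    }

    fn write(&mut self, record: Record) {
        let key = record.partition_value.clone();
        self.writers
            .entry(key.clone())
            .or_insert_with(|| FileWriter { partition: key, records: Vec::new() })
            .write(record);
    }

    // On close, every per-partition writer is closed and its DataFile returned.
    fn close(self) -> Vec<DataFile> {
        self.writers.into_values().map(FileWriter::close).collect()
    }
}

fn main() {
    let mut writer = AppendOnlyTaskWriter::new();
    writer.write(Record { partition_value: "2023-07-05".into(), payload: "a".into() });
    writer.write(Record { partition_value: "2023-07-05".into(), payload: "b".into() });
    writer.write(Record { partition_value: "2023-07-06".into(), payload: "c".into() });

    let mut files = writer.close();
    files.sort_by(|a, b| a.partition.cmp(&b.partition));
    assert_eq!(files.len(), 2);
    assert_eq!(files[0].record_count, 2);
}
```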

@ZENOTME
Contributor

ZENOTME commented Jul 6, 2023

How would this API be used by compute engines? Something like:

// Create a writer to write data files
let mut task_writer = table.data_writer();
task_writer.write(record);
let data_files = task_writer.close();

// Apply these data files to the table
let tx = table.transaction();
tx.apply(data_files);
tx.commit();

@Xuanwo
Contributor

Xuanwo commented Jul 6, 2023

How would this API be used by compute engines?

Let's prioritize making it functional for now and refine the API later.

@liurenjie1024
Contributor Author

How would this API be used by compute engines? Something like:

Let's use RisingWave's new coordinated sink as an example:

  1. Each sink will contain a task writer.
  2. When it needs to commit, it calls the task writer's commit method to get data files. The data files will be serialized and passed to the sink coordinator.
  3. The sink coordinator will use iceberg table APIs to create a new snapshot and perform the commit.
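The three steps above can be sketched as follows. This is a hedged, std-only illustration: `Sink`, `SinkCoordinator`, and `DataFile` are hypothetical names, and the "serialization" is a toy string encoding standing in for whatever wire format RisingWave actually uses.

```rust
// Hypothetical data-file metadata produced by each sink's task writer.
#[derive(Debug, Clone, PartialEq)]
struct DataFile {
    path: String,
    record_count: usize,
}

// Steps 1-2: each sink owns a task writer; on commit it closes the writer,
// obtains data files, and serializes them for the coordinator.
struct Sink {
    pending: Vec<DataFile>,
}

impl Sink {
    fn commit(&mut self) -> Vec<String> {
        self.pending
            .drain(..)
            .map(|f| format!("{}:{}", f.path, f.record_count))
            .collect()
    }
}

// Step 3: the coordinator gathers serialized data files from all sinks and
// performs a single table commit (one new snapshot).
struct SinkCoordinator {
    committed: Vec<DataFile>,
}

impl SinkCoordinator {
    fn commit(&mut self, serialized: Vec<String>) {
        for s in serialized {
            let (path, count) = s.split_once(':').expect("well-formed entry");
            self.committed.push(DataFile {
                path: path.to_string(),
                record_count: count.parse().expect("numeric count"),
            });
        }
        // A real coordinator would now use the table's transaction API to
        // append these files and create a new snapshot.
    }
}

fn main() {
    let mut sink = Sink {
        pending: vec![DataFile { path: "data/part-0.parquet".into(), record_count: 100 }],
    };
    let mut coordinator = SinkCoordinator { committed: Vec::new() };

    let serialized = sink.commit();
    coordinator.commit(serialized);

    assert_eq!(coordinator.committed.len(), 1);
    assert_eq!(coordinator.committed[0].record_count, 100);
}
```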

@ZENOTME
Contributor

ZENOTME commented Jul 10, 2023

This issue can be closed now.

@liurenjie1024
Contributor Author

We need to add support for partition spec. cc @ZENOTME

@ZENOTME
Contributor

ZENOTME commented Jul 13, 2023

I realize that in the future we will need to add a position delete file writer, and the user could use it like the following (using the different writers separately):

// Create writers for data files and delete files
let mut append_writer = table.append_writer();
append_writer.write(record);
let mut delete_writer = table.delete_writer();
delete_writer.write(delete);

let append_data_files = append_writer.close();
let delete_data_files = delete_writer.close();

// Apply the appended data files to the table
let tx = table.transaction();
tx.apply(append_data_files);
tx.commit();

// Apply the delete files to the table
let tx = table.transaction();
tx.apply(delete_data_files);
tx.commit();

So maybe naming the interface append_writer() would be better?

@liurenjie1024
Contributor Author

In my original design, the task writer should provide two methods:

insert_record
update_record

The internal implementation of update_record needs to maintain a map from record id to file position; this way we can keep users from using the low-level API of the file writer.
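A minimal sketch of that design, under stated assumptions: `TaskWriter`, the `u64` record id, and the `Vec<String>` row store are all hypothetical stand-ins. An update is modeled as a position delete of the old row plus a re-insert, with the id-to-position map hiding the file-writer details from the caller.

```rust
use std::collections::HashMap;

// Hypothetical task writer keeping an id -> file-position map so that
// update_record can tombstone the old row without exposing file writers.
struct TaskWriter {
    rows: Vec<String>,              // stand-in for the data file's rows
    positions: HashMap<u64, usize>, // record id -> file position
    deletes: Vec<usize>,            // positions tombstoned by updates
}

impl TaskWriter {
    fn new() -> Self {
        Self { rows: Vec::new(), positions: HashMap::new(), deletes: Vec::new() }
    }

    fn insert_record(&mut self, id: u64, payload: &str) {
        let pos = self.rows.len();
        self.rows.push(payload.to_string());
        self.positions.insert(id, pos);
    }

    // update = position delete of the old row + insert of the new row.
    fn update_record(&mut self, id: u64, payload: &str) {
        if let Some(old_pos) = self.positions.get(&id).copied() {
            self.deletes.push(old_pos);
        }
        self.insert_record(id, payload);
    }
}

fn main() {
    let mut w = TaskWriter::new();
    w.insert_record(1, "v1");
    w.update_record(1, "v2");

    assert_eq!(w.deletes, vec![0]); // old position tombstoned
    assert_eq!(w.positions[&1], 1); // id 1 now points at the new row
    assert_eq!(w.rows, vec!["v1", "v2"]);
}
```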
