I searched in the issues and found nothing similar.
Motivation
When creating an Append for Message Queue table, as depicted in the screenshot here:

we can notice the following:

- 5 buckets are specified, but a bucket is not created until data arrives for it
- If you check the Kafka partitions, partition 3 has keys 2, 3 and 4
- These keys, though, end up in different buckets
- Paimon does a shuffle, even though the parallelism is the same, because it does not do a 1-1 mapping

Because this is Kafka-like message queue functionality, some users are confused: they expect the same partitioning to be preserved, i.e. a 1-1 mapping between a Kafka partition and a Paimon bucket.

At the same time, I believe this is a really good enhancement: it should also allow removing the shuffle between the operators, thus improving performance.
Solution
No response
Anything else?
No response
Are you willing to submit a PR?
I'm willing to submit a PR!
polyzos changed the title to "[Feature] 1-1 mapping between paimon buckets and kafka partitions" on Apr 23, 2024.
If you want a 1->1 mapping, Paimon's bucket number should be at least as large as the number of Kafka partitions, and the data should be shuffled by the Kafka partition id. I think Paimon can already implement your idea.
Here is a demo; you can define the DDL like this.

Kafka source table:
```sql
CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format
  `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
  `partition_id` INT METADATA FROM 'partition' VIRTUAL, -- from Kafka connector
  `offset` BIGINT METADATA VIRTUAL, -- from Kafka connector
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);
```
Paimon sink table:
```sql
CREATE TABLE IF NOT EXISTS sink_paimon_table
WITH (
  'connector' = 'paimon',
  'bucket' = '3', -- bucket number should be >= the number of kafka partitions
  'bucket-key' = 'partition_id', -- bucket key must be the kafka partition_id
  'merge-engine' = 'deduplicate',
  'primary-key' = 'partition_id,offset' -- primary key must be partition_id,offset
)
LIKE KafkaTable (EXCLUDING ALL);
```
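The pipeline itself is then a plain INSERT; a minimal sketch, assuming the two tables defined above:

```sql
-- Sketch: stream the Kafka records into the Paimon table.
-- Flink shuffles by partition_id, the declared bucket-key of sink_paimon_table.
INSERT INTO sink_paimon_table
SELECT * FROM KafkaTable;
```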
So when the Kafka source table's data is inserted into the Paimon table, it will be shuffled by the Kafka partition_id. Since partition_id is an INT, whose hash code equals its own value, this pipeline maps each Kafka partition's records 1->1 to a Paimon bucket.
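The hashcode argument can be sketched in plain Java. This mirrors the idea, not Paimon's actual bucket-assignment code; `bucketFor` is a hypothetical helper standing in for "hash the bucket-key, then take it modulo the bucket count":

```java
// Sketch: why an INT bucket-key gives an identity mapping.
// Java's Integer.hashCode(v) returns v itself, so with
// bucket = |hash| % numBuckets and numBuckets >= partition count,
// partition i always lands in bucket i.
public class BucketMapping {

    // Hypothetical stand-in for a fixed-bucket assignment.
    static int bucketFor(int partitionId, int numBuckets) {
        int hash = Integer.hashCode(partitionId); // equals partitionId itself
        return Math.abs(hash) % numBuckets;
    }

    public static void main(String[] args) {
        int numBuckets = 3; // matches 'bucket' = '3' in the sink DDL
        for (int p = 0; p < numBuckets; p++) {
            System.out.println("partition " + p + " -> bucket " + bucketFor(p, numBuckets));
        }
    }
}
```

With fewer buckets than partitions, the modulo wraps around and two partitions share a bucket, which is why the bucket count must be at least the partition count.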
@eric666666 Thanks a lot for this.
The problem here though is that I'm trying to use an Append for Message Queue table, so I can offload logs from Kafka, but your example suggests using a Primary Key table.
I also tried your example, but when creating the sink_paimon_table (paimon 0.7.0-incubating), I'm getting:

```
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Table column [id, a, b, dt] should include all primary key constraint [partition_id, offset]
```