The docs for `split_partition_queue` say:

> Note that calling `Consumer::assign` will deactivate any existing partition queues. You will need to call this method for every partition that should be split after every call to `assign`.
But I don't see this behavior.
For example, in this code snippet:

```rust
use std::sync::Arc;

use futures::StreamExt;
use rdkafka::{
    consumer::{Consumer, StreamConsumer},
    ClientConfig, Message, TopicPartitionList,
};

#[tokio::test]
async fn test_split_partition() {
    let config = ClientConfig::new()
        .set("group.id", "tests")
        .set("bootstrap.servers", "0.0.0.0:9092")
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "true")
        .set_log_level(rdkafka::config::RDKafkaLogLevel::Debug)
        .clone();
    let client: StreamConsumer = config.create().unwrap();
    let client = Arc::new(client);
    let topic = "some_topic";

    // Split the partition queue first, then assign the partition.
    let spq = client.split_partition_queue(topic, 0).unwrap();
    let mut tpl = TopicPartitionList::new();
    tpl.add_partition_offset(topic, 0, rdkafka::Offset::End)
        .expect("failed to add partition");
    client
        .incremental_assign(&tpl)
        .expect("failed to incremental_assign");

    let mut stream = spq.stream();
    println!("waiting msg");
    for _ in 0..20 {
        let msg = stream.next().await;
        println!("got msg: {msg:?}");
    }

    // Re-assign with an extra topic; per the docs this should deactivate `spq`.
    tpl.add_partition_offset("other_topic", 0, rdkafka::Offset::End)
        .unwrap();
    client.assign(&tpl).unwrap();
    println!("reading new after assign");
    loop {
        let msg = stream.next().await;
        println!("got msg: {msg:?}");
    }
}
```
After calling `assign` with "other_topic" (plus the initial "some_topic"), I still receive new messages from "some_topic" on the split queue. I checked that these are definitely new messages, produced after the `assign` call.
The docs also say:

> Note that there may be buffered messages for the specified partition that will continue to be returned by `StreamConsumer::recv`. For best results, call `split_partition_queue` before the first call to `StreamConsumer::recv`.

> You must periodically await `StreamConsumer::recv`, even if no messages are expected, to serve events.
Can I lose messages in a `StreamPartitionQueue` if I call `split_partition_queue` after the first call to `StreamConsumer::recv`?
I need to dynamically subscribe to and unsubscribe from topics. Different topics have different handlers.
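To make the requirement concrete, here is a minimal sketch of the bookkeeping I have in mind. It uses only the standard library and does not touch rdkafka at all; `HandlerRegistry`, `Handler`, and the method names are hypothetical, made up for illustration. The comments mark where I assume the rdkafka calls would go.

```rust
use std::collections::HashMap;

/// Hypothetical handler type: receives one message payload and
/// returns a description of what it did.
type Handler = Box<dyn Fn(&[u8]) -> String + Send>;

/// Hypothetical registry that maps a topic name to its handler.
#[derive(Default)]
struct HandlerRegistry {
    handlers: HashMap<String, Handler>,
}

impl HandlerRegistry {
    /// Register (or replace) the handler for `topic`.
    /// Assumption: in the real flow, split_partition_queue followed by
    /// incremental_assign would be called around here.
    fn subscribe(&mut self, topic: &str, handler: Handler) {
        self.handlers.insert(topic.to_string(), handler);
    }

    /// Remove the handler for `topic`; returns true if one was registered.
    /// Assumption: in the real flow, incremental_unassign would be called here.
    fn unsubscribe(&mut self, topic: &str) -> bool {
        self.handlers.remove(topic).is_some()
    }

    /// Route a payload to the handler registered for `topic`, if any.
    fn dispatch(&self, topic: &str, payload: &[u8]) -> Option<String> {
        self.handlers.get(topic).map(|handler| handler(payload))
    }
}

fn main() {
    let mut registry = HandlerRegistry::default();
    registry.subscribe(
        "some_topic",
        Box::new(|p: &[u8]| format!("some_topic handler got {} bytes", p.len())),
    );
    registry.subscribe(
        "other_topic",
        Box::new(|p: &[u8]| format!("other_topic handler got {} bytes", p.len())),
    );

    // Each topic is routed to its own handler.
    println!("{:?}", registry.dispatch("some_topic", b"hello"));

    // After unsubscribing, nothing handles the topic any more.
    registry.unsubscribe("some_topic");
    println!("{:?}", registry.dispatch("some_topic", b"hello"));
}
```

In the real application each handler would be fed from its own `StreamPartitionQueue` instead of a `dispatch` call, which is why I care whether existing queues survive a change of assignment.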
If I spawn a background task that always calls `recv()` on the `StreamConsumer`, I won't be able to call `split_partition_queue` before the first call to `StreamConsumer::recv`.
Initially I wanted to use `incremental_assign` for subscribing, but I don't know whether the note about `assign` also applies to `incremental_assign`. Since I can't reproduce the documented `assign` behavior, I can't test the `incremental_assign` behavior either.
So in short, my questions are:
1. Can I use `incremental_assign`/`incremental_unassign` so that I don't have to recreate all `split_partition_queue` queues after each call?
2. Is it safe (in terms of losing messages in the created `StreamPartitionQueue`) to call `split_partition_queue` after the first call to `StreamConsumer::recv`, if I have a background task that just ignores any messages from `StreamConsumer::recv`? (My flow is to call `split_partition_queue` first, then `incremental_assign`.)
Related to the question about calling `split_partition_queue` after the first `recv`: should I call `StreamConsumer::pause` before `split_partition_queue` and `StreamConsumer::resume` afterwards, so that no messages are lost in the newly created queue?