[Question] split_partition_queue behavior #651

Open
Malefaro opened this issue Jan 23, 2024 · 1 comment

@Malefaro

The docs for split_partition_queue say:

Note that calling Consumer::assign will deactivate any existing partition queues. You will need to call this method for every partition that should be split after every call to assign.

But I don't see this behavior.

For example, in this code snippet:

    use std::sync::Arc;

    use futures::StreamExt;
    use rdkafka::{
        consumer::{Consumer, StreamConsumer},
        ClientConfig, TopicPartitionList,
    };

    #[tokio::test]
    async fn test_split_partition() {
        let config = ClientConfig::new()
            .set("group.id", "tests")
            .set("bootstrap.servers", "0.0.0.0:9092")
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "true")
            .set_log_level(rdkafka::config::RDKafkaLogLevel::Debug)
            .clone();

        let client: StreamConsumer = config.create().unwrap();
        let client = Arc::new(client);
        let topic = "some_topic";

        // Split the partition queue *before* assigning, as the docs recommend.
        let spq = client.split_partition_queue(topic, 0).unwrap();

        let mut tpl = TopicPartitionList::new();
        tpl.add_partition_offset(topic, 0, rdkafka::Offset::End)
            .expect("failed to add partition");

        client
            .incremental_assign(&tpl)
            .expect("failed to incremental_assign");

        let mut stream = spq.stream();
        println!("waiting msg");
        for _ in 0..20 {
            let msg = stream.next().await;
            println!("got msg: {msg:?}");
        }

        // Re-assign with "other_topic" added; per the docs, this should
        // deactivate the existing split partition queue.
        tpl.add_partition_offset("other_topic", 0, rdkafka::Offset::End)
            .unwrap();

        client.assign(&tpl).unwrap();

        println!("reading new after assign");
        loop {
            let msg = stream.next().await;
            println!("got msg: {msg:?}");
        }
    }

After calling assign with "other_topic" (in addition to the initial "some_topic"), I still receive new messages from the split queue (I verified that these are definitely new messages, produced after the assign call).

The docs also say:

Note that there may be buffered messages for the specified partition that will continue to be returned by StreamConsumer::recv. For best results, call split_partition_queue before the first call to StreamConsumer::recv.
You must periodically await StreamConsumer::recv, even if no messages are expected, to serve events.

Could I lose messages in the StreamPartitionQueue if I call split_partition_queue after the first call to StreamConsumer::recv?

Should I call StreamConsumer::pause before split_partition_queue, and StreamConsumer::resume afterwards, so as not to lose any messages in the newly created queue?
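
Concretely, I mean something like the following (untested sketch; I am not sure pause even applies before the partition is assigned, and error handling is elided):

    // Untested sketch: pause the partition, split its queue, then resume.
    // Assumes `client: Arc<StreamConsumer>` already has "some_topic"/0 assigned.
    let mut tpl = TopicPartitionList::new();
    tpl.add_partition("some_topic", 0);
    client.pause(&tpl).expect("pause failed"); // stop fetching this partition
    let spq = client
        .split_partition_queue("some_topic", 0)
        .expect("queue already split?"); // redirect the partition to its own queue
    client.resume(&tpl).expect("resume failed"); // fetching continues into `spq`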

I need to dynamically subscribe to and unsubscribe from topics, and different topics have different handlers.
If I spawn a background task that always calls recv() on the StreamConsumer, I won't be able to call split_partition_queue before the first call to StreamConsumer::recv. The background task I have in mind is sketched below.
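
For context, that background task is just a drain loop like this (sketch; `client` is the Arc<StreamConsumer> from the snippet above, and messages are expected to arrive on the split queues, not here):

    // Sketch: keep polling the main queue so librdkafka events are served,
    // ignoring (just logging) any messages that end up on it.
    let main = Arc::clone(&client);
    tokio::spawn(async move {
        loop {
            match main.recv().await {
                Ok(msg) => println!("unexpected msg on main queue: {msg:?}"),
                Err(e) => println!("main consumer error: {e}"),
            }
        }
    });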

Initially I wanted to use incremental_assign for subscribing, but I don't know whether the note about assign also applies to incremental_assign. And since I can't reproduce the documented assign behavior in the first place, I can't test the incremental_assign behavior either.

So, in short, my questions are:

  1. Can I use incremental_assign/incremental_unassign so that I don't have to recreate all the split_partition_queue queues after every call?
  2. Is it safe (in terms of not losing messages in the created StreamPartitionQueue) to call split_partition_queue after the first call to StreamConsumer::recv, if I have a background task that just ignores any messages from StreamConsumer::recv?
    (My flow is: first call split_partition_queue, then incremental_assign; rough sketch below.)
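
The flow in question, roughly (illustrative sketch; the topic name and error handling are placeholders, and the per-topic handler wiring is omitted):

    // Sketch: dynamically add one topic without touching existing queues:
    // split the partition queue first, then incrementally assign it.
    let spq = client
        .split_partition_queue("new_topic", 0)
        .expect("queue already split?");
    let mut tpl = TopicPartitionList::new();
    tpl.add_partition_offset("new_topic", 0, rdkafka::Offset::End)
        .unwrap();
    client.incremental_assign(&tpl).unwrap();
    // ...hand `spq` to this topic's handler task...
    // Later, to drop just this topic:
    client.incremental_unassign(&tpl).unwrap();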
@plasticbox

I would also like to see an example where multiple topics are consumed as streams via split_partition_queue and rebalances are handled.
