How to restrict the number of messages while consuming #2204

Open
ksdvishnukumar opened this issue Apr 3, 2024 · 1 comment

@ksdvishnukumar

Description

I have an application built on .NET Core 6.0, with a wrapper around the Confluent.Kafka package (2.1.1). We have logic to consume messages as a batch: the consumer accumulates messages (say 25 per batch); once the batch size is reached, it pauses the consumer, processes the batch, and signals a manual commit. The commit is performed before the consumer is resumed.
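
Roughly, the pattern looks like this (a minimal sketch, not our actual wrapper; `BatchSize` and `ProcessBatch` are placeholder names):

```csharp
using System.Collections.Generic;
using System.Threading;
using Confluent.Kafka;

class BatchConsumerLoop
{
    const int BatchSize = 25; // per-batch message count from the description

    static void Run(IConsumer<string, string> consumer, CancellationToken ct)
    {
        var batch = new List<ConsumeResult<string, string>>();

        while (!ct.IsCancellationRequested)
        {
            var result = consumer.Consume(ct); // blocks until a message arrives
            if (result.Message == null) continue; // e.g. partition EOF events
            batch.Add(result);

            if (batch.Count < BatchSize) continue;

            // Pause fetching on all assigned partitions while the batch is processed.
            consumer.Pause(consumer.Assignment);

            ProcessBatch(batch); // placeholder for application-specific processing

            // Manual commit: Commit(ConsumeResult) commits the offset *after*
            // the given message, so committing the last message covers the batch.
            consumer.Commit(batch[batch.Count - 1]);

            consumer.Resume(consumer.Assignment);
            batch.Clear();
        }
    }

    static void ProcessBatch(List<ConsumeResult<string, string>> batch)
    {
        // application-specific processing goes here
    }
}
```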

As I understand it, polling starts when the Consume method on Confluent Kafka's IConsumer is called. The fetcher sends requests to the broker, puts the fetched messages into an internal queue/buffer, and messages are then delivered to the client from that queue.
When the consumer is paused, the internal buffer/queue gets cleared, which leads to re-reading the same messages.

I see a couple of configuration options that restrict fetching by number of bytes or KB (a config sketch follows the list):

  1. QueuedMaxMessagesKbytes
    As per my understanding, this is the size limit of the internal queue that buffers fetched messages.
  2. FetchMaxBytes
    As per my understanding, this is the limit on a single fetch request, combined across all partitions for the consumer connection.
  3. MaxPartitionFetchBytes
    As per my understanding, this is the limit at the individual partition level.
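
For illustration, a sketch of how these three limits might be set (note the client expresses QueuedMaxMessagesKbytes in KB and the other two in bytes; broker address and group id are placeholders):

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // placeholder
    GroupId = "test",
    EnableAutoCommit = false,

    QueuedMaxMessagesKbytes = 5 * 1024,       // internal queue limit, value in KB (~5 MB)
    FetchMaxBytes = 10 * 1024 * 1024,         // per fetch request, across partitions (10 MB)
    MaxPartitionFetchBytes = 5 * 1024 * 1024, // per partition (5 MB)
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
```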

The topic has 1 partition containing 1000 messages of 500 KB each.
Even though we set the limits, say QueuedMaxMessagesKbytes to 5 MB, FetchMaxBytes to 10 MB, and MaxPartitionFetchBytes to 5 MB, I still receive 25 messages every time (25 × 500 KB = 12.5 MB). But since I am limiting QueuedMaxMessagesKbytes and MaxPartitionFetchBytes to 5 MB, I would expect only 10 messages (5 MB / 500 KB) in the queue. Moreover, I am pausing the consumer as well.

One more interesting point: I read in another GitHub thread that passing a timeout value in the first call to Consume will pull messages from the broker, and later calls with TimeSpan.Zero will return messages immediately from the internal queue. With this approach I was not able to commit when auto-commit is disabled; it works only when EnableAutoCommit is set to true. So I prefer to use the token-based Consume method along with CancellationDelayMaxMs; I see that the same property is used internally as the timeout for the timeout-based Consume.
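
Sketched out, the two approaches look like this (assuming an already-built, subscribed consumer; the drain semantics of TimeSpan.Zero are as described in that thread):

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Approach 1: the first call with a real timeout pulls from the broker;
// follow-up calls with TimeSpan.Zero only drain the internal queue and
// return null once it is empty.
var first = consumer.Consume(TimeSpan.FromSeconds(5));
while (true)
{
    var next = consumer.Consume(TimeSpan.Zero);
    if (next == null) break;
    // accumulate next into the batch...
}

// Approach 2 (what I prefer): the token-based overload. Internally it
// repeatedly calls the timeout-based Consume with CancellationDelayMaxMs
// as the timeout, checking the token between calls.
using var cts = new CancellationTokenSource();
var result = consumer.Consume(cts.Token);
```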

THIS IS NOT AN ISSUE, BUT I WANT TO UNDERSTAND THE BEHAVIOR.

Could you please clarify whether my understanding is right, and how to achieve this behavior?

How to reproduce

Any simple consume loop can reproduce this.

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file. - Not Needed
  • Confluent.Kafka nuget version. - 2.1.1
  • Apache Kafka version. - Azure Event Hubs
  • Client configuration. GroupId = "test", AutoOffsetStore = true, EnableAutoCommit = false, QueuedMaxMessagesKbytes = 5 MB, FetchMaxBytes = 10 MB, MaxPartitionFetchBytes = 5 MB
  • Operating system. - Ubuntu in AKS
  • Provide logs (with "debug" : "..." as necessary in configuration).
  • Provide broker log excerpts.
  • Critical issue.
@ksdvishnukumar (Author)

Hi @mhowlett, @edenhill

Is there any workaround for this that can be implemented via the consumer configuration?
