-
Notifications
You must be signed in to change notification settings - Fork 878
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
confluent-kafka-python vs aiokafka for Asynchronous applications #1723
Comments
Hello, we have an example using asyncio. |
The example is great, but there's no async consumer demonstrated. I've ended up writing this.
Regarding the FastAPI part, we have something like this:
|
Yes, it's infuriating how badly it is supported by Confluent |
Interestingly we have*) very similar asyncio wrapper around Rebalance callbacks we use in the following way: assign_queue = asyncio.Queue[Sequence[TopicPartition]]()
def on_assign(_: Consumer, partitions: Sequence[ConfluentTopicPartition])
assign_queue.put_nowait(partitions)
consumer.subscribe(topics, on_assign=on_assign) Then you can have separate task that awaits the queue of assigned partitions. *) I cannot show full code since it's proprietary. |
Description
NOTE: This is a question and not an issue
I am building an application with FastAPI, this application will also need to consume messages from kafka. Thus we want to run the consumer asynchronously.
I read https://www.confluent.io/blog/kafka-python-asyncio-integration/ and understood that we can introduce async nature for confluent-kafka-plugin. While exploring other libraries, I stumbled on aiokafka, I am trying to understand the differences between the libraries and I found a specific issue with an interesting comment from one of the repo's members - aio-libs/aiokafka#665 (comment)
With this I am unable to understand if confluent-kafka-python offers complete async support as mentioned in #185
Can you please help me clarify this and unblock ?
How to reproduce
Checklist
Please provide the following information:
confluent_kafka.version()
andconfluent_kafka.libversion()
):{...}
'debug': '..'
as necessary)The text was updated successfully, but these errors were encountered: