-
Notifications
You must be signed in to change notification settings - Fork 262
Make Kafka Producer initialization pluggable #61
base: master
Are you sure you want to change the base?
Conversation
Any feedback? |
Yes I like the idea to pull out the producer lifecycle management into a strategy! Would the I think if we go this way we also can move the producer initialization into the The semantic would be as following:
I think with this spi semantic we could eventually implement a DeliveryStrategy that can monitor the availability of the kafka broker and dynamically switch between a KafkaProducer and the fallback appender without ever blocking or throwing at a caller. What do you think? |
I'll try to answer your questions:
About non blocking operations:
Rather than making the Appender#append method non blocking, I personally prefer wrapping the KafkaAppender in an AsyncAppender (blocking queue+worker thread) or an AsyncDisruptorAppender to get a non blocking appender. Your idea of merging ProducerSupplier and DeliveryStrategy into something which wraps the KafkaProducer sounds interesting. I'll try to do it. |
@danielwegener I tried to implement the contract you described. The AsynchronousDeliveryStrategy became LateAsyncDeliveryStrategy. I am not sure I like this last commit 5f0fac0 . Tell me if you wish I revert or fix it. |
Hey @gquintana. Thanks again for your effort and thoughts you have put into this. And I am very sorry that I have to put you down again and again but I again can not really find time to work on this project. And I hate it to demotivate contributors by not responding to their ideas and requests in time. Since you've have shown that you understand the challenges and in this project (likely better than I do) and since you seem to have the motivation to push thing's forward, I'd like to ask you If you want to take over this repo and lead its further development. I'd be very happy if this project would not die due to an absent maintainer (me) - especially as long as it is useful for someone. |
Hi @danielwegener I accept to help in maintaining this useful library as long as I have time to do so. Regarding, this PR can you check the last commit and give your opinion? |
* @throws KafkaException Producer initialization failed | ||
*/ | ||
protected Producer<K, V> createProducer() throws KafkaException{ | ||
return new KafkaProducer<>(new HashMap<>(producerConfig)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why the copying? just as defense mechanism? or does the producer itself change the content of the config?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes defensive copy
5f0fac0
to
f4007f3
Compare
Just rebased on master and fixed conflicts |
The LazyInitializer classe becomes LazyProducerSupplier.
I add an EagerProducerSupplier to create Kafka Producer on KafkaAppender start.
Fixes part of #60