This repository has been archived by the owner on May 27, 2022. It is now read-only.

Make Kafka Producer initialization pluggable #61

Open
gquintana wants to merge 2 commits into master

Conversation

gquintana
Contributor

The LazyInitializer class becomes LazyProducerSupplier.
I added an EagerProducerSupplier to create the Kafka producer when the KafkaAppender starts.
Fixes part of #60
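
For illustration, a minimal sketch of the supplier idea could look like the following; the get()/stop() contract and the constructor shapes are assumptions made here for readability and may not match the actual diff:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

/** Strategy for obtaining the Kafka producer used by the appender. */
interface ProducerSupplier<K, V> {
    Producer<K, V> get();   // may return null if the producer could not be created
    void stop();            // closes the producer if one was created
}

/** Creates the producer on the first call to get() (the former LazyInitializer behaviour). */
class LazyProducerSupplier<K, V> implements ProducerSupplier<K, V> {
    private final Map<String, Object> producerConfig;
    private Producer<K, V> producer;

    LazyProducerSupplier(Map<String, Object> producerConfig) {
        this.producerConfig = producerConfig;
    }

    @Override
    public synchronized Producer<K, V> get() {
        if (producer == null) {
            producer = new KafkaProducer<>(new HashMap<>(producerConfig)); // defensive copy of the config
        }
        return producer;
    }

    @Override
    public synchronized void stop() {
        if (producer != null) {
            producer.close();
            producer = null;
        }
    }
}

/** Creates the producer eagerly when the KafkaAppender starts. */
class EagerProducerSupplier<K, V> implements ProducerSupplier<K, V> {
    private final Producer<K, V> producer;

    EagerProducerSupplier(Map<String, Object> producerConfig) {
        this.producer = new KafkaProducer<>(new HashMap<>(producerConfig));
    }

    @Override
    public Producer<K, V> get() {
        return producer;
    }

    @Override
    public void stop() {
        producer.close();
    }
}
```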

@gquintana
Contributor Author

Any feedback?

@danielwegener
Owner

Yes, I like the idea of pulling the producer lifecycle management out into a strategy!

Would the ProducerSupplier.get method contract be expected to block until the producer is completely initialized? Can it throw? Can it return null (that's what can happen in LazyProducerSupplier right now)? Can it return a producer that will block on the next call to producer.send()?

I think if we go this way we can also move the producer initialization into the SendStrategy (the SendStrategy would otherwise be pointless, because SynchronousDeliveryStrategy was a stupid idea anyway and there is only AsyncDeliveryStrategy left). The new SendStrategy would then have the responsibility to NEVER block on the send call: either buffer, drop, or delegate to the fallbackAppenders (pretty much the same semantics it should have now, but with the capability to control the actual producer's lifecycle).

The semantics would be as follows:

  • DeliveryStrategy.start(ContextAware contextAware, Map<String, Object> producerConfig, FailedDeliveryCallback<E> failedDeliveryCallback) - may throw, must not block, and must leave the DeliveryStrategy in a state where it can accept calls to .send() after start has returned.
  • DeliveryStrategy.send(ProducerRecord<K, V> record, E event) - must not block, must not throw, and must either buffer the message, enqueue it for delivery, or send it to the failedDeliveryCallback.
  • DeliveryStrategy.stop() - may block for a configurable maximum amount of time and ensures that buffers are emptied if possible. Must stop accepting new messages offered via send(). Buffered and undeliverable messages are sent to the failedDeliveryCallback.

I think with this SPI semantic we could eventually implement a DeliveryStrategy that monitors the availability of the Kafka broker and dynamically switches between a KafkaProducer and the fallback appender, without ever blocking or throwing at a caller. What do you think?
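
For illustration, a rough Java sketch of that contract could look like the code below. The method signatures follow the description above; the FailedDeliveryCallback stand-in and the generics are assumptions, not the project's final API:

```java
import java.util.Map;

import ch.qos.logback.core.spi.ContextAware;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Hypothetical stand-in for the project's failed-delivery callback. */
interface FailedDeliveryCallback<E> {
    void onFailedDelivery(E event, Throwable throwable);
}

interface DeliveryStrategy<E, K, V> {

    /** May throw, must not block; after it returns, the strategy must accept calls to send(). */
    void start(ContextAware contextAware,
               Map<String, Object> producerConfig,
               FailedDeliveryCallback<E> failedDeliveryCallback);

    /** Must not block and must not throw: buffer, enqueue for delivery, or hand off to the callback. */
    void send(ProducerRecord<K, V> record, E event);

    /** May block up to a configurable timeout, empties buffers if possible, then rejects new messages. */
    void stop();
}
```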

@gquintana
Contributor Author

I'll try to answer your questions:

  • Would the ProducerSupplier.get method contract be expected to block until the producer is completely initialized? The lazy implementation can block on the first call, in order to initialize the producer.
  • Can it throw? I would say no, see the next question.
  • Can it return null (that's what can happen in LazyProducerSupplier right now)? Yes, when the state is stopped or initialization failed.

About non-blocking operations:

  • Can it return a producer that will block on the next call to producer.send()? The Producer#send method can block when the producer doesn't have partition/broker metadata.
  • "The new SendStrategy would then have the responsibility to NEVER block on the send call": this requirement seems hard to achieve, since Producer#send may block for up to max.block.ms (see https://kafka.apache.org/documentation.html#producerconfigs and search for max.block.ms). Reducing this setting may raise other issues (see the short sketch after this list).
  • "DeliveryStrategy.start (...) must not block": having to wait for the appender to be ready on application start is not a big deal to me (see EagerProducerSupplier), yet I understand that delaying this bootstrapping time may be interesting in some use cases.
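
To make the max.block.ms point concrete, here is a small illustrative snippet (the broker address and the 1-second value are placeholders) showing how the setting could be tightened so that Producer#send fails fast instead of blocking for the default 60 seconds:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

class FastFailingProducerExample {
    static Producer<byte[], byte[]> create() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // send() will block at most 1 second waiting for metadata or buffer space, then throw
        config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000L);
        return new KafkaProducer<>(config);
    }
}
```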

Rather than making the Appender#append method non-blocking, I personally prefer wrapping the KafkaAppender in an AsyncAppender (blocking queue + worker thread) or an AsyncDisruptorAppender to get a non-blocking appender.
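
As a sketch of that wrapping approach (programmatic wiring shown here only for illustration; in practice this is usually done in logback.xml, and the queue settings below are assumptions):

```java
import ch.qos.logback.classic.AsyncAppender;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import org.slf4j.LoggerFactory;

class AsyncWrappingExample {
    static Appender<ILoggingEvent> wrap(Appender<ILoggingEvent> kafkaAppender) {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        AsyncAppender async = new AsyncAppender();
        async.setContext(context);
        async.setQueueSize(512);          // bounded buffer between logging callers and the Kafka producer
        async.setNeverBlock(true);        // drop events instead of blocking callers when the queue is full
        async.addAppender(kafkaAppender); // the worker thread forwards queued events to the KafkaAppender
        async.start();
        return async;
    }
}
```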

Your idea of merging ProducerSupplier and DeliveryStrategy into something that wraps the KafkaProducer sounds interesting. I'll try to do it.

@gquintana
Contributor Author

@danielwegener I tried to implement the contract you described. The AsynchronousDeliveryStrategy became LateAsyncDeliveryStrategy.

I am not sure I like this last commit 5f0fac0. Tell me if you would like me to revert or fix it.

@danielwegener
Owner

Hey @gquintana. Thanks again for the effort and thought you have put into this. I am very sorry that I have to let you down again and again, but once more I cannot really find the time to work on this project. And I hate demotivating contributors by not responding to their ideas and requests in time.

Since you have shown that you understand the challenges in this project (likely better than I do), and since you seem to have the motivation to push things forward, I'd like to ask you if you want to take over this repo and lead its further development.

I'd be very happy if this project did not die due to an absent maintainer (me), especially as long as it is useful for someone.

@gquintana
Contributor Author

Hi @danielwegener, I'm happy to help maintain this useful library as long as I have time to do so.
However, I would like someone to keep taking a critical look at what I am doing.

Regarding this PR, can you check the last commit and give your opinion?
I'd really like to fix issue #60, which has already bitten me twice.

 * @throws KafkaException Producer initialization failed
 */
protected Producer<K, V> createProducer() throws KafkaException {
    return new KafkaProducer<>(new HashMap<>(producerConfig));
}
@danielwegener
Owner

Why the copying? Just as a defense mechanism, or does the producer itself change the content of the config?

@gquintana
Contributor Author

Yes, a defensive copy.

@gquintana
Contributor Author

Just rebased on master and fixed the conflicts.
