
Handling connection errors with KafkaTemplate when cluster not available #2251

Open

garyrussell opened this issue May 3, 2022 · Discussed in #2250 · 4 comments

Comments

@garyrussell
Contributor

Discussed in #2250

Originally posted by Walnussbaer May 3, 2022
Hi everyone,

I'm really struggling with some basic functionality that I would like to achieve using KafkaTemplate, and I hope someone can help me out.
I already created a Stack Overflow entry but haven't gotten much help there yet (https://stackoverflow.com/questions/72055135/spring-apache-kafka-onfailure-callback-of-kafkatemplate-not-fired-on-connection/72059673?noredirect=1#comment127371838_72059673). The Spring for Apache Kafka docs weren't much help either, nor was a very extensive Google search.

Consider the following basic scenario:
I have a simple KafkaTemplate that shall send data to a Kafka cluster. Now suppose the cluster goes down (not just temporarily). How can I configure the KafkaTemplate so that the wrapped producer stops trying to connect to the cluster to fetch the metadata for the given topic when the cluster is not reachable?

Why do I want to achieve that? Well, in a production environment it can always happen that the Kafka cluster goes down for some reason. I want to be able to detect that during the sending process. The worst problem is that, as of now, my producer thread starves to death, because it goes into an infinite loop trying to connect to the broker.

This is my producer config:

@Configuration
public class KafkaProducerConfig {

    private String bootstrapServers = "[::1]:9091"; // wrong port to simulate unavailable connection

    @Bean
    public Map<String,Object> producerConfig() {

        // config settings for creating producers
        Map<String,Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,this.bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,10000);       // max time send() may block, e.g. waiting for metadata
        configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,4000);  // max time to wait for a broker response per request
        configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,6000); // upper bound on total time to report success/failure
        configProps.put(ProducerConfig.RETRIES_CONFIG,0);                // applies to produce-request retries only, not to
                                                                         // metadata fetches or connection attempts

        return configProps;
    }

    @Bean
    public ProducerFactory<String,String> producerFactory() {
        // creates a kafka producer
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean("kafkaTemplate")
    public KafkaTemplate<String,String> kafkaTemplate(){
        // template which abstracts sending data to kafka
        KafkaTemplate<String,String> kafkaTemplate = new KafkaTemplate<>(producerFactory());

        return kafkaTemplate;
    }
}

This is my service. My first approach, sendMessageWithCallback(), does not work, because the onFailure() method won't get invoked if the KafkaProducer cannot establish a connection to the Kafka cluster. Using my second service method, sendMessageWithProperErrorHandling(), I can at least catch the TimeoutException which is thrown by the KafkaProducer when the metadata for the topic could not be fetched within MAX_BLOCK_MS_CONFIG, but I still can't stop the producer from going into an infinite loop after that first timeout. Below you also find a picture of the infinite loop. The KafkaProducer will essentially try to connect to the Kafka cluster for the rest of its life, leaving the thread to starve to death. It also looks like it completely ignores my RETRIES_CONFIG, which is set to zero retries ...

@Service
public class KafkaSenderService {

    Logger logger = LoggerFactory.getLogger(KafkaSenderService.class);

    @Qualifier("kafkaTemplate")
    private final KafkaTemplate<String,String> kafkaTemplate;

    @Autowired
    public KafkaSenderService(KafkaTemplate<String,String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message, String topicName) {
        kafkaTemplate.send(topicName,message);
    }

    public void sendMessageWithCallback(String message, String topicName) {

        // possibility to add callbacks to define what shall happen in success/ error case
        ListenableFuture<SendResult<String,String>> future = kafkaTemplate.send(topicName, message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            // DOES NOT WORK IF THE BROKER IS NOT AVAILABLE
            public void onFailure(Throwable ex) {
                logger.warn("Message could not be delivered. " + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.info("Your message was delivered with following offset: " + result.getRecordMetadata().offset());
            }
        });
    }

    public void sendMessageWithProperErrorHandling(String message, String topicName){

        // TODO use AdminClient to check connectivity --> senseless; what if the cluster goes down after the check was made?!

        try {
            SendResult<String,String> sendResult = kafkaTemplate.send(topicName, message).get(5000, TimeUnit.MILLISECONDS);
        } catch (Exception te) {
            System.out.println("Could not connect: " + te.getMessage());
            te.printStackTrace();
        }
    }
}

Now my simple question: What is the best practice to detect connection errors during a send process and stop the sending process, when that error occurs?

The infinite loop I mentioned can be seen here:
[screenshot: the producer endlessly retrying the broker connection]

@sjvolpe

sjvolpe commented Feb 3, 2023

Did you ever find a solution to this, Gary? I'm facing the same issue.

@garyrussell
Contributor Author

Not yet. See the linked discussion for a workaround.
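
For readers finding this later: the workaround in the linked discussion boils down to detecting the timeout yourself and then calling reset() on the DefaultKafkaProducerFactory, which closes the cached producer and thereby stops its reconnect loop. A minimal sketch against the template from the original post; the method name sendOrReset and the 10-second timeout are illustrative, not from the discussion:

public void sendOrReset(String message, String topicName) {
    try {
        // Blocks until the send completes; a metadata fetch that cannot
        // finish within max.block.ms surfaces here as an exception.
        kafkaTemplate.send(topicName, message).get(10, TimeUnit.SECONDS);
    } catch (Exception ex) {
        logger.warn("Send failed; assuming the cluster is unreachable: " + ex.getMessage());
        // reset() closes the factory's shared producer, so its background
        // reconnect loop stops; a fresh producer is only created on the
        // next send attempt.
        ((DefaultKafkaProducerFactory<String, String>) kafkaTemplate.getProducerFactory()).reset();
    }
}

This assumes the factory really is a DefaultKafkaProducerFactory (as in the config above) and needs imports for java.util.concurrent.TimeUnit and org.springframework.kafka.core.DefaultKafkaProducerFactory.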

@sjvolpe

sjvolpe commented Feb 3, 2023

Okay, that should work - thanks for the quick response

mksahakyan added a commit to dCache/dcache that referenced this issue Apr 26, 2023
Motivation

After we upgraded to 8.2, we are no longer getting events into Kafka. We have three dCache instances; the one still on 7.2 keeps publishing to Kafka with no issue (#7123).

The issue is that, according to spring-projects/spring-kafka#2251, the kafka-clients provide no hooks to determine that a send failed because the broker is down (spring-projects/spring-kafka#2250).
Since this is still not fixed upstream, we work around it here.

Modification

Change the LoggingProducerListener so that, when a TimeoutException is caught, the error message indicates that there is a connection issue or the broker is down.

Result

The log looks like this:

24 Apr 2023 16:17:04 (pool_write) [] Producer failed to send the message, the broker is down or the connection was refused
24 Apr 2023 16:17:04 (pool_write) [] Producer failed to send the message, the broker is down or the connection was refused
24 Apr 2023 16:17:04 (NFS-localhost) [] Producer failed to send the message, the broker is down or the connection was refused

or

24 Apr 2023 17:27:51 (NFS-localhost) [door:NFS-localhost@dCacheDomain:AAX6FqLTdag pool_write DoorTransferFinished 0000C9CFA47686574B43B1EF9CF037A24780] Producer failed to send the message, the broker is down or the connection was refused
24 Apr 2023 17:27:51 (pool_write) [door:NFS-localhost@dCacheDomain:AAX6FqLTdag NFS-localhost PoolAcceptFile 0000C9CFA47686574B43B1EF9CF037A24780] Topic billing not present in metadata after 60000 ms.
24 Apr 2023 17:27:51 (NFS-localhost) [door:NFS-localhost@dCacheDomain:AAX6FqLTdag pool_write DoorTransferFinished 0000C9CFA47686574B43B1EF9CF037A24780] TEST Topic billing not present in metadata after 60000 ms. class org.springframework.kafka.KafkaException
24 Apr 2023 17:28:51 (NFS-localhost) [door:NFS-localhost@dCacheDomain:AAX6FqZqubA pool_write DoorTransferFinished 00002B30ED198C494F25A31F589AB91F903F] Producer failed to send the message, the broker is down or the co

Target: master
8.2, 9.0
 Require-book: no
 Require-notes: yes
 Patch: https://rb.dcache.org/r/13967/
 Acked-by: Lea Morschel, Abert Rossi, Tigran Mkrtchyan
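
Reconstructed from the commit description above, the modified listener could look roughly like this; a sketch, not the actual dCache patch, and the class name is made up here:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.LoggingProducerListener;

public class ConnectionAwareProducerListener<K, V> extends LoggingProducerListener<K, V> {

    private static final Logger log = LoggerFactory.getLogger(ConnectionAwareProducerListener.class);

    @Override
    public void onError(ProducerRecord<K, V> record, RecordMetadata metadata, Exception exception) {
        if (exception instanceof TimeoutException) {
            // A timeout fetching metadata or delivering the record is the only
            // signal available when the broker is unreachable.
            log.error("Producer failed to send the message, the broker is down or the connection was refused");
        } else {
            super.onError(record, metadata, exception);
        }
    }
}

Such a listener would be registered with kafkaTemplate.setProducerListener(new ConnectionAwareProducerListener<>()).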
mksahakyan added a commit to mksahakyan/dcache that referenced this issue May 6, 2023
mksahakyan added a commit to mksahakyan/dcache that referenced this issue May 6, 2023
lemora pushed a commit to dCache/dcache that referenced this issue May 12, 2023
lemora pushed a commit to dCache/dcache that referenced this issue May 12, 2023
@JooHyukKim
Contributor

I don't know if I should file a new feature request or start from here 🤔
Can we consider a new feature?
I am facing the same issue (or personal struggle).
The suggested workaround (calling reset()) does work, but it would be nice to have some sort of strategy for this, especially when you need to implement the detection-and-reset functionality across multiple applications.

Combining keywords like broker + disconnection + detection + strategy + listener, we may be able to come up with something :)
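
Purely as a sketch of the kind of strategy meant above (every name here is hypothetical, and whether onError actually fires for every connection-failure mode is exactly what this issue is about):

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.support.ProducerListener;

// Hypothetical reusable listener combining detection and reset, so the
// same behavior can be shared across applications.
public class BrokerDownResetListener<K, V> implements ProducerListener<K, V> {

    private final DefaultKafkaProducerFactory<K, V> factory;

    public BrokerDownResetListener(DefaultKafkaProducerFactory<K, V> factory) {
        this.factory = factory;
    }

    @Override
    public void onError(ProducerRecord<K, V> record, RecordMetadata metadata, Exception exception) {
        // Treat a timeout (possibly wrapped by spring-kafka) as "broker down"
        // and reset the factory so the cached producer stops reconnecting.
        if (exception instanceof TimeoutException || exception.getCause() instanceof TimeoutException) {
            this.factory.reset();
        }
    }
}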
