ErrorHandlingDeserializer: should failedDeserializationFunction be mandatory? #2107

Open
bencody opened this issue Feb 16, 2022 · 1 comment

bencody commented Feb 16, 2022

I took over maintenance of an application that uses Spring Kafka. My former colleague configured the application to use ErrorHandlingDeserializer, but he didn't configure a failedDeserializationFunction and never tested what happens when deserialization fails.

That application finally got deployed to a real testing environment, and the application's KafkaListener component kept getting records with null values. I couldn't figure out why the values were null. When looking at the data in the topic using the Kafka CLI tools, I saw that the records did have values. Apart from the NullPointerExceptions caused by those null record values, there weren't any errors in the application logs.

After some time spent debugging, I found out that ErrorHandlingDeserializer is silently swallowing exceptions when failedDeserializationFunction isn't set, and it passes along null values.
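A minimal plain-Java sketch of the behavior described above. It is a simplified model, not Spring Kafka's actual implementation: the class `ErrorHandlingSketch`, the header key, and the `fallback` parameter (standing in for failedDeserializationFunction, which in the real library receives more context than the raw bytes) are all hypothetical names chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Simplified stand-in for an error-handling deserializer; NOT Spring Kafka's code. */
final class ErrorHandlingSketch<T> {

    // Hypothetical header key, standing in for the value-deserializer exception header.
    static final String EXCEPTION_HEADER = "sketchDeserializerExceptionValue";

    private final Function<byte[], T> delegate;
    private final Function<byte[], T> fallback; // may be null, like an unset function

    ErrorHandlingSketch(Function<byte[], T> delegate, Function<byte[], T> fallback) {
        this.delegate = delegate;
        this.fallback = fallback;
    }

    T deserialize(Map<String, Object> headers, byte[] data) {
        try {
            return delegate.apply(data);
        } catch (RuntimeException e) {
            if (fallback != null) {
                // A fallback is configured: the listener sees its result instead of null.
                return fallback.apply(data);
            }
            // No fallback: the exception is not rethrown. It is only recorded
            // in a header, and the listener receives a null value.
            headers.put(EXCEPTION_HEADER, e);
            return null;
        }
    }

    public static void main(String[] args) {
        Function<byte[], Integer> parseInt =
                bytes -> Integer.valueOf(new String(bytes, StandardCharsets.UTF_8));
        byte[] bad = "not-a-number".getBytes(StandardCharsets.UTF_8);

        Map<String, Object> headers = new HashMap<>();
        Integer noFallback = new ErrorHandlingSketch<>(parseInt, null).deserialize(headers, bad);
        System.out.println(noFallback + " " + headers.containsKey(EXCEPTION_HEADER)); // null true

        Map<String, Object> headers2 = new HashMap<>();
        Integer withFallback =
                new ErrorHandlingSketch<Integer>(parseInt, bytes -> -1).deserialize(headers2, bad);
        System.out.println(withFallback + " " + headers2.containsKey(EXCEPTION_HEADER)); // -1 false
    }
}
```

In this model the failure is only visible to code that inspects the headers, which matches the symptom of null values with nothing in the logs.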

Is this a good design? Would one ever want to use ErrorHandlingDeserializer without a failedDeserializationFunction, or should it be a mandatory field?

Contributor

garyrussell commented Feb 16, 2022

Perhaps you have a batch listener. With a record listener, such records are sent directly to the error handler and are never sent to the listener.

With a batch listener, the entire batch is sent to the listener and you have to examine the headers to see if a deserialization exception occurred on any of the records.

See https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer

When using an ErrorHandlingDeserializer with a batch listener, you must check for the deserialization exceptions in message headers. When used with a RecoveringBatchErrorHandler, you can use that header to determine which record the exception failed on and communicate to the error handler via a BatchListenerFailedException.

@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
    // headers.get(i) holds the converted headers of the record at in.get(i).
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        // A null payload together with the exception header means this record failed to deserialize.
        if (thing == null
                && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
            DeserializationException deserEx = ListenerUtils.byteArrayToDeserializationException(this.logger,
                    (byte[]) headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
            if (deserEx != null) {
                logger.error(deserEx, "Record at index " + i + " could not be deserialized");
            }
            // Tell the error handler which record in the batch failed.
            throw new BatchListenerFailedException("Deserialization", deserEx, i);
        }
        process(thing);
    }
}

Or you can add a failedDeserializationFunction to the ErrorHandlingDeserializer (EHD).
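As a rough sketch of that second option, the consumer properties might look something like the following. The property keys are written out as plain strings, which as far as I know are the values behind `ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS` and `ErrorHandlingDeserializer.VALUE_FUNCTION`. The JSON delegate is only an assumption about the payload format, and `com.example.FailedThingProvider` is a hypothetical class that would have to implement `Function<FailedDeserializationInfo, Thing>`. Please check the keys against the reference documentation for your version before relying on them.

```java
import java.util.HashMap;
import java.util.Map;

/** Builds consumer properties for an ErrorHandlingDeserializer with a function (sketch). */
final class EhdConsumerProps {

    static Map<String, Object> build() {
        Map<String, Object> props = new HashMap<>();
        // Kafka's own setting: use the error-handling wrapper as the value deserializer.
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer");
        // The real deserializer the wrapper delegates to (assumed JSON here).
        props.put("spring.deserializer.value.delegate.class",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Hypothetical function class, invoked when the delegate throws.
        props.put("spring.deserializer.value.function",
                "com.example.FailedThingProvider");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With a function in place, the listener would receive whatever the function returns for a failed record, so the function can return a recognizable placeholder instead of null.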
