Message that can't be deserialized was deleted from the Kafka server, but the consumer (not restarted) keeps polling that offset, causing high CPU #2039

Open
xzhan opened this issue Dec 10, 2021 · 4 comments

Comments

xzhan commented Dec 10, 2021

Problem:

A message that could not be deserialized was deleted from the Kafka server, but the consumer application was not restarted; the consumer keeps polling for this message, which causes high CPU.

Spring Kafka version: 2.5.14.RELEASE

See the log below: because the record can't be deserialized, the client stays stuck at offset 13846467.

Log:

[2021-12-09 13:46:51,226] [ERROR] 1 [org.springframework.kafka.KafkaListenerEndpointContainer#4-0-C-1] essageListenerContainer$ListenerConsumer - - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition sj1_b_meeting_cacheflush_webex_notification-0 at offset 13846467. If needed, please seek past the record to continue consumption.
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1405) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_302]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_302]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition sj1_b_meeting_cacheflush_webex_notification-0 at offset 13846467. If needed, please seek past the record to continue consumption.
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.cisco.webex.apiserver.boot.kafka.KafkaFlushCacheNotificationMessage]; nested exception is java.lang.ClassNotFoundException: com.cisco.webex.apiserver.boot.kafka.KafkaFlushCacheNotificationMessage
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:472) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1307) ~[kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128) ~[kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1538) ~[kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1374) ~[kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:676) ~[kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:631) ~[kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1282) ~[kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240) ~[kafka-clients-2.4.1.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-2.4.1.jar:?]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1246) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]

After deleting the message at offset 13846467:

See the thread dump:

org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1" #169 prio=5 os_prio=0 tid=0x00007fa107ae7800 nid=0xc4 runnable [0x00007fa0023f4000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000006f9cacaa0> (a sun.nio.ch.Util$3)
- locked <0x00000006f9caca88> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000006fabaaab8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:794)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1246)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1146)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)

I read the Spring-Kafka code, and I think there is a loop that keeps polling from the uncommitted offset; even after the record at that offset is deleted, the consumer still polls from this un-acked position.

@garyrussell and @artembilan, can you guide me on what the problem is in this case and how we can handle it?

artembilan (Member) commented

Spring for Apache Kafka 2.5.x is out of support: https://spring.io/projects/spring-kafka#support.

Consider upgrading to the latest version.
Also, it is not clear what "delete the message offset" means; I was not aware that this is even possible with Kafka at all...
See some information about deserialization error handling in docs: https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer
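A minimal sketch of the consumer properties that documentation describes, wrapping the JsonDeserializer in an ErrorHandlingDeserializer (the trusted package name com.example.messages below is a placeholder):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class ConsumerConfigSketch {

    // The ErrorHandlingDeserializer delegates to the JsonDeserializer; when the
    // delegate fails, the record is delivered with a null value and the exception
    // in a header, so the container's error handler can deal with it instead of
    // the Fetcher failing at the same offset on every poll.
    public static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.messages"); // placeholder package
        return props;
    }
}

These properties would then be passed to the DefaultKafkaConsumerFactory used by the listener container.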

xzhan commented Dec 13, 2021

But my point is: why does the consumer still poll the message at the old offset? @artembilan

garyrussell (Contributor) commented

It's simply the way Kafka works and has nothing to do with Spring.

Error deserializing key/value for partition sj1_b_meeting_cacheflush_webex_notification-0 at offset 13846467. If needed, please seek past the record to continue consumption.

Without Spring, you would have to perform a seek to get past the bad record; the ErrorHandlingDeserializer in conjunction with the default error handler takes care of performing the seek for you.
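For comparison, a rough sketch of that manual seek without Spring, assuming a newer kafka-clients release where the poll failure surfaces as RecordDeserializationException (which carries the failed partition and offset); MyPayload is a placeholder type:

import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.RecordDeserializationException;

public class ManualSeekSketch {

    // Poll loop without Spring: if a record cannot be deserialized, poll() keeps
    // failing at the same offset, so the application itself must seek one past
    // the bad record in order to make progress.
    public static void pollLoop(Consumer<String, MyPayload> consumer) {
        while (true) {
            try {
                ConsumerRecords<String, MyPayload> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, MyPayload> record : records) {
                    // process the record ...
                }
            }
            catch (RecordDeserializationException ex) {
                // skip the poison record and resume from the next offset
                consumer.seek(ex.topicPartition(), ex.offset() + 1);
            }
        }
    }

    // Placeholder for the application's payload type.
    static class MyPayload {
    }
}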
