You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Delete one message from Kafka server as it can't deserialized , but not restart Consumer server ,then it continue poll this message caused high CP
#2039
Delete one message from Kafka server as it can't deserialized , but not restart Consumer server ,then it continue poll this message caused high CPU
Spring-Kafka version: 2.5.14.Release
See: while can't be deseralized , it caused client still hang on this offset 13846467.
Log:
[2021-12-09 13:46:51,226] [ERROR] 1 [org.springframework.kafka.KafkaListenerEndpointContainer#4-0-C-1] essageListenerContainer$ListenerConsumer - - Consumer exception org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition sj1_b_meeting_cacheflush_webex_notification-0 at offset 13846467. If needed, please seek past the record to continue consumption. at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE] at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1405) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_302] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_302] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition sj1_b_meeting_cacheflush_webex_notification-0 at offset 13846467. If needed, please seek past the record to continue consumption.Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.cisco.webex.apiserver.boot.kafka.KafkaFlushCacheNotificationMessage]; nested exception is java.lang.ClassNotFoundException: com.cisco.webex.apiserver.boot.kafka.KafkaFlushCacheNotificationMessage at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE] at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE] at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:472) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE] at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1307) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1538) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1374) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:676) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:631) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1282) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-2.4.1.jar:?] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1246) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
after Delete the message offset 13846467.
See: Thread dump
org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1" #169 prio=5 os_prio=0 tid=0x00007fa107ae7800 nid=0xc4 runnable [0x00007fa0023f4000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000006f9cacaa0> (a sun.nio.ch.Util$3)
- locked <0x00000006f9caca88> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000006fabaaab8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:794)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1246)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1146)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
I read the Spring-Kafka code , I feel there is some loop for continuing poll message with uncommited offset, even delete the offset , it will still poll message from this un-Acked message
@garyrussell and @artembilan can you guide me what's problem on this case ? how can we handle it ?
The text was updated successfully, but these errors were encountered:
It's simply the way Kafka works and has nothing to do with Spring.
Error deserializing key/value for partition sj1_b_meeting_cacheflush_webex_notification-0 at offset 13846467. If needed, please seek past the record to continue consumption.
Without Spring, you would have to perform a seek to get past the bad record; the ErrorHandlingDeserializer in conjunction with the default error handler takes care of performing the seek for you.
Problem:
Delete one message from Kafka server as it can't deserialized , but not restart Consumer server ,then it continue poll this message caused high CPU
Spring-Kafka version: 2.5.14.Release
See: while can't be deseralized , it caused client still hang on this offset 13846467.
Log:
[2021-12-09 13:46:51,226] [ERROR] 1 [org.springframework.kafka.KafkaListenerEndpointContainer#4-0-C-1] essageListenerContainer$ListenerConsumer - - Consumer exception org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition sj1_b_meeting_cacheflush_webex_notification-0 at offset 13846467. If needed, please seek past the record to continue consumption. at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE] at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1405) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_302] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_302] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition sj1_b_meeting_cacheflush_webex_notification-0 at offset 13846467. If needed, please seek past the record to continue consumption.Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.cisco.webex.apiserver.boot.kafka.KafkaFlushCacheNotificationMessage]; nested exception is java.lang.ClassNotFoundException: com.cisco.webex.apiserver.boot.kafka.KafkaFlushCacheNotificationMessage at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE] at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE] at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:472) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE] at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1307) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1538) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1374) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:676) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:631) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1282) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-2.4.1.jar:?] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1246) ~[spring-kafka-2.5.14.RELEASE.jar:2.5.14.RELEASE]
after Delete the message offset 13846467.
See: Thread dump
org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1" #169 prio=5 os_prio=0 tid=0x00007fa107ae7800 nid=0xc4 runnable [0x00007fa0023f4000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000006f9cacaa0> (a sun.nio.ch.Util$3)
- locked <0x00000006f9caca88> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000006fabaaab8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:794)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1246)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1146)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
I read the Spring-Kafka code , I feel there is some loop for continuing poll message with uncommited offset, even delete the offset , it will still poll message from this un-Acked message
@garyrussell and @artembilan can you guide me what's problem on this case ? how can we handle it ?
The text was updated successfully, but these errors were encountered: