Kafka server: Message was too large, server rejected it to avoid allocation error #422

Open
aditya-msd opened this issue Apr 24, 2023 · 3 comments

@aditya-msd

I am getting the error "kafka server: Message was too large, server rejected it to avoid allocation error."
The logs below show it. The error causes the processor to stop committing; it goes into a loop and blocks the other messages in the topic.
Is there any way to detect or catch this error in code?

Increasing the topic's message size limit solves the issue, but I would rather detect the condition and prevent the loop: when this error happens, I want to side-step the affected message, avoid the loop, and continue processing as usual.

2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0) > PartTable] finished building storage for topic batch-worker-progress-processor-table/0 in 0.0 minutes
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0) > PartTable] topic manager gives us oldest: 327771, hwm: 342528
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0)] catching up table done
2023/04/24 11:32:51 [Processor batch-worker-progress-processor] setup generation 401 ... done
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0)] starting
2023/04/24 11:32:51 [Processor batch-worker-progress-processor] ConsumeClaim for topic/partition batch-worker-progress-topic/0, initialOffset=342523
2023/04/24 11:32:51 [Sarama] consumer/broker/0 added subscription to batch-worker-progress-topic/0
2023/04/24 11:32:51 Packet 1 received
2023/04/24 11:32:51 Workerstate Size :: 89983
2023/04/24 11:32:51 Total Processed Packets are 1523 out of 20000
2023/04/24 11:32:51 Packet 2 received
2023/04/24 11:32:51 Workerstate Size :: 89983
2023/04/24 11:32:51 Total Processed Packets are 1523 out of 20000
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0)] Errors occurred asynchronously. Will exit partition processor
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0)] stopped
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0)] Run failed with error: Errors:
* kafka server: Message was too large, server rejected it to avoid allocation error.
* kafka server: Message was too large, server rejected it to avoid allocation error.
2023/04/24 11:32:51 [Processor batch-worker-progress-processor] ConsumeClaim done for topic/partition batch-worker-progress-topic/0
2023/04/24 11:32:51 kafka: error while consuming batch-worker-progress-topic/0: error processing message: Errors:
* kafka server: Message was too large, server rejected it to avoid allocation error.
* kafka server: Message was too large, server rejected it to avoid allocation error.
2023/04/24 11:32:51 [Sarama] consumer/broker/0 closed dead subscription to batch-worker-progress-topic/0
2023/04/24 11:32:51 [Processor batch-worker-progress-processor] Cleaning up for 401
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0)] Stopping
2023/04/24 11:32:51 [Sarama] loop check partition number coroutine will exit, topics [batch-worker-progress-topic]
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0)] ... Stopping done
2023/04/24 11:32:51 [Processor batch-worker-progress-processor] Cleaning up for 401 ... done
2023/04/24 11:32:51 [Processor batch-worker-progress-processor] consuming from consumer loop done
2023/04/24 11:32:51 [Processor batch-worker-progress-processor] Consumer group loop done, will stop here
2023/04/24 11:32:56 [Processor batch-worker-progress-processor] consuming from consumer loop
2023/04/24 11:32:56 [Sarama] client/metadata fetching metadata for [batch-worker-progress-topic] from broker kafka-0.kafka-headless.nightwing.svc.cluster.local:9092
2023/04/24 11:32:56 [Sarama] client/coordinator requesting coordinator for consumergroup batch-worker-progress-processor from kafka-1.kafka-headless.nightwing.svc.cluster.local:9092
2023/04/24 11:32:56 [Sarama] client/coordinator coordinator for consumergroup batch-worker-progress-processor is (kafka-1.kafka-headless.nightwing.svc.cluster.local:9092)
2023/04/24 11:32:56 [Processor batch-worker-progress-processor] setup generation 402, claims=map[string][]int32{"batch-worker-progress-topic":[]int32{0}}
2023/04/24 11:32:56 [Processor batch-worker-progress-processor] Creating partition processor for partition 0
2023/04/24 11:32:56 [Processor batch-worker-progress-processor > PartitionProcessor (0)] catching up table
2023/04/24 11:32:56 [Processor batch-worker-progress-processor > PartitionProcessor (0) > PartTable] finished building storage for topic batch-worker-progress-processor-table/0 in 0.0 minutes
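For reference, the only hook currently available to application code is the error returned by the processor's Run call. Below is a minimal sketch of watching for this failure there; it assumes the aggregated emit error eventually propagates out of Run (the logs above show the consumer group session being retried, so the error may stay log-only for a while), and the string match is an assumption about the wrapped error text.

```go
// Minimal sketch: observe the shutdown via the error returned from Run.
// Assumption: the aggregated emit error propagates out of Run; the string
// match mirrors the broker error text seen in the logs above.
package sketch

import (
	"context"
	"log"
	"strings"

	"github.com/lovoo/goka"
)

// runAndWatch runs the processor and reports whether it stopped because of an
// oversized message, so the caller can alert instead of silently looping.
func runAndWatch(ctx context.Context, proc *goka.Processor) error {
	err := proc.Run(ctx)
	if err != nil && strings.Contains(err.Error(), "Message was too large") {
		log.Printf("processor stopped due to an oversized message: %v", err)
	}
	return err
}
```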

@frairon (Contributor) commented Apr 24, 2023

Hey @aditya-msd,

currently there is no way to drop those messages: this error rarely happens, and when it does, the processor has to resolve the underlying issue (otherwise it would corrupt or lose its own data).
However, it may make sense to handle messages that cause this behavior somehow.
What would your approach to "side-stepping" the processing be, as you put it? Should the message be silently dropped? Maybe we could add a callback that is triggered when this error happens? Or do you have other ideas?
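For illustration only, such a callback could be shaped like the sketch below. Everything in it is hypothetical: goka has no EmitErrorHandler or matching processor option today; it merely sketches the idea being discussed.

```go
// HYPOTHETICAL sketch only -- goka has no such handler or option today.
// It illustrates the callback idea discussed above: a hook the processor
// would invoke when emitting a message fails, instead of shutting down.
package sketch

import (
	"errors"
	"log"

	"github.com/Shopify/sarama"
)

// EmitErrorHandler would be called whenever emitting a message fails.
// Returning nil would drop the message and let processing continue;
// returning a non-nil error would keep today's behavior (stop the processor).
type EmitErrorHandler func(topic, key string, value []byte, err error) error

// Example handler: skip oversized messages, fail on anything else.
var skipOversized EmitErrorHandler = func(topic, key string, value []byte, err error) error {
	if errors.Is(err, sarama.ErrMessageSizeTooLarge) {
		log.Printf("dropping oversized message on %s/%s: %v", topic, key, err)
		return nil
	}
	return err
}
```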

@aditya-msd (Author)

For side-stepping:

  1. I can calculate the size of the message before it is sent. But since it is the compressed data that goes to Kafka, I cannot put an upper limit on the uncompressed size. In the logs above, Workerstate Size :: 89983 is the uncompressed size in bytes, while the topic is configured with max.message.bytes=10000, which means it is the compressed size that is checked (see the sketch after this list).

  2. Right now I only know the error has occurred from the logs. Within the code, how can I catch this error so that I can notify or adjust the internal state as required?
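Regarding point 1, one way to approximate the limit client-side is to compress the encoded value yourself before emitting and compare the result against the broker's max.message.bytes. This is only a sketch under assumptions: gzip stands in for whatever compression the producer actually uses, the 10000-byte limit is taken from this thread, and the broker enforces the limit on the compressed record batch (value plus key, headers, and batch overhead), so the estimate needs headroom.

```go
// Sketch: estimate the compressed size of an encoded value before emitting,
// so an oversized message can be side-stepped instead of stopping the
// processor. The broker checks the compressed record batch, so this is only
// an approximation; gzip here is a stand-in for the producer's compression.
package sketch

import (
	"bytes"
	"compress/gzip"
	"fmt"

	"github.com/lovoo/goka"
)

const maxMessageBytes = 10000 // broker-side max.message.bytes from this thread

// compressedSize gzips the already-encoded value and returns the byte count.
func compressedSize(encoded []byte) (int, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(encoded); err != nil {
		return 0, err
	}
	if err := zw.Close(); err != nil {
		return 0, err
	}
	return buf.Len(), nil
}

// emitIfFits encodes the value, estimates its compressed size and emits only
// when it is safely under the limit, keeping ~10% headroom for batch overhead.
func emitIfFits(ctx goka.Context, c goka.Codec, topic goka.Stream, key string, value interface{}) error {
	encoded, err := c.Encode(value)
	if err != nil {
		return err
	}
	size, err := compressedSize(encoded)
	if err != nil {
		return err
	}
	if size > maxMessageBytes*9/10 {
		return fmt.Errorf("skipping emit for key %q: estimated compressed size %d is too close to the limit", key, size)
	}
	ctx.Emit(topic, key, value)
	return nil
}
```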

Also, you mentioned we could add a callback that is triggered when this error happens. Can you provide a code snippet showing where to add it, or any references that use this approach?

Otherwise I will have to find some other way to detect it.

@frairon (Contributor) commented Apr 24, 2023

My point was that there is currently no way to detect or handle a failing emit: the processor shuts down, and that's it. But we could build one; it doesn't sound too hard to do.
That's why I was wondering whether you already had an idea of what a solution could look like. Anyway, once we find the time we'll take a look, and maybe an obvious solution will pop up.
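Until such a hook exists, about the only handling possible from the outside is to supervise the Run call itself. The sketch below restarts the processor with a backoff and logs the reason; note that restarting re-consumes from the last committed offset, so the same oversized message will fail again until the broker limit or the emitted data is fixed. The build function and backoff values are illustrative assumptions.

```go
// Sketch: supervise proc.Run so the shutdown is at least observable and the
// processor comes back without manual intervention. Restarting re-consumes
// from the last committed offset, so an oversized message will fail again
// until the topic limit or the emitted data is fixed.
package sketch

import (
	"context"
	"log"
	"time"

	"github.com/lovoo/goka"
)

// superviseProcessor rebuilds and reruns the processor until ctx is cancelled.
func superviseProcessor(ctx context.Context, build func() (*goka.Processor, error)) {
	backoff := time.Second
	for ctx.Err() == nil {
		proc, err := build()
		if err != nil {
			log.Printf("building processor failed: %v", err)
		} else if err := proc.Run(ctx); err != nil {
			log.Printf("processor stopped: %v", err) // e.g. "Message was too large"
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(backoff):
			if backoff < time.Minute {
				backoff *= 2 // simple exponential backoff, capped at one minute
			}
		}
	}
}
```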
