Producer throws error message Broker: Message size too large when the event payload size is less than the max.message.bytes configuration on the topic #2214

vikasillumina opened this issue Apr 27, 2024 · 0 comments


Description

The producer throws the error "Broker: Message size too large" when the event payload size is less than the max.message.bytes configuration on the topic.

How to reproduce

We have not been able to reproduce this issue, and from our investigation we have concluded that the event payload we are trying to publish to Kafka is nowhere near the max.message.bytes configuration on the topic.
But here is the sample code:

    public virtual async Task<DeliveryResult<string, byte[]>[]> PublishMessagesAsync(
        IEnumerable<Message> messages,
        Action<MessageDelivery<string, byte[]>> onDelivery,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        var tasks = new List<Task<DeliveryResult<string, byte[]>>>();
        foreach (var m in messages)
        {
            if (m == null)
            {
                continue;
            }
            Task<DeliveryResult<string, byte[]>> task = null;

            // Note: contentJson/contentBytes are computed but never used below;
            // only m.ContentBytes is ever published.
            var contentJson = JsonConvert.SerializeObject(m.ContentObject, new JsonSerializerSettings
            {
                DateTimeZoneHandling = DateTimeZoneHandling.Utc,
                NullValueHandling = NullValueHandling.Ignore,
                ContractResolver = new Newtonsoft.Json.Serialization.DefaultContractResolver
                {
                    NamingStrategy = new Newtonsoft.Json.Serialization.CamelCaseNamingStrategy()
                }
            });

            var contentBytes = Encoding.Default.GetBytes(contentJson);

            if (m.ContentBytes != null && m.ContentBytes.Length > 0)
            {
                task = PublishRawAsync(m.KafkaMessageMetadata?.Topic, m.ContentBytes, m.Headers, m.KafkaMessageMetadata?.Key, cancellationToken);
            }
            else
            {
                throw new ArgumentException("message has no content populated");
            }

            if (onDelivery != null)
            {
                // Chain the delivery callback onto the produce task without blocking it.
                task = task.ContinueWith(async continuation =>
                {
                    var deliveryReport = await continuation;
                    onDelivery(new MessageDelivery<string, byte[]>(m, deliveryReport));
                    return deliveryReport;
                }, cancellationToken).Unwrap();
            }

            tasks.Add(task);
        }

        return await Task.WhenAll(tasks);
    }
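
For diagnosis, here is a minimal sketch, not part of our production code, of how the approximate on-wire size of each message could be estimated before producing. The 1 MB constant mirrors the topic's max.message.bytes, and the helper name is hypothetical:

    // Hypothetical pre-flight size estimate (key + value + headers).
    // TopicLimitBytes mirrors the topic's max.message.bytes; it is an
    // assumption here, not a value read from the cluster.
    private const int TopicLimitBytes = 1_048_576;

    private static int EstimateMessageSize(Message<string, byte[]> message)
    {
        var size = message.Value?.Length ?? 0;
        if (message.Key != null)
        {
            size += Encoding.UTF8.GetByteCount(message.Key);
        }
        if (message.Headers != null)
        {
            foreach (var header in message.Headers)
            {
                size += Encoding.UTF8.GetByteCount(header.Key);
                size += header.GetValueBytes()?.Length ?? 0;
            }
        }
        return size;
    }

Logging this estimate next to the failure would confirm whether an individual message really is under the limit at the moment the broker rejects it.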

    public async Task<DeliveryResult<string, byte[]>> PublishRawAsync(
        string topic,
        byte[] content,
        HeadersWrapper messageHeaders,
        string messageKey = null,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        if (string.IsNullOrEmpty(topic))
        {
            throw new ArgumentNullException(nameof(topic), "Topic must be provided");
        }
        if (content == null || content.Length == 0)
        {
            throw new ArgumentException("content is null or empty", nameof(content));
        }

        if (string.IsNullOrEmpty(messageHeaders.producedby))
        {
            messageHeaders.producedby = _serviceName;
        }

        try
        {
            var message = new Message<string, byte[]>()
            {
                Value = content,
                Headers = CopyToKafkaHeaders(messageHeaders),
                Key = messageKey,
                Timestamp = new Timestamp(DateTime.UtcNow)
            };

            var dr = await _producer.ProduceAsync(topic, message);
            return dr;
        }
        catch (Exception ex)
        {
            _logger.Error(ex, $"An error occurred publishing event to topic {topic}.");
            throw;
        }
    }
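
In case it helps: ProduceAsync surfaces failures as ProduceException<TKey, TValue>, so catching that instead of the generic Exception would expose the error code and the failed delivery. A sketch of what our catch block could log (this is not what the code above currently does):

    try
    {
        var dr = await _producer.ProduceAsync(topic, message);
        return dr;
    }
    catch (ProduceException<string, byte[]> ex)
    {
        // ex.Error.Code distinguishes broker-side rejections such as
        // MsgSizeTooLarge ("Broker: Message size too large") from local errors.
        _logger.Error(ex, $"Publish to topic {topic} failed: code={ex.Error.Code}, " +
            $"reason={ex.Error.Reason}, valueBytes={ex.DeliveryResult?.Message?.Value?.Length ?? 0}");
        throw;
    }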

CLIENT CONFIGURATION

Producer configuration:
bootstrap.servers = AWS broker URL (3 brokers across 3 AZs)
client.id = service name
message.max.bytes = 4194304 (4 MB) (this is a common library setting; however, the topic itself has a 1 MB limit via its max.message.bytes setting, see the screenshot under Topic configuration below)

Topic configuration:
[screenshot: topic configuration, showing the topic-level 1 MB max.message.bytes limit]
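
For completeness, a minimal sketch of the producer configuration above expressed as a Confluent.Kafka ProducerConfig; the broker addresses and client id are placeholders:

    using Confluent.Kafka;

    var config = new ProducerConfig
    {
        BootstrapServers = "b-1.example:9092,b-2.example:9092,b-3.example:9092", // placeholder for the MSK brokers
        ClientId = "our-service-name",                                           // placeholder
        MessageMaxBytes = 4194304 // 4 MB client-side cap; the topic allows only 1 MB
    };

    using var producer = new ProducerBuilder<string, byte[]>(config).Build();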

OPERATING SYSTEM:

The application runs in a Docker container hosted as a Kubernetes pod on Linux; below are the specifics of the OS:
PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
NAME="Debian GNU/Linux"
VERSION_ID="11"
VERSION="11 (bullseye)"
VERSION_CODENAME=bullseye
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
5.10.205-195.807.amzn2.x86_64

PRODUCER LOGS:

Broker: Message size too large at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, Message`2 message, CancellationToken cancellationToken)
This is the only error message we could get from the SDK.

Our application doesn't batch the events; it only sends one event at a time via async tasks. We do submit a lot of these tasks in parallel, if that matters.

I looked through the SDK code but couldn't find any details that would explain this error other than the size itself. I also searched the internet and didn't find any obvious suggestions or solutions.

Please let me know if there is anything else I can provide.

Thanks for your help in advance.

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file. N/A, as there is no way to reproduce this issue on demand.
  • Confluent.Kafka nuget version: 1.8.2
  • Apache Kafka version: 2.7.0 (AWS MSK)
  • Client configuration: see the Producer and Topic configurations above
  • Operating system: see above
  • Provide logs (with "debug" : "..." as necessary in configuration): see above, and the debug-logging sketch after this list
  • Provide broker log excerpts: AWS confirmed that when we saw the errors on publishing there were no errors in the broker log
  • Critical issue: it's not happening all the time, but unfortunately we are seeing this in our production environment.
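
As referenced in the logs item above, a minimal sketch of how we could enable librdkafka debug output to capture more detail the next time this occurs; the console sink is just an example:

    var config = new ProducerConfig
    {
        BootstrapServers = "...", // as above
        Debug = "msg,broker"      // librdkafka debug contexts relevant to produce requests
    };

    using var producer = new ProducerBuilder<string, byte[]>(config)
        .SetLogHandler((_, log) => Console.WriteLine($"{log.Level} {log.Facility} {log.Message}"))
        .Build();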