
Maximum message/s throughput for a single subscriber can't stay above 10,000 msg/s #1962

Open
Aaronontheweb opened this issue Apr 4, 2024 · 5 comments
Labels
bug Something isn't working

Comments

@Aaronontheweb

Verification

Using version 4.3.3.952 in Release mode on .NET 8 with logging disabled.

Describe the bug

I'm currently running a small PoC using Akka.NET and MQTTnet via an EMQX broker using the following settings:

_mqttClient = _mqttFactory.CreateMqttClient();

var mqttClientOptions = new MqttClientOptionsBuilder()
    .WithClientId(_appSetting.MQTT_ClientId)
    .WithTcpServer(_appSetting.MQTT_Host, _appSetting.MQTT_Port)
    .WithCredentials(_appSetting.MQTT_User, _appSetting.MQTT_Password)
    .WithRequestProblemInformation(true)
    .WithCleanSession()
    .WithoutPacketFragmentation()
    .Build();

var connectResult = await _mqttClient.ConnectAsync(mqttClientOptions);
if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
{
    // force a restart and retry
    _log.Error("Failed to connect to MQTT broker: {0}", connectResult.ResultCode);
    throw new InvalidOperationException("Failed to connect to MQTT broker");
}

var mqttSubscribeOptions = _mqttFactory.CreateSubscribeOptionsBuilder()
    .WithTopicFilter(
        f =>
        {
            f.WithTopic(_appSetting.MQTT_Topic);

            // Conditionally set QoS based on AppSetting.MQTT_QoS
            switch (_appSetting.MQTT_QoS)
            {
                case 0:
                    f.WithAtMostOnceQoS();
                    break;
                case 1:
                    f.WithAtLeastOnceQoS();
                    break;
                case 2:
                    f.WithExactlyOnceQoS();
                    break;
                default:
                    // Handle invalid QoS setting (default to QoS 0 in this example)
                    f.WithAtMostOnceQoS();
                    break;
            }
        }).Build();

_mqttClient.ApplicationMessageReceivedAsync += ManagedMqttClient_ApplicationMessageReceivedAsync;

await _mqttClient.SubscribeAsync(mqttSubscribeOptions);
_mqttClient.DisconnectedAsync += ManagedMqttClient_DisconnectedAsync;

And the following event handler, which just writes to an unbounded ChannelWriter<T>:

private async Task ManagedMqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{
    try
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await e.AcknowledgeAsync(cts.Token);
        await _channelWriter.WriteAsync(e.ApplicationMessage, cts.Token);
        _self.Tell(Processed.Instance);
    }
    catch (Exception ex)
    {
        _log.Error(ex, "Error acknowledging message {0}", e.PacketIdentifier);
    }
}
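
(For context, here is a minimal sketch of how the unbounded channel behind _channelWriter might be created; only the writer side appears above, so the option values here are assumptions.)

using System.Threading.Channels;

// Assumed setup for the unbounded channel that _channelWriter refers to;
// the snippet above only shows the writer side.
var channel = Channel.CreateUnbounded<MqttApplicationMessage>(new UnboundedChannelOptions
{
    SingleWriter = false, // writes come from MQTTnet's receive callback
    SingleReader = true   // a single downstream consumer drains the queue
});
_channelWriter = channel.Writer;
var reader = channel.Reader; // handed off to the downstream consumer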

I'm seeing MQTTnet fail to keep up with my workload starting at around 10,000 msg/s, and eventually the client fails as its queue on EMQX grows too large (the broker automatically disconnects the client).

Even if I change my ManagedMqttClient_ApplicationMessageReceivedAsync to just return Task.CompletedTask and not do any actual processing, I still see this issue: MQTTnet can't keep up with the rate of messages coming from the broker.
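
For reference, that stripped-down handler looks roughly like this (a sketch; the original comment only describes it in prose):

private Task ManagedMqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{
    // No acknowledgement, no channel write; just complete immediately.
    return Task.CompletedTask;
}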

For the project I'm working on, each subscriber in an MQTT subscriber group will need to be able to process ~30,000 msg/s. Is that possible with this library?

Which component is your bug related to?

  • Client
  • ManagedClient

Tried with both the Client and the ManagedClient but it didn't seem to make any difference.

Expected behavior

I'd expect MQTTnet to keep up beyond 10k msg/s.

Additional context / logging

These are some of the logs from EMQX - it just gives up on the client because the input queue gets too long:

2024-04-04T13:28:24.495093-05:00 [SOCKET] ghb.1@172.18.240.1:62157 msg: emqx_connection_terminated, reason: {shutdown,#{max => 1000,reason => message_queue_too_long,value => 13381}}, username: admin

This ends up creating a snowballing effect where the client is never really able to keep up.

@Aaronontheweb added the bug label on Apr 4, 2024
@Aaronontheweb
Author

I increased some of EMQX's broker settings to allow up to 10,000 in-flight messages at any given time and increased the queue size to 1 million messages per MQTT topic. Still seeing things top out around 10k msg/s even after adjusting that.

@MD-V
Contributor

MD-V commented Apr 9, 2024

Can you profile your client side to see where most of the time in the code is being spent?
How big are the messages that you are receiving from your EMQX instance?
Is the EMQX broker on the same network as your system, or reached over the internet?
Is any other MQTT client able to keep up with your demand on your system?

@bajinder

Hi @Aaronontheweb,
We had the same issue with MQTTnet, but our payloads were much larger. At one point we gave up on the MQTTnet library altogether, but then came across #1160. Even though your payload size is only 10KB, from memory I recall looking into the MQTTnet code and the default buffer for the TCP connection is much smaller. I would suggest increasing the buffer size as mentioned in the linked issue and seeing if it helps.
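
(A hedged sketch of that suggestion, not from the original report: it assumes MqttClientTcpOptions still exposes a BufferSize property, as discussed in #1160, and uses hypothetical host/port values.)

using MQTTnet;
using MQTTnet.Client;

var mqttClient = new MqttFactory().CreateMqttClient();

var options = new MqttClientOptionsBuilder()
    .WithTcpServer("broker.example.com", 1883) // hypothetical broker address
    .Build();

// Assumed property from #1160: raise the TCP receive buffer from its
// small default to ~1 MB before connecting.
if (options.ChannelOptions is MqttClientTcpOptions tcpOptions)
{
    tcpOptions.BufferSize = 1024 * 1024;
}

await mqttClient.ConnectAsync(options, CancellationToken.None);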

@Aaronontheweb
Author

Great advice - thank you

@Aaronontheweb
Author

@bajinder so this did not help me, unfortunately. I pumped the receive buffer up from the default of 8 KB to 1 MB, and throughput still petered out around the same point.

I also tried doing some work with the LowLevelClient; the numbers look a tad better there, but I haven't finished reasoning through all of the fault tolerance needed to keep the connection relatively stable.
