
Sending high volume of packets in short amount of time causes packets dropping #2060

Fjf opened this issue May 1, 2024 · 5 comments


Fjf commented May 1, 2024

I'm typically sending small messages, but in some instances a large amount of data has to be sent over the socket.
To get around the maximum packet size, I split the input bytes into chunks on the sender side and stitch them back together on the receiver side.
For a small number of chunks (~10), this works fine without any problems.
For a large number of chunks (>40), the receiver starts losing packets.

Below is the client-side code. create_chunks produces JSON objects that contain the data, the total number of chunks for this packet, and a chunk_id, so I know which packets I have received.

    def put(self, packet: Packet):
        data = packet.to_pickle()
        for chunk in create_chunks(data):
            print("Emitting packet with id", chunk["chunk_id"], len(pickle.dumps(chunk)))
            self.client.emit("io", pickle.dumps(chunk))
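(create_chunks itself is not shown in the issue; a minimal sketch consistent with the fields used here and on the server side — chunk_id, length, data — might look like the following, where the 512 KiB chunk size is a guess based on the logged packet sizes:)

```python
import math

CHUNK_SIZE = 512 * 1024  # assumed payload size per chunk


def create_chunks(data: bytes):
    """Split data into dicts carrying the payload, the total chunk
    count, and a sequential chunk_id for reassembly."""
    n_chunks = math.ceil(len(data) / CHUNK_SIZE)
    for i in range(n_chunks):
        yield {
            "chunk_id": i,
            "length": n_chunks,
            "data": data[i * CHUNK_SIZE:(i + 1) * CHUNK_SIZE],
        }
```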

The server-side code waits for data and stitches it back together.

    @self.sio.on("io")
    def io(data):
        self._intermediate_data_buffer.append(pickle.loads(data))
        logging.info(f'Receiving packet with id {self._intermediate_data_buffer[-1]["chunk_id"]}')
        n_chunks = self._intermediate_data_buffer[0].get("length")
        if n_chunks == len(self._intermediate_data_buffer):
            recreated_data = b''.join(
                x.get("data") for x in sorted(self._intermediate_data_buffer, key=lambda d: d.get("chunk_id"))
            )
            pkt = Packet.from_pickle(recreated_data)
            self._intermediate_data_buffer.clear()
    
            if self._on_packet_callback is not None:
                self._on_packet_callback(pkt)
            else:
                self.queue.put(pkt)

Logs from client side:
Emitting packet with id 0 524351
Emitting packet with id 1 524351
...
Emitting packet with id 265 524352
Emitting packet with id 266 219229

Logs on server side:
2024-05-01 19:09:43,249 [INFO] Receiving packet with id 0
2024-05-01 19:09:43,249 [INFO] Receiving packet with id 1
...
2024-05-01 19:09:43,260 [INFO] Receiving packet with id 38
2024-05-01 19:09:43,260 [INFO] Receiving packet with id 39

I can 'get around' this problem by adding a time.sleep(0.2) between packet sends on the client side, but ideally I would not delay sending unnecessarily. This does suggest the problem has to do with some kind of internal buffer filling up.

The only thing I found in the Flask-SocketIO documentation regarding message size concerns a single message's size, not the total aggregated size of all messages in the buffer:
max_http_buffer_size – The maximum size of a message when using the polling transport. The default is 1,000,000 bytes.
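(For reference, that limit is set when constructing the server; a minimal sketch assuming the standard Flask-SocketIO constructor, with an arbitrary 8 MB figure:)

```python
from flask import Flask
from flask_socketio import SocketIO

app = Flask(__name__)
# Raising max_http_buffer_size allows larger individual messages through,
# but it does not bound the total size of the outgoing send queue.
socketio = SocketIO(app, max_http_buffer_size=8 * 1024 * 1024)
```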

@miguelgrinberg (Owner)

First of all, you can change the maximum packet size. As you mentioned yourself, you can use max_http_buffer_size.

What happens in the client application after the loop emitting all the chunks ends?


Fjf commented May 1, 2024

After the client is done sending all chunks it emits one more message, then disconnects, after which the process ends.
As for increasing the packet size, I'd rather not set a maximum packet size in the GB range, though if that is the only way I will try it.

@miguelgrinberg (Owner)

Well, that is likely the problem. If you are sending a lot of data, it may take a while for the background tasks to flush everything out. If you disconnect the WebSocket, some data may still be waiting to go out. Three suggestions:

  1. Have the server disconnect the client after it has received all the packets. The client will just wait until it gets disconnected. There is a sio.wait() for this.
  2. Have the server acknowledge that it has received all the packets by sending a final message to the client. The client can disconnect when it receives this event.
  3. Use the callback feature to receive acknowledgements for all sent packets. The client can disconnect after it has received callbacks for all the packets it sent out (or maybe just the last one, since packets are sent in order).
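(The callback approach in suggestion 3 boils down to client-side bookkeeping: count outstanding chunks and only disconnect once every acknowledgement has fired. A runnable sketch of just that bookkeeping, with the transport left out — AckTracker is a hypothetical helper, not part of python-socketio:)

```python
import threading


class AckTracker:
    """Count outstanding chunks; wait_all() blocks until every
    acknowledgement callback has been invoked."""

    def __init__(self):
        self._pending = 0
        self._cond = threading.Condition()

    def sent(self):
        # Call once per emitted chunk, before the emit.
        with self._cond:
            self._pending += 1

    def acked(self, *args):
        # Signature accepts any arguments so it can be used directly
        # as an emit() acknowledgement callback.
        with self._cond:
            self._pending -= 1
            self._cond.notify_all()

    def wait_all(self, timeout=None):
        # Returns True once all pending chunks have been acknowledged.
        with self._cond:
            return self._cond.wait_for(lambda: self._pending == 0, timeout)
```

With python-socketio this would be wired up as `tracker.sent()` before each `client.emit("io", payload, callback=tracker.acked)`, followed by `tracker.wait_all()` before `client.disconnect()`.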


Fjf commented May 2, 2024

The callback method works for me.
If you think disconnect should not block until queued messages are flushed, then this is not a bug and the issue can be closed.

Thanks!

Update:
I've tried two methods:

  • spam the server with emitted chunks and wait for all acks before shutting down
  • wait for ack per emitted chunk before sending the next one

The first method seems to get the client disconnected, because the 'ping' messages no longer come through due to the large backed-up queue.
I base this on the following CRITICAL log message:
[2024-05-02 09:48:47 +0200] [31955] [CRITICAL] WORKER TIMEOUT (pid:31957)
The second method works, though throughput is quite low due to the required round trip per message.

Since sending a large amount of data is rare, this is fine for my application, but ideally the message volume would not cause the keep-alive pings to time out.


miguelgrinberg commented May 2, 2024

If you think the disconnect should not block until queued messages are flushed

There are two ways to send events: emit() and call(). The former schedules the event and returns immediately; the latter returns only when the message has been sent and acknowledged by the other side (which is also implemented using callbacks). So this is working as designed.

I forgot to mention call() above; that would be a fourth option. Of course, if you use call() for all your events it may take longer to send everything out, because there will be no buffering: each event is sent individually, and call() blocks until the other side acknowledges it.
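(A sketch of that fourth option. FakeClient below is a stand-in for socketio.Client so the pattern is runnable on its own; python-socketio's real call() takes the same event/data/timeout shape:)

```python
import pickle


class FakeClient:
    """Stand-in for socketio.Client, illustrating the blocking call() pattern."""

    def __init__(self):
        self.calls = 0

    def call(self, event, data, timeout=None):
        # The real call() sends the event and blocks until the server's
        # handler returns; here we just count invocations and echo an ack.
        self.calls += 1
        return "ok"

    def disconnect(self):
        pass


def send_all(client, chunks):
    # One round trip per chunk: call() returns only after the server
    # acknowledges, so nothing accumulates in the outgoing queue.
    for chunk in chunks:
        client.call("io", pickle.dumps(chunk), timeout=30)
    client.disconnect()
```

The trade-off is exactly the one described above: no queue build-up and no ping starvation, at the cost of one round-trip latency per chunk.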
