
Flow control of TCP socket handling a large number of requests #297

Open
ChrisCuts opened this issue Dec 14, 2022 · 6 comments

@ChrisCuts
Contributor

Hello there,

we use a CoAP TCP server in our embedded Linux application for an internal task handler and event queue. During stress tests it turned out that when additional tasks run asynchronously to the request processing, the transport pumps in messages faster than the application can process them. This ends up with a lot of rendering tasks created by the context, and memory grows until it is exhausted.

It might be an edge case for an IoT device, but is there any way to slow down the transport from the application side? Or any other mechanism to limit the task creation?

With kind regards
Chris

@chrysn
Owner

chrysn commented Dec 14, 2022 via email

@ChrisCuts
Contributor Author

Mmm,

I also thought about handling this on the CoAP level, but the client side sends out the requests asynchronously. So there are 40k requests sent out before it could react to the server's missing ACK.

Currently I have a hotfix that creates the render tasks in another thread and blocks the transport callback with a semaphore until we are fine to process new requests. I tried to block the main task of the loop, but didn't know how to achieve this. Certainly this fix does not work for UDP transports, since it breaks the more complex CON handling and confuses the message handler completely because it uses the wrong loop. And I have a really bad feeling about it for sustainability reasons.

I was wondering why there is no such handling in the asyncio transports, since other projects should have the same problem. It seems data_received is called whenever there is free processing time, and there seems to be no pause_reading for a bidirectional transport.
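In rough strokes, the hotfix looks like this (an untested sketch; all names are made up, and render stands in for the actual request processing that runs on a second loop in a worker thread):

import asyncio
import threading

# Sketch of the hotfix described above; names are illustrative.
MAX_PENDING = 50
_pending = threading.Semaphore(MAX_PENDING)

async def render(data):
    # Placeholder for the actual request processing.
    await asyncio.sleep(1)

def on_data_received(data, worker_loop):
    # Runs in the transport's thread: block here once MAX_PENDING
    # renderings are in flight, which stalls further reading.
    _pending.acquire()
    future = asyncio.run_coroutine_threadsafe(render(data), worker_loop)
    future.add_done_callback(lambda _: _pending.release())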

@chrysn
Owner

chrysn commented Dec 14, 2022

OK, an open-loop CoAP client is brutal, but I guess that's what you're going for in a load test (effectively simulating DoSing clients).

How come missed ACKs are in play? I thought you're going CoAP-over-TCP.

The pause_reading would be approach number 2 in the above list (now numbered; sorry, GitHub completely breaks formatting when dealing with mail messages) -- exert backpressure on the TCP side. This keeps the load off the system under attack, but the corresponding mechanism on the UDP side is just dropping the packets, and there we're off the ideal path.

We'll have to somehow limit how many worker tasks are under way, however that will be handled in the transports. Both making the underlying transport block (if it can) and responding with errors would achieve this; let's see what the desired behaviors are.

To determine them: what's the limiting factor in your case? (Not that we shouldn't address all of them, but let's focus on what is needed.) Is it just the resources of the handlers, or would you not get the error responses out as fast as the requests come in? (That'd indicate an asymmetric uplink.)


On the implementation side, the point where I'd start addressing this is the TokenManager, which should not push things arbitrarily into incoming_requests but reject things once that's over a configured size. A CoAP-level rejection would be easy because it can just do that; a transport-level rejection is trickier because a) it needs new API between the transports and the TokenManager, and b) it'd need to catch incoming_requests shrinking and unthrottle connections one by one.
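Sketched very roughly (illustrative only, none of this is actual aiocoap code; incoming_requests follows the naming above, and the spawn/reject hooks are placeholders):

MAX_INCOMING_REQUESTS = 100

def accept_request(incoming_requests, request, spawn, reject):
    # Illustrative size check before accepting another request.
    if len(incoming_requests) >= MAX_INCOMING_REQUESTS:
        # CoAP-level rejection: answer 5.03 Service Unavailable (or
        # 4.29 Too Many Requests, RFC 8516) instead of queueing more.
        reject(request)
        return
    incoming_requests[request.token] = request
    spawn(request)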

@ChrisCuts
Contributor Author

> How come missed ACKs are in play? I thought you're going CoAP-over-TCP.

I meant the response, not an ACK, sorry for the confusion. So an error response (like 4.29 Too Many Requests) would not be evaluated on the client side.

> The pause_reading would be approach number 2 in the above list (now numbered; sorry, GitHub completely breaks formatting when dealing with mail messages) -- exert backpressure on the TCP side. This keeps the load off the system under attack, but the corresponding mechanism on the UDP side is just dropping the packets, and there we're off the ideal path.

I think there is no pause_reading on those transports. From the application side it would be easier to throttle the complete protocol; on a system level it would be nicer to slow down only individual transports or sockets, so that only the issuer is blocked, with things like DoS attacks in mind.

Dropping UDP packets should be no problem in my opinion, since the CoAP layer would catch this and trigger a retransmission.

> We'll have to somehow limit how many worker tasks are under way, however that will be handled in the transports. Both making the underlying transport block (if it can) and responding with errors would achieve this; let's see what the desired behaviors are.

I read that the receive buffer size could be set to zero; in TCP this is advertised to the client so that it sends no more packets. Normally the asyncio transports should handle this automatically. However, the TCP segment carrying this information could go out too late, since the pending response needs to be processed first. Maybe an empty packet could be sent, but I am not that deep into TCP.
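Something along these lines is what I have in mind (untested; the one-byte size is arbitrary, and the kernel rounds it up to its minimum anyway):

import socket

def shrink_receive_buffer(transport):
    # Fetch the underlying socket from the asyncio transport and
    # shrink its receive buffer so the advertised TCP window closes
    # quickly; the kernel rounds the value up to its minimum.
    sock = transport.get_extra_info('socket')
    if sock is not None:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1)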

> To determine them: what's the limiting factor in your case? (Not that we shouldn't address all of them, but let's focus on what is needed.) Is it just the resources of the handlers, or would you not get the error responses out as fast as the requests come in? (That'd indicate an asymmetric uplink.)

Our problem is the memory growth due to async tasks. The more messages we send, the faster it grows; in a real-world scenario we see about 6 MB per second. Our memory is exhausted after 2 hours of stress.

> On the implementation side, the point where I'd start addressing this is the TokenManager, which should not push things arbitrarily into incoming_requests but reject things once that's over a configured size. A CoAP-level rejection would be easy because it can just do that; a transport-level rejection is trickier because a) it needs new API between the transports and the TokenManager, and b) it'd need to catch incoming_requests shrinking and unthrottle connections one by one.

In my approach it worked well to pause the thread in which the transport runs. Is there no way in an event loop to wait for its tasks?
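What I am after is something like this, over tasks we keep references to (untested sketch):

import asyncio

async def wait_for_backlog(tasks, limit):
    # tasks: a set of asyncio.Task objects created earlier.
    # Wait until fewer than `limit` of them are still unfinished.
    pending = {t for t in tasks if not t.done()}
    while len(pending) >= limit:
        _done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)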

@ChrisCuts
Contributor Author

I've looked it up in the aiohttp implementation. They use pause_reading for the flow control. It is implemented here:

https://github.com/aio-libs/aiohttp/blob/master/aiohttp/base_protocol.py#L40

They set up a queue for the data flow, which regulates the stream:
https://github.com/aio-libs/aiohttp/blob/master/aiohttp/streams.py#L624

I tested it with a minimal example and it seems to work out well:

import asyncio


async def process_request(transport, data):
    # Simulate slow request processing before echoing the data back.
    await asyncio.sleep(2)
    print('process')
    transport.write(data)


class EchoServerProtocol(asyncio.Protocol):

    TASK_LIMIT = 100

    def __init__(self, loop):
        self.loop = loop
        self.transport = None
        self._reading_paused = False
        self._tasks = set()

    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def pause_reading(self) -> None:
        if not self._reading_paused and self.transport is not None:
            try:
                self.transport.pause_reading()
            except (AttributeError, NotImplementedError, RuntimeError):
                pass
            self._reading_paused = True

    def resume_reading(self) -> None:
        if self._reading_paused and self.transport is not None:
            try:
                self.transport.resume_reading()
            except (AttributeError, NotImplementedError, RuntimeError):
                pass
            self._reading_paused = False

    def data_received(self, data):
        task = self.loop.create_task(process_request(self.transport, data))
        self._tasks.add(task)
        task.add_done_callback(self._task_done)

        # Exert backpressure once too many tasks are in flight.
        if len(self._tasks) >= self.TASK_LIMIT:
            self.pause_reading()
            print(len(self._tasks), 'tasks in flight', flush=True)

    def _task_done(self, task):
        self._tasks.discard(task)
        # Resume reading once the backlog has drained.
        if len(self._tasks) < self.TASK_LIMIT:
            self.resume_reading()


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        lambda: EchoServerProtocol(loop),
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()

asyncio.run(main())
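For testing I fire open-loop requests at it along these lines (a made-up flood client, not part of the example above):

import asyncio

async def flood(host='127.0.0.1', port=8888, count=1000):
    # Open one TCP connection and fire requests without awaiting
    # responses, mimicking the open-loop client from the stress test.
    reader, writer = await asyncio.open_connection(host, port)
    for i in range(count):
        writer.write(b'request %d\n' % i)
    await writer.drain()  # drain() blocks once the server stops reading
    writer.close()
    await writer.wait_closed()

asyncio.run(flood())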

@ChrisCuts
Contributor Author

ChrisCuts commented Feb 23, 2023

Hi,

I modified the following context member functions to get TCP flow control done. I don't know if this is also applicable to UDP, or whether it makes sense for UDP at all, since UDP is a poor fit if you have to manage high data rates and full load on a server. The flow control makes the server process a lot smoother.

import functools

from aiocoap import Context
# Import path assumed for aiocoap 0.4.x; TcpConnection is the remote
# type used by the TCP transport.
from aiocoap.transports.tcp import TcpConnection


class ControlledCoapContext(Context):
    '''Context with task limitation.'''

    _DEFAULT_ASYNC_TASK_LIMIT = 50

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._throttled_transports = set()
        self._async_task_limit = self._DEFAULT_ASYNC_TASK_LIMIT

    def _resume_processing(self, task, observation_request):
        '''Called after request processing.

        - Remove task from running renderings
        - Check if transport could resume
        '''
        self._running_renderings.remove(task)

        if self._throttled_transports:
            if len(self._running_renderings) < self._async_task_limit:
                for transport in self._throttled_transports:
                    transport.resume_reading()
                self._throttled_transports.clear()

        if observation_request:
            self._async_task_limit -= 1

    def render_to_plumbing_request(self, plumbing_request):
        '''
        Replace render_to_plumbing_request to limit tasks.
        '''
        task = self.loop.create_task(
            self._render_to_plumbing_request(plumbing_request))

        self._running_renderings.add(task)

        observation_request = plumbing_request.request.opt.observe == 0
        if observation_request:
            # Consider observers in task limiter
            self._async_task_limit += 1

        if len(self._running_renderings) > self._async_task_limit:
            if isinstance(plumbing_request.request.remote, TcpConnection):
                # pylint: disable=protected-access
                transport = plumbing_request.request.remote._transport
                transport.pause_reading()
                self._throttled_transports.add(transport)

        remove_task = functools.partial(self._resume_processing, task, observation_request)

        task.add_done_callback(lambda result, cb=remove_task: cb())

We're still on 0.4.1 at the moment and plan to migrate to 0.4.5 soon. In 0.4.5 this would belong into run_driving_pipe(*).
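The subclass is used like the stock context, e.g. (assuming create_server_context instantiates the subclass; resource setup elided):

import asyncio
import aiocoap.resource as resource

async def main():
    site = resource.Site()
    # site.add_resource(...) as usual
    await ControlledCoapContext.create_server_context(site)
    # Keep serving forever.
    await asyncio.get_running_loop().create_future()

asyncio.run(main())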
