Flow control of TCP socket handling a large number of requests #297
Comments
That's a good point, and the behavior you're observing is consistent
with the behaviors I'd analytically expect from aiocoap.
Before we rush to fixes, let's consider the solution space. It mostly
boils down to making pressure propagate back to the clients for them to
deal with it. We'd have options:
1. We could not propagate backpressure, but leave messages in the network
buffer. Requests exceeding some number of worker tasks would only
consume buffer space, and not be turned into (much larger) parsed
messages and workers. This would mitigate the problem a bit, but
(especially under load test conditions) not solve it.
(This, by the way, gives me extra appreciation for asyncio -- if you
hit the limits of how many workers one can handle, I don't want to
think about what that'd mean if we spawned threads...)
2. We could leverage TCP; given you mentioned that protocol, it may be
what you have in mind. We'd use Python's stream APIs to stop the
new-bytes events until our workers are fewer. There are two problems
with it, though:
2a. It doesn't work the same way for UDP (it would for WebSockets,
though).
2b. It really only moves the problem to the client side, at least when
aiocoap is involved. (Which may be worth its own issue). An aiocoap
TCP client will happily keep writing to the TCP socket, and ignore
the very signals aiocoap as a server doesn't send either. If the
client uncontrolledly sends requests (ie. without awaiting the
completion of at least some of them), it will rack up application
requests (as it does now), but in addition have its TCP write buffer
grow indefinitely.
3. We could leverage CoAP error codes to turn down requests
intermittently. 5.03 (Service Unavailable) and 4.29 (Too Many
Requests) come to mind, each with an appropriate Max-Age.
Effectively, the context would keep track of its total of worker
tasks, and if that exceeds some number, start rejecting requests.
(Which of them to pick will depend on a more thorough reading of
RFC8516, and on whether one client is sending many requests, or many
clients are sending requests).
I think this one is preferable because it works for all transports. It
still has two downsides:
3a. Unlike the TCP backpressure, it has no "OK, now hit me
again" mechanism. The Max-Age needs to be timed suitably. I'm pretty
sure there's either a simple published algorithm or one we can come
up with on the spot (anything fast and O(1) that estimates the rate
of rejected requests).
3b. In aiocoap there is no automated mechanism on the client side yet to
retry failed requests if the error is transient. That means that
once a server starts doing any of this, currently the application
needs retry logic.
I'd guess that this is not too much of a problem in your load
testing case.
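
To make option 3 a bit more concrete, here is a rough sketch of the bookkeeping it implies. It is not aiocoap code: the class `AdmissionGate`, its parameters, and the Max-Age formula are made up for illustration, and which response code to send (5.03 or 4.29) is left to the caller. It tracks the number of in-flight workers and keeps an O(1), exponentially decayed count of recent rejections, so the suggested Max-Age grows while rejections keep coming and shrinks again once they stop.

```python
import time


class AdmissionGate:
    """Hypothetical helper: caps in-flight workers and suggests a Max-Age."""

    def __init__(self, max_workers, base_max_age=1.0, half_life=5.0,
                 clock=time.monotonic):
        self.max_workers = max_workers
        self.base_max_age = base_max_age  # seconds suggested under light overload
        self.half_life = half_life        # seconds until a rejection counts half
        self.clock = clock
        self.in_flight = 0
        self._score = 0.0                 # decayed count of recent rejections
        self._last = clock()

    def _decay(self):
        # O(1) update: age the score by the time elapsed since the last rejection.
        now = self.clock()
        self._score *= 0.5 ** ((now - self._last) / self.half_life)
        self._last = now

    def try_admit(self):
        """Return None if the request may start a worker, else a Max-Age hint."""
        if self.in_flight < self.max_workers:
            self.in_flight += 1
            return None
        self._decay()
        self._score += 1.0
        # More recent rejections -> ask clients to stay away longer.
        # Max-Age is an integer number of seconds, so the caller would round up.
        return self.base_max_age * (1.0 + self._score / self.max_workers)

    def release(self):
        """Call once when an admitted worker has finished."""
        self.in_flight -= 1
```

A server would call `try_admit()` per incoming request, answer with an error response carrying the returned value as Max-Age when it is not `None`, and call `release()` from the worker's done callback.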
Would the last version work for your case? I guess that if you're doing
load testing, you also have an actual application in there. Whether the
server guided back-off works well will depend on what you're sending: If
you're sending individual events and expect each one to be eventually
delivered, you might still run into limitations at some point (if
aiocoap performance is a bottleneck, let's talk, but your original post
sounds to me like whatever the resource handler does is the slow part).
If it's all just clients that eventually reach a happy state (like,
they're all fetching an update), things should be fine. If the clients
are sending events that can be lumped together, it may actually be
beneficial if the application does the back-off, for if an initial
request to send one event fails, by the time the back-off period
expires, it might have several events to send in a single request (which
would be handled more efficiently later), as long as there is still a
mechanism for graceful degradation.
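
The application-side back-off with lumped events could look roughly like the following sketch. Everything named here is hypothetical: `Rejected` stands in for whatever the application raises when the server answers 5.03 or 4.29, `send` is an application-provided coroutine that transmits one batch, and `drain_events` is just an illustrative name. The point is only that events which pile up during the back-off period go out together in the next attempt.

```python
import asyncio


class Rejected(Exception):
    """Hypothetical: raised by `send` when the server turns the request down."""

    def __init__(self, max_age):
        super().__init__(max_age)
        self.max_age = max_age  # back-off hint from the server, in seconds


async def drain_events(queue, send, sleep=asyncio.sleep):
    """Send queued events in batches, honouring server-guided back-off.

    `send(batch)` is an assumed coroutine that raises Rejected on a
    transient error; `sleep` is injectable so the loop can be tested.
    """
    batch = []
    while True:
        # Pick up everything that accumulated since the last attempt.
        while not queue.empty():
            batch.append(queue.get_nowait())
        if not batch:
            return
        try:
            await send(batch)
            batch = []
        except Rejected as rejection:
            # Keep the unsent events and wait as long as the server asked.
            await sleep(rejection.max_age)
```

Graceful degradation is not covered here; a bounded queue (`asyncio.Queue(maxsize=...)`) or dropping the oldest events would be one way to keep the batch from growing without limit.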
Mmm, I also thought about handling this on the CoAP level, but the client side sends out its requests asynchronously. So there are 40k requests sent out before it could react to the server's missing ACK.

Currently, I have a hotfix that creates the render tasks in another thread and blocks the transport callback with a semaphore until we are fine to process new requests. I tried to block the main task of the loop, but didn't know how to achieve this. Certainly, this fix does not work for UDP transports, since it breaks the more complex CON handling and confuses the message handler completely because it uses the wrong loop. And I have a really bad feeling about how sustainable it is.

I was wondering why there is no handling for this in the asyncio transports, since other projects should have the same problems. It seems data_received is called whenever there is free processing time, and there is no pause_reading for a bi-directional transport.
OK, an open-loop CoAP client is brutal, but I guess that's what you're going for in a load test (effectively simulating DoSing clients). How come there are missed ACKs in play, I thought you're going CoAP-over-TCP?

The pause_reading would be approach number 2 in the above list (now numbered; sorry, GitHub completely breaks formatting when dealing with mail messages) -- exert backpressure on the TCP side. This keeps the load off the system under attack, but the corresponding mechanism on the UDP side is just dropping the packets, and there we're off the ideal path.

We'll have to somehow limit how many worker tasks are under way, however that'll be handled in the transports. Both making the underlying transport block (if it can) and responding with errors will achieve this, ... let's see what desired behaviors are.

To determine them: What's the limiting factor in your case? (Not that we shouldn't address all, but let's focus on what is needed). Is it just the resources of the handlers, or would you not get the error responses out as fast as the requests come in? (That'd indicate an asymmetric uplink).

On the implementation side, the point where I'd start addressing this is TokenManager, which should not push things arbitrarily into incoming_requests but reject things once that's over a configured size. A CoAP level rejection would be easy because it can just do that; a transport level rejection is trickier because a) it needs new API between the transports and the TokenManager, and b) it'd need to catch incoming_requests shrinking and unthrottle connections one by one.
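
For the transport-level variant, the bookkeeping behind points a) and b) might be sketched like this. It is a standalone illustration, not the TokenManager API: `RequestThrottle` and its method names are invented, and a "connection" is assumed to be any object offering `pause_reading()` and `resume_reading()`, as asyncio read transports do.

```python
from collections import deque


class RequestThrottle:
    """Hypothetical glue between a request table and its transports."""

    def __init__(self, limit):
        self.limit = limit      # configured size for the request table
        self.active = 0         # requests currently being processed
        self._paused = deque()  # throttled connections, oldest first

    def request_started(self, connection):
        """Call when a request from `connection` enters the table."""
        self.active += 1
        if self.active >= self.limit and connection not in self._paused:
            connection.pause_reading()
            self._paused.append(connection)

    def request_finished(self):
        """Call when any request leaves the table."""
        self.active -= 1
        # Unthrottle one connection per freed slot rather than all at
        # once, so the table does not overshoot again immediately.
        if self._paused and self.active < self.limit:
            self._paused.popleft().resume_reading()
```

Resuming in FIFO order is only one possible policy; a connection that is resumed and immediately fills the table again simply gets paused once more.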
How come there are missed ACKs in play, I thought you're going CoAP-over-TCP?
<< I meant the response, not an ACK; sorry for the confusion. My point was that an error response (like 4.29) would not be evaluated on the client side.

The pause_reading would be approach number 2 in the above list (now numbered; sorry, GitHub completely breaks formatting when dealing with mail messages) -- exert backpressure on the TCP side. This keeps the load off the system under attack, but the corresponding mechanism on the UDP side is just dropping the packets, and there we're off the ideal path.
<< I think there is no pause_reading on those transports. I think it would be easier from the application side to throttle the complete protocol; on a system level, though, it could be nice to slow down only single transports or sockets, to block only the issuer, with something like DoS attacks in mind.
<< Dropping UDP packets should not be a problem in my opinion, since the CoAP layer would catch this and trigger a retransmission.

We'll have to somehow limit how many worker tasks are under way, however that'll be handled in the transports. Both making the underlying transport block (if it can) and responding with errors will achieve this, ... let's see what desired behaviors are.
<< I read that the size of the receive buffer could be set to zero, since this is sent to the client in TCP to tell it to send no more packets. Normally, the asyncio transports should handle this automatically. Furthermore, the TCP response with this information could be sent out too late, since the response needs to be processed first. Maybe an empty packet could be sent out, but I am not so deep into TCP.

To determine them: What's the limiting factor in your case? (Not that we shouldn't address all, but let's focus on what is needed). Is it just the resources of the handlers, or wouldn't get the error responses out as fast as the requests come in? (That'd indicate an asymmetric uplink).
<< Our problem is the increasing memory due to async tasks. The more messages we send, the faster it grows. In a real world scenario we've got 6 MB per second. Our memory is exhausted after 2 hours of stress.

On the implementation side, the point where I'd start addressing this is TokenManager, which should not push things arbitrarily into incoming_requests but reject things once that's over a configured size. A CoAP level rejection would be easy because it can just do that; a transport level rejection is trickier because a) it needs new API between the transports and the TokenManager, and b) it'd need to catch incoming_requests shrinking and unthrottle connections one by one.
<< In my approach, it worked well to pause the thread in which the transport runs. Is there no way in an event loop to wait for its tasks?
I've looked it up in the aiohttp implementation. They are using pause_reading for the flow control. It is implemented here: https://github.com/aio-libs/aiohttp/blob/master/aiohttp/base_protocol.py#L40 They set up a queue for the data flow, which regulates the stream. I tested it with a minimal example and it seems to work out well.
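
A sketch of this kind of pause_reading flow control in a plain asyncio protocol is given here. It is neither the aiohttp code nor the example the poster tested: `ThrottledProtocol`, `MAX_PENDING` and the `handler` argument are names chosen for illustration, while `pause_reading()` and `resume_reading()` are the standard asyncio read-transport methods.

```python
import asyncio


class ThrottledProtocol(asyncio.Protocol):
    """Pauses its transport while too many handler tasks are pending."""

    MAX_PENDING = 4  # assumed limit, pick to match available memory

    def __init__(self, handler):
        self.handler = handler  # coroutine function taking the received bytes
        self.transport = None
        self.pending = set()
        self.paused = False

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        # One task per chunk keeps the sketch short; a real protocol
        # would first reassemble complete messages from the byte stream.
        task = asyncio.get_running_loop().create_task(self.handler(data))
        self.pending.add(task)
        task.add_done_callback(self._done)
        if len(self.pending) >= self.MAX_PENDING and not self.paused:
            self.paused = True
            self.transport.pause_reading()  # TCP backpressure builds up

    def _done(self, task):
        self.pending.discard(task)
        if self.paused and len(self.pending) < self.MAX_PENDING:
            self.paused = False
            self.transport.resume_reading()
```

It would be used with something like `loop.create_server(lambda: ThrottledProtocol(handler), host, port)`. While the transport is paused, unread bytes stay in the kernel's socket buffer, and TCP's own flow control slows the sender down once that buffer fills.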
Hi,

I modified the following context member functions to get TCP flow control done. I don't know if this is also applicable to UDP. Furthermore, I don't know if this makes sense for UDP at all, since UDP makes no sense if you have to manage high data rates and a full load on a server. It makes the server's processing a lot smoother.

# Imports added for completeness; module paths assumed for aiocoap 0.4.x.
import functools

from aiocoap import Context
from aiocoap.transports.tcp import TcpConnection


class ControlledCoapContext(Context):
    '''Context with task limitation.'''

    _DEFAULT_ASYNC_TASK_LIMIT = 50

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._throttled_transports = set()
        self._async_task_limit = self._DEFAULT_ASYNC_TASK_LIMIT

    def _resume_processing(self, task, observation_request):
        '''Called after request processing.

        - Remove task from running renderings
        - Check if transport could resume
        '''
        self._running_renderings.remove(task)
        if self._throttled_transports:
            if len(self._running_renderings) < self._async_task_limit:
                # Back under the limit: let all paused transports read again.
                for transport in self._throttled_transports:
                    transport.resume_reading()
                self._throttled_transports.clear()
        if observation_request:
            self._async_task_limit -= 1

    def render_to_plumbing_request(self, plumbing_request):
        '''
        Replace render_to_plumbing_request to limit tasks.
        '''
        task = self.loop.create_task(
            self._render_to_plumbing_request(plumbing_request))
        self._running_renderings.add(task)
        observation_request = plumbing_request.request.opt.observe == 0
        if observation_request:
            # Consider observers in task limiter
            self._async_task_limit += 1
        if len(self._running_renderings) > self._async_task_limit:
            if isinstance(plumbing_request.request.remote, TcpConnection):
                # pylint: disable=protected-access
                transport = plumbing_request.request.remote._transport
                transport.pause_reading()
                self._throttled_transports.add(transport)
        remove_task = functools.partial(
            self._resume_processing, task, observation_request)
        # The done callback receives the finished task, which is not needed here.
        task.add_done_callback(lambda result, cb=remove_task: cb())

We're still on 0.4.1 at the moment and plan to migrate to 0.4.5 soon. In 0.4.5 this belongs in run_driving_pipe(*).
Hello there,
we use a CoAP TCP server in our embedded Linux application for an internal task handler and event queue. It turned out during stress tests that, when some additional tasks run asynchronously to the request processing, the transport pumps in messages faster than the application can process them. This ends up with a lot of rendering tasks created by the context, and the memory grows until it is exhausted.
It might be an edge case for an IoT device, but are there any ways to slow down the transport from application side? Or any other mechanisms to limit the task creation?
With kind regards
Chris