Celery worker using 100% CPU around epoll w/ prefork+SQS but still consuming tasks #5299
Comments
Hello again. The latest updates don't seem to help. Currently we are at […].

The whole issue is very similar to #1845, only that we are using SQS as the broker. The problem seems to be caused by […]. I did some further investigation with […]. It fails to do the […], so […]. The FDs there, like […]. Without knowing the […]
You are now bringing us to a conclusion we can use to debug this further. If the exception complains about a file descriptor which does not exist, maybe we could simply remove it from the readers/writers list. So maybe this exception should not be silently passed and ignored.
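A minimal sketch of that suggestion, assuming hypothetical `poller`, `readers`, and `writers` objects rather than kombu's actual attributes: on ENOENT, purge the fd from the bookkeeping structures instead of ignoring the error.

```python
import errno

def discard_dead_fd(poller, fd, readers, writers):
    """Drop ``fd`` from the poller and from the bookkeeping dicts.

    If the poller reports ENOENT (the fd is no longer registered), do not
    swallow it silently; also remove the fd from ``readers`` and ``writers``
    so the event loop stops scheduling work for it.
    """
    try:
        poller.unregister(fd)
    except OSError as exc:
        if exc.errno != errno.ENOENT:
            raise
    readers.pop(fd, None)
    writers.pop(fd, None)
```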
Thanks for your support. In kombu's […]:

```python
def unregister(self, fd):
    try:
        self._epoll.unregister(fd)
    except (socket.error, ValueError, KeyError, TypeError):
        raise
    except (IOError, OSError) as exc:
        raise  # re-raise unconditionally, so the error is no longer swallowed
        if getattr(exc, 'errno', None) not in (errno.ENOENT, errno.EPERM):
            raise
```

so that in […]:

```python
def _unregister(self, fd):
    try:
        self.poller.unregister(fd)
    except (AttributeError, KeyError, OSError):
        logger.exception("pls debug me, fd = %s", fd, extra={'fd_debug': fd})
```

with many […], e.g. the message was […]
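For context, here is a standalone illustration in plain Python (not celery or kombu code) of when epoll's `unregister` raises ENOENT: the errno means the fd is valid but not currently registered with that epoll instance, which is exactly the state a stale entry in the hub's bookkeeping ends up in.

```python
import errno
import os
import select

ep = select.epoll()
r, w = os.pipe()

try:
    ep.unregister(r)            # r is a valid fd, but was never registered here
except OSError as exc:
    assert exc.errno == errno.ENOENT
    print('unregister failed with ENOENT, as seen in the logs above')
```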
The […]. We can either: […]
Can you please check if #5499 helps in any way?
I will try it out.
Unfortunately it does not help. Depending on the setting (…) https://github.com/celery/celery/blob/master/celery/concurrency/asynpool.py#L735 […] edit: […]
I don't understand why.
So for the […], here is `on_poll_start()` with my logging added:

```python
def on_poll_start():
    if outbound and len(busy_workers) < len(all_inqueues):
        # print('ALL: %r ACTIVE: %r' % (len(all_inqueues),
        #                               len(active_writes)))
        inactive = diff(active_writes)
        [hub_add(fd, None, WRITE | ERR, consolidate=True)
         for fd in inactive]
    else:
        for fd in diff(active_writes):
            aw = list(self._active_writes)
            ai = list(self._all_inqueues)
            result = hub_remove(fd)
            extra = {
                'fd': fd,
                'result': repr(result),
                'active_writes': aw,
                'all_inqueues': ai,
            }
            logger.warn('removed fd %r result %r aw %r ai %r',
                        fd, result, aw, ai, extra=extra)
```

I'm logging […]. Then I started the worker with […]. Even before the restarting took place, my logs were full of this: […]

The frequency and CPU are not high yet. But as soon as restarting takes place and no more tasks arrive, this goes crazy to 7000 logs per 5 seconds. From start to end, the […]
So, celery/celery/concurrency/asynpool.py line 1083 (in e7ae429) […] 8 back in there.
So we do not remove the process from the pool after the worker recycles itself?
Whenever we start a process we append the worker to the pool: […]. I don't see anyone calling […] for […]. Could this be the problem?
I am not that sure anymore if that is the overall culprit. I mean, after all, some code must be responsible for the cleanup being called so frequently. So I moved up the traceback to place some logging into the hub's loop in https://github.com/celery/kombu/blob/master/kombu/asynchronous/hub.py#L301:

```python
…
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
logger.info('new loop run with poll_timeout %r', poll_timeout)
if readers or writers:
…
```

Let me show you how I analyze this log message now with Kibana: […]

To me it seems that something is short-circuiting the loop and that's why the CPU goes to 100%. However, I have no idea how to effectively debug this further. There are so many […]
I'm not sure I understand. Do you want me to verify if […]
I analysed the loop with lots of log statements, but all I can say is that most of the time it ran into https://github.com/celery/kombu/blob/master/kombu/asynchronous/hub.py#L362, which is normal, I guess. So there are just way too many events that don't seem to disappear. Just as in my original post: […]
Yes.
Is it possible the celery code already has a comment describing this could happen? https://github.com/celery/celery/pull/5499/files#diff-c80fd9891efbbe68275a133d83cd22a4L456
Can you share the command you run, @tuky, for launching the celery worker for this bug report? Is it possible you are using heartbeats and don't have --without-mingle --without-gossip?
Here is an original commit from back when Ask was trying to solve this for Celery 3, and he related it back to epoll: ca57e72
So it turns out this is an epoll quirk, and the solution is to duplicate the pipe fd.
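For anyone unfamiliar with the quirk being referenced: epoll tracks the underlying open file description rather than the fd number, so a registration can outlive the fd that created it. A standalone illustration in plain Python (not celery code):

```python
import os
import select

ep = select.epoll()
r, w = os.pipe()
ep.register(r, select.EPOLLIN)

dup_r = os.dup(r)   # another fd for the same open file description
os.close(r)         # does NOT drop the registration: dup_r keeps the description alive

os.write(w, b'x')
print(ep.poll(0))   # still reports an event for the old fd number

try:
    ep.unregister(r)        # the closed fd number can no longer be used to deregister it
except OSError as exc:
    print('unregister failed:', exc)
```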
Thank you for your investigations. Here you go, @matteius: `python3.6 -m celery worker --loglevel=INFO --app=foo.worker -Ofair --concurrency=1 --logfile=/var/log/celery/foo.log --pidfile=/var/run/celery/foo.pid --hostname=foo@bar`. Can you tell from that whether I'm using heartbeats? At least, I am not using the […]
@tuky It would be worth a try for the sake of science, though I suspect now, after reading more of the code last night, that the issue is inherent to the SQS support and/or just how it uses epoll. I started working on a code change similar to my other PR that was merged and linked in here earlier, which would also try to accept these other file descriptor points in the async pool -- hoping to spend some more time on it later today, but it will take time to understand everything going on, as there is a lot. Any chance you can work on a TDD-style unit test that reproduces the ENOENT issue in some way? Also note (since you asked): my experience with heartbeats is on the RabbitMQ AMQP broker, where I believe the broker is set up to send heartbeats, and in that case you have to have the worker respond within the heartbeat interval or it drops the connection -- we run this way and with the flags --without-gossip --without-mingle because those protocols were way too chatty, causing some performance bugs back on Celery 3.
I tried out running with […]. I will try to come up with a test similar to the test in https://github.com/celery/celery/pull/5499/files, but I will probably fail, because I know too few details 🤞
…rk (issue #5299) (#5604)

* make astimezone call in localize more safe; with tests
* Refactor the safety check for FDs and reuse it to add safety to on_poll_start.
* Also handle the FileNotFoundError the same way and make log message generic.
* cleanup pydocstyle audit failures.
* Check in a unit test and a bug fix to my prior commit and another enhancement error check.
* more unit testing made me realize I had misused pop in my prior work here.
* Ahha -- when I refactored I made it so the source data could be a list or dict and did not fully realize it -- fix it so it can do both.
* clean up edge cases with more test cases.
* fixed my test
* Fix test for py2
* There has got to be a better way.
* Update with PR feedback, flake8 and pydocstyle.
* one more spot that would benefit from iterate_file_descriptors_safely
* optimize refactor -- dedupe double check of conditional and reduce lines in my implementation.
* Expand the test coverage of this PR
Is this fixed already? Is there any workaround for it? I think I'm having the same/similar issue without "worker_max_tasks_per_child". It seems the pipe between the workers is gone and it gets stuck. I'm running python3.7-alpine with: […]

It is a simple task which has a single parameter, which is a UUID. max_retries is set to None and retry_backoff=2, with task_acks_late=True and task_acks_on_failure_or_timeout=False. The task in my case always raises an exception to keep the message on the queue for testing purposes. […]

(As you can see here, the message repeats itself forever.)
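For concreteness, here is a sketch of the kind of task just described; the task name, argument name, and exception are made up, and only the options mirror the ones listed above:

```python
from celery import shared_task

@shared_task(bind=True, max_retries=None, retry_backoff=2,
             acks_late=True, acks_on_failure_or_timeout=False)
def process_item(self, item_uuid):
    # Deliberately fails every time so the late-acked message stays on the
    # queue, matching the testing setup described above.
    raise RuntimeError('keep the message on the queue: %s' % item_uuid)
```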
@tuky I have been dealing with a similar setup: celery with SQS as the broker. We noticed that when a worker terminates, the new worker comes online but CPU usage spikes to 100%. After a lot of trial and error, downgrading Python worked and the CPU spikes disappeared. (Base docker image changed from […])
I had tried with python:3.6.8-slim (docker) and celery==4.2.2, but celery still runs into the same issue when the worker reloads after consuming max_tasks_per_child.
The PyPI infrastructure began seeing this behavior when upgrading from […]. We're using SQS with […].

With […]
After some stracing, I can see that in my case the file which breaks epoll is not a pipe between processes, but a socket to SQS. So maybe the cause is unrelated to the one the author originally had. As far as I can see, epoll breaks right after the SQS socket is closed (and kombu does not delete the socket from epoll, which, as we all already know, is wrong). When we start our main loop, pycurl establishes a keepalive HTTPS connection to SQS. Then kombu sets up a periodic routine which sends an HTTP request […]. Without […]
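The general pattern that avoids leaving such a stale entry behind, sketched here with the plain `select.epoll` API rather than kombu's hub and with a socketpair standing in for the SQS connection, is to deregister the fd from the poller before closing the socket:

```python
import select
import socket

# A connected socket pair stands in for the HTTPS connection to SQS.
a, b = socket.socketpair()

ep = select.epoll()
ep.register(a.fileno(), select.EPOLLIN)

# ... the event loop would call ep.poll() here ...

# Teardown: remove the fd from the poller first, then close the socket.
# Closing first, while the fd (or a duplicate of the underlying description)
# is still registered, is how stale epoll entries get left behind.
ep.unregister(a.fileno())
a.close()
b.close()
```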
I met this issue about a year ago and managed to work around it in an extremely dumb but extremely effective way: just go to […]
Would you mind sending a PR to kombu?
Would you mind checking the proposed fix in kombu? celery/kombu#1189
This is exciting news. I will check next month whether it fixes everything for us, too.
Environment & Settings

Celery version:
Report:

Steps to Reproduce

Required Dependencies

Minimally Reproducible Test Case

[…] `kombu/asynchronous/hub.py` to uncomment the print statements in `create_loop` […]
`worker_max_tasks_per_child: 10`
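To make that setup concrete, here is a minimal sketch of a worker app with the setting above; the app name, the bare `sqs://` broker URL, and the no-op task are illustrative assumptions, not details from this report:

```python
from celery import Celery

# 'sqs://' picks up AWS credentials from the environment; queue and region
# are left to the defaults for this sketch.
app = Celery('repro', broker='sqs://')
app.conf.worker_max_tasks_per_child = 10  # recycle child processes quickly

@app.task
def noop():
    return None
```

Feeding a few hundred of these tasks through a prefork worker (concurrency 5, as noted below) should exercise the child-recycling path described in this issue.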
Expected Behavior

The main worker process should settle to ~1% CPU usage after the 500 tasks were run. The print statements should not run so often, because of the `sleep` call at the end of the `create_loop` method.

Actual Behavior
The main worker hammers the CPU at a constant 100%, and the console output (due to the uncommented print statement) is flooded with lots of these, almost every microsecond and never stopping: […]

Also, the `strace` output is full of these: […]

There have been similar issues around, but most of the time they were related to `redis`. Also, what's different from former issues is that the worker still distributes/consumes new tasks. So basically everything works, but the CPU is not idling; it is hammering the loop instead. I can consistently reproduce this with my SQS worker (concurrency 5, btw), so please let me know what other information I can gather to hunt down this problem. THX!