Celery worker using 100% CPU around epoll w/ prefork+SQS but still consuming tasks #5299

Closed · tuky opened this issue Jan 22, 2019 · 45 comments · Fixed by celery/kombu#1189

tuky commented Jan 22, 2019

Environment & Settings

Celery version: 4.2.0 (windowlicker)

Report:

software -> celery:4.2.0 (windowlicker) kombu:4.2.2-post1 py:3.6.6
            billiard:3.5.0.5 sqs:N/A
platform -> system:Linux arch:64bit, ELF
            kernel version:3.13.0-139-generic imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:sqs results:disabled

broker_url: 'sqs://localhost//'
include: [...]
worker_hijack_root_logger: False
task_serializer: 'json'
result_expires: 3600
accept_content: ['json']
result_serializer: 'json'
timezone: 'Europe/Berlin'
enable_utc: True
broker_transport_options: {
    'polling_interval': 1,
    'region': 'eu-west-1',
    'visibility_timeout': 10860}
task_ignore_result: True
task_acks_late: True
worker_prefetch_multiplier: 1
worker_max_tasks_per_child: 10
worker_pool: 'celery.concurrency.prefork:TaskPool'
task_time_limit: 10800
worker_enable_remote_control: False
worker_send_task_events: False
task_default_queue: 'celery'

Steps to Reproduce

Required Dependencies

  • Minimal Python Version: 3.6
  • Minimal Broker Version: N/A or Unknown
  • Minimal Result Backend Version: N/A or Unknown
  • Minimal OS and/or Kernel Version: N/A or Unknown

Minimally Reproducible Test Case

  • edit kombu/asynchronous/hub.py to uncomment the print statements in create_loop
  • fire ~500 very short tasks (so the child worker processes have to exit and be replaced because of worker_max_tasks_per_child: 10); a hypothetical repro script is sketched below
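
For illustration, such a repro script might look like this (the module and task names are hypothetical, not from the report; the broker URL matches the settings above):

    # repro.py -- fire ~500 trivial tasks at a prefork worker that
    # recycles its children every 10 tasks (worker_max_tasks_per_child)
    from celery import Celery

    app = Celery('repro', broker='sqs://localhost//')
    app.conf.worker_max_tasks_per_child = 10

    @app.task
    def noop():
        pass

    if __name__ == '__main__':
        for _ in range(500):
            noop.delay()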

Expected Behavior

The main worker process should settle to ~1% CPU usage after the 500 tasks have run. The print statements should fire only occasionally, because of the sleep call at the end of the create_loop method.

Actual Behavior

The main worker hammers the CPU at a constant 100%, and the console output (due to the uncommented print statements) is flooded with these, arriving every millisecond or two and never stopping:

WARNING:celery.redirected: [[[HUB]]]: (31)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-32, started daemon)>)->R!, (20)on_result_readable(20)->R!, (34)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-33, started daemon)>)->R!, (16)on_result_readable(16)->R!, (42)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-34, started daemon)>)->R!, (24)on_result_readable(24)->R!, (53)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-35, started daemon)>)->R!, (28)on_result_readable(28)->R!, (38)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-36, started daemon)>)->R!, (12)on_result_readable(12)->R!, (45)on_readable(45)->R!, (45)on_writable(45)->W (2019-01-22 17:09:04,502; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [EVENTS]: (GONE)(48)->R, (GONE)(46)->R, (GONE)(44)->R! (2019-01-22 17:09:04,503; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [[[HUB]]]: (31)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-32, started daemon)>)->R!, (20)on_result_readable(20)->R!, (34)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-33, started daemon)>)->R!, (16)on_result_readable(16)->R!, (42)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-34, started daemon)>)->R!, (24)on_result_readable(24)->R!, (53)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-35, started daemon)>)->R!, (28)on_result_readable(28)->R!, (38)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-36, started daemon)>)->R!, (12)on_result_readable(12)->R!, (45)on_readable(45)->R!, (45)on_writable(45)->W (2019-01-22 17:09:04,505; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [EVENTS]: (GONE)(48)->R, (GONE)(46)->R, (GONE)(44)->R! (2019-01-22 17:09:04,506; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [[[HUB]]]: (31)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-32, started daemon)>)->R!, (20)on_result_readable(20)->R!, (34)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-33, started daemon)>)->R!, (16)on_result_readable(16)->R!, (42)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-34, started daemon)>)->R!, (24)on_result_readable(24)->R!, (53)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-35, started daemon)>)->R!, (28)on_result_readable(28)->R!, (38)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-36, started daemon)>)->R!, (12)on_result_readable(12)->R!, (45)on_readable(45)->R!, (45)on_writable(45)->W (2019-01-22 17:09:04,507; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [EVENTS]: (GONE)(48)->R, (GONE)(46)->R, (GONE)(44)->R! (2019-01-22 17:09:04,508; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [[[HUB]]]: (31)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-32, started daemon)>)->R!, (20)on_result_readable(20)->R!, (34)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-33, started daemon)>)->R!, (16)on_result_readable(16)->R!, (42)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-34, started daemon)>)->R!, (24)on_result_readable(24)->R!, (53)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-35, started daemon)>)->R!, (28)on_result_readable(28)->R!, (38)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-36, started daemon)>)->R!, (12)on_result_readable(12)->R!, (45)on_readable(45)->R!, (45)on_writable(45)->W (2019-01-22 17:09:04,509; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [EVENTS]: (GONE)(48)->R, (GONE)(46)->R, (GONE)(44)->R! (2019-01-22 17:09:04,510; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [[[HUB]]]: (31)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-32, started daemon)>)->R!, (20)on_result_readable(20)->R!, (34)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-33, started daemon)>)->R!, (16)on_result_readable(16)->R!, (42)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-34, started daemon)>)->R!, (24)on_result_readable(24)->R!, (53)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-35, started daemon)>)->R!, (28)on_result_readable(28)->R!, (38)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-36, started daemon)>)->R!, (12)on_result_readable(12)->R!, (45)on_readable(45)->R!, (45)on_writable(45)->W (2019-01-22 17:09:04,511; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [EVENTS]: (GONE)(48)->R, (GONE)(46)->R, (GONE)(44)->R! (2019-01-22 17:09:04,513; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)

Also, the strace output is full of these:

epoll_ctl(9, EPOLL_CTL_DEL, 43, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRNORM|EPOLLERR|0x36f80000, {u32=32711, u64=4295000007}}) = -1 ENOENT (No such file or directory)
epoll_ctl(9, EPOLL_CTL_DEL, 46, {EPOLLERR|0x36ca1800, {u32=32711, u64=4295000007}}) = -1 EBADF (Bad file descriptor)
epoll_ctl(9, EPOLL_CTL_DEL, 44, {EPOLLERR|0x36ca1800, {u32=32711, u64=4295000007}}) = -1 EBADF (Bad file descriptor)
epoll_ctl(9, EPOLL_CTL_DEL, 19, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRBAND|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT|0x1a0fd820, {u32=32711, u64=4295000007}}) = -1 ENOENT (No such file or directory)
epoll_ctl(9, EPOLL_CTL_DEL, 23, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRBAND|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT|0x1a0fd820, {u32=32711, u64=4295000007}}) = -1 ENOENT (No such file or directory)
epoll_ctl(9, EPOLL_CTL_DEL, 27, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRBAND|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT|0x1a0fd820, {u32=32711, u64=4295000007}}) = -1 ENOENT (No such file or directory)
epoll_ctl(9, EPOLL_CTL_DEL, 11, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRBAND|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT|0x1a0fd820, {u32=32711, u64=4295000007}}) = -1 ENOENT (No such file or directory)
epoll_ctl(9, EPOLL_CTL_DEL, 15, {EPOLLRDNORM|EPOLLRDBAND|EPOLLWRBAND|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT|0x1a0fd820, {u32=32711, u64=4295000007}}) = -1 ENOENT (No such file or directory)

There have been similar issues before, but most of them were related to Redis. What is also different from earlier reports is that the worker still distributes and consumes new tasks. So basically everything works, except that the CPU is not idling but hammering the loop instead. I can consistently reproduce this with my SQS worker (concurrency 5, by the way), so please let me know what other information I can gather to hunt down this problem. Thanks!

tuky (Author) commented Apr 18, 2019

Hello again. The latest updates don't seem to help. Currently we are at

billiard==3.6.0.0
celery==4.3.0
kombu==4.5.0
django-celery-beat==1.4.0

The whole issue is very similar to #1845, except that we are using SQS as the broker. The problem seems to be caused by worker_max_tasks_per_child, because the main process goes to 100% CPU as soon as it has to restart its children due to that task limit.

I did some further investigation with gdb's Python support. Here is a Python backtrace via py-bt:

Traceback (most recent call first):
  <built-in method unregister of select.epoll object at remote 0x7fadac9b8600>
  File "$ENV/lib/python3.6/site-packages/kombu/utils/eventio.py", line 75, in unregister
    self._epoll.unregister(fd)
  File "$ENV/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 243, in _unregister
    self.poller.unregister(fd)
  File "$ENV/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 160, in _remove_from_loop
    self._unregister(fd)
  File "$ENV/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 181, in remove
    self._remove_from_loop(fd)
  File "$ENV/lib/python3.6/site-packages/celery/concurrency/asynpool.py", line 721, in <listcomp>
    [hub_remove(fd) for fd in diff(active_writes)]
  File "$ENV/lib/python3.6/site-packages/celery/concurrency/asynpool.py", line 721, in on_poll_start
    [hub_remove(fd) for fd in diff(active_writes)]
  File "$ENV/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 295, in create_loop
    tick_callback()
  <built-in method next of module object at remote 0x7fadcaf37638>
  File "$ENV/lib/python3.6/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "$ENV/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "$ENV/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "$ENV/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "$ENV/lib/python3.6/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "$ENV/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "$ENV/lib/python3.6/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "$ENV/lib/python3.6/site-packages/celery/bin/worker.py", line 258, in run
    worker.start()
  File "$ENV/lib/python3.6/site-packages/celery/bin/base.py", line 252, in __call__
    ret = self.run(*args, **kwargs)
  File "$ENV/lib/python3.6/site-packages/celery/bin/worker.py", line 223, in run_from_argv
    return self(*args, **options)
  File "$ENV/lib/python3.6/site-packages/celery/bin/celery.py", line 420, in execute
    ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
  File "$ENV/lib/python3.6/site-packages/celery/bin/celery.py", line 488, in handle_argv
    return self.execute(command, argv)
  File "$ENV/lib/python3.6/site-packages/celery/bin/base.py", line 298, in execute_from_commandline
    return self.handle_argv(self.prog_name, argv[1:])
  File "$ENV/lib/python3.6/site-packages/celery/bin/celery.py", line 496, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "$ENV/lib/python3.6/site-packages/celery/bin/celery.py", line 322, in main
    cmd.execute_from_commandline(argv)
  File "$ENV/lib/python3.6/site-packages/celery/__main__.py", line 16, in main
    _main()
  File "$ENV/lib/python3.6/site-packages/celery/__main__.py", line 20, in <module>
    main()
  <built-in method exec of module object at remote 0x7fadcaf37638>
  File "/usr/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)

It fails on the self._epoll.unregister(fd) call, which matches the strace output I originally posted nicely. Stepping through for a while, I end up at

 240    
 241        def _unregister(self, fd):
 242            try:
 243                self.poller.unregister(fd)
 244            except (AttributeError, KeyError, OSError):
>245                pass
 246    
 247        def close(self, *args):
 248            [self._unregister(fd) for fd in self.readers]
 249            self.readers.clear()
 250            [self._unregister(fd) for fd in self.writers]

so unregister clearly raises an exception.

The FDs there, like 27, 11, 15, are all pipes according to /proc/$PID/fd. From there I can get the pipe FDs, and according to lsof | grep $PIPE_FD they are pipes between the main process and the child processes:

15614 is the main process and 23179, for example, is a child process:

python3.6 15614              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 15614              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18344              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18344              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18344 18346        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18344 18346        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18368              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18368              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18368 18406        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18368 18406        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18369              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18369              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18369 18382        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18369 18382        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18370              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18370              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18370 18389        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18370 18389        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18371              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18371              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18371 18391        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18371 18391        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18372              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18372              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18372 18390        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18372 18390        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18373              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18373              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18373 18393        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18373 18393        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18375              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18375              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18375 18405        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18375 18405        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18376              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18376              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18376 18395        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18376 18395        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18378              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18378              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18378 18403        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18378 18403        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18380              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18380 18400        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18383              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18383              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18383 18404        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18383 18404        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18384              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18384              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18384 18424        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18384 18424        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18386              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18386              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18386 18420        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18386 18420        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18387              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18387              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18387 18423        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18387 18423        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18396              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18396              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18396 18426        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18396 18426        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18397              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18397              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18397 18422        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18397 18422        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18399              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18399              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 18399 18425        ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 18399 18425        ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 23122              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 23122              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe
python3.6 23179              ubuntu   39r     FIFO                0,8      0t0   49209822 pipe
python3.6 23179              ubuntu   40w     FIFO                0,8      0t0   49209822 pipe

Without knowing the kombu/billiard code, it seems to me that the child processes were successfully restarted, but somehow the main process does not realize this and keeps trying to get rid of the old ones, which don't exist anymore.

thedrow (Member) commented May 13, 2019

You are now bringing us to a conclusion we can use to debug this further.
Would you mind changing the code so that the exception will be raised?

If the exception complains about a file descriptor which does not exist, maybe we could simply remove it from the readers/writers list.

So maybe this exception should not be silently passed and ignored.
I'm going to make Celery emit a log for these failures.

tuky (Author) commented May 13, 2019

Thanks for your support.

In kombu's eventio.py I had to modify _epoll to not swallow exceptions (so above I probably did not really run into an exception; gdb probably just mistakenly showed that code path). With this I could make all the repeating unregister errors visible:

    def unregister(self, fd):
        try:
            self._epoll.unregister(fd)
        except (socket.error, ValueError, KeyError, TypeError):
            raise
        except (IOError, OSError) as exc:
            # debugging change: re-raise unconditionally, bypassing the
            # errno filter below that normally swallows ENOENT/EPERM
            raise
            if getattr(exc, 'errno', None) not in (errno.ENOENT, errno.EPERM):
                raise

so that in hub.py I could do

    def _unregister(self, fd):
        try:
            self.poller.unregister(fd)
        except (AttributeError, KeyError, OSError):
            logger.exception("pls debug me, fd = %s", fd, extra={'fd_debug': fd})

with many fd numbers repeating thousands of times within seconds.

For example, the message was pls debug me, fd = 56, with this traceback:

Traceback (most recent call last):
  File ".../kombu/asynchronous/hub.py", line 243, in _unregister
    self.poller.unregister(fd)
  File ".../kombu/utils/eventio.py", line 78, in unregister
    self._epoll.unregister(fd)
FileNotFoundError: [Errno 2] No such file or directory

@auvipy auvipy added this to the 4.4.0 milestone May 13, 2019
thedrow (Member) commented May 14, 2019

The eventio module specifically ignores these exceptions and does not raise them.

We can either:

  • Raise the error and ensure it is raised by the _unregister method in the hub module as well. In that case we will have to handle the exception whenever _unregister is called.
  • Check in each loop iteration which file descriptors are still alive and remove the dead ones (a sketch follows below).
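
The second option could look roughly like this (a minimal sketch against kombu's Hub, whose readers/writers mappings are keyed by fd; reap_dead_fds is a hypothetical helper, not an existing kombu API):

    import os

    def reap_dead_fds(hub):
        # probe every registered fd; a closed fd makes fstat fail
        for fd in list(hub.readers) + list(hub.writers):
            try:
                os.fstat(fd)  # raises OSError (EBADF) if the fd is closed
            except OSError:
                hub.remove(fd)  # Hub.remove() drops it from both sets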

thedrow (Member) commented May 15, 2019

Can you please check if #5499 helps in any way?

tuky (Author) commented May 15, 2019

I will try it out.

tuky (Author) commented May 15, 2019

Unfortunately it does not help. Depending on the setting (-Ofair vs. -Odefault) the worker still goes crazy at these lines:

https://github.com/celery/celery/blob/master/celery/concurrency/asynpool.py#L735
https://github.com/celery/celery/blob/master/celery/concurrency/asynpool.py#L742

Edit:
And in the logs I find no occurrences of "Encountered OSError while trying".

thedrow (Member) commented May 16, 2019

I don't understand why.
If we're calling hub.remove we're eventually calling hub._discard, which should remove the file descriptor from both the readers and the writers.
If you add a breakpoint there, do you see the fd being removed?
Is it added again somewhere?

tuky (Author) commented May 20, 2019

So for the -Ofair case, I modified on_poll_start like so:

            def on_poll_start():
                if outbound and len(busy_workers) < len(all_inqueues):
                    #  print('ALL: %r ACTIVE: %r' % (len(all_inqueues),
                    #                                len(active_writes)))
                    inactive = diff(active_writes)
                    [hub_add(fd, None, WRITE | ERR, consolidate=True)
                     for fd in inactive]
                else:
                    for fd in diff(active_writes):
                        aw = list(self._active_writes)
                        ai = list(self._all_inqueues)
                        result = hub_remove(fd)
                        extra = {
                            'fd': fd,
                            'result': repr(result),
                            'active_writes': aw,
                            'all_inqueues': ai,
                        }
                        logger.warn('removed fd %r result %r aw %r ai %r', fd, result, aw, ai, extra=extra)

I'm logging self._active_writes and self._all_inqueues, because diff(active_writes) seems to be equivalent to self._all_inqueues - self._active_writes.

Then I started the worker with --concurrency 1 and worker_max_tasks_per_child=10 and delayed a few tasks, so that the restarting would kick in quickly.

Even before the restarting took place, my logs were full of this

removed fd 8 result None aw [] ai [8]
removed fd 8 result None aw [] ai [8]
removed fd 8 result None aw [] ai [8]

The frequency and CPU are not high yet. But as soon as restarting takes place and no more tasks arrive, this goes crazy, up to 7000 logs per 5 seconds. From start to end, the fd 8 never changes. To me, it seems like AsynPool._all_inqueues is not properly cleaned up. Debugging on_inqueue_close now.

tuky (Author) commented May 20, 2019

So, on_inqueue_close is being called successfully, but every time, at least once afterwards, on_process_alive is called, and the line

self._all_inqueues.add(proc.inqW_fd)
adds the 8 back in there.

thedrow (Member) commented May 20, 2019

So we do not remove the process from the pool after the worker recycles itself?

thedrow (Member) commented May 21, 2019

Whenever we start a process we append the worker to the pool:

https://github.com/celery/billiard/blob/265f119ec8b2944e36e3b0810578b5d615e6f987/billiard/pool.py#L1102-L1123

I don't see anything calling _join_exited_workers(), which should clean them from the pool.

Could this be the problem?
Can you please verify?

tuky (Author) commented May 21, 2019

I am not so sure anymore that this is the overall culprit. After all, some code must be responsible for the cleanup being called so frequently. So I moved up the traceback to place some logging into the hub's loop at https://github.com/celery/kombu/blob/master/kombu/asynchronous/hub.py#L301:

poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
logger.info('new loop run with poll_timeout %r', poll_timeout)
if readers or writers:
    …

Let me show you how I analyze this log message now with Kibana:

(Screenshot: Kibana chart of the log message frequency over time)

  1. I trigger 20 tasks for 10 max tasks per child
  2. The worker works on the first 10 tasks
  3. The worker is restarting with a short period of 100% CPU and lots of loop runs
  4. The worker is working on the last 10 tasks
  5. The worker restarted again because of max tasks per child. the loop is going crazy forever
  6. Manually restarted the worker. No new tasks arrive and the loop is running at a reasonable rate

To me it seems that something is short-circuiting the loop, and that's why the CPU goes to 100%. However, I have no idea how to debug this further effectively. There are so many except and continue clauses inside of it.

tuky (Author) commented May 21, 2019

Can you please verify?

I'm not sure I understand. Do you want me to verify whether _join_exited_workers is being called?

tuky (Author) commented May 21, 2019

I analysed the loop with lots of log statements, but all I can say is that most of the time it ran into https://github.com/celery/kombu/blob/master/kombu/asynchronous/hub.py#L362, which is normal, I guess. So there are just way too many events that don't seem to disappear. Just as in my original post:

WARNING:celery.redirected: [[[HUB]]]: (31)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-32, started daemon)>)->R!, (20)on_result_readable(20)->R!, (34)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-33, started daemon)>)->R!, (16)on_result_readable(16)->R!, (42)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-34, started daemon)>)->R!, (24)on_result_readable(24)->R!, (53)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-35, started daemon)>)->R!, (28)on_result_readable(28)->R!, (38)_event_process_exit(<Hub@0x7fa987a1dc18: R:11 W:1>, <ForkProcess(ForkPoolWorker-36, started daemon)>)->R!, (12)on_result_readable(12)->R!, (45)on_readable(45)->R!, (45)on_writable(45)->W (2019-01-22 17:09:04,502; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)
WARNING:celery.redirected: [EVENTS]: (GONE)(48)->R, (GONE)(46)->R, (GONE)(44)->R! (2019-01-22 17:09:04,503; /home/ubuntu/.virtualenvs/unchained-worker3.6/src/celery/celery/utils/log.py:235)

thedrow (Member) commented Jun 3, 2019

Can you please verify?

I'm not sure I understand. Do you want me to verify whether _join_exited_workers is being called?

Yes.
Maybe add it to the top of the loop as well...

matteius (Contributor) commented:

Is it possible the Celery code already has a comment describing that this could happen?

https://github.com/celery/celery/pull/5499/files#diff-c80fd9891efbbe68275a133d83cd22a4L456

    def _track_child_process(self, proc, hub):
        try:
            fd = proc._sentinel_poll
        except AttributeError:
            # we need to duplicate the fd here to carefully
            # control when the fd is removed from the process table,
            # as once the original fd is closed we cannot unregister
            # the fd from epoll(7) anymore, causing a 100% CPU poll loop.
            fd = proc._sentinel_poll = os.dup(proc._popen.sentinel)
        hub.add_reader(fd, self._event_process_exit, hub, proc)

matteius (Contributor) commented:

Can you share the command you run, @tuky, for launching the celery worker for this bug report? Is it possible you are using heartbeats and don't have --without-mingle --without-gossip?

@auvipy auvipy modified the milestones: 4.4.0, 4.5 Jun 13, 2019
matteius (Contributor) commented:

Here is an original commit from back when Ask was trying to solve this for Celery 3; he related it back to epoll (ca57e72), and in the commit message he mentions that he found out that:

  1. epoll_wait always returned an error state for a Popen pipe fd.
  2. the worker was trying to unregister this fd from epoll, but
  3. epoll.unregister refused to do so, giving an IOError(ENOENT) error.

So it turns out this is an epoll quirk, and the solution is to duplicate the pipe fd so that we can carefully control when it's removed from the process file descriptor table.
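
A minimal sketch of that dup() trick (illustrative, not Celery's actual implementation):

    import os
    import select

    def watch_child(poller, sentinel_fd):
        # register a private duplicate of the child's sentinel pipe, so
        # we control when the fd disappears from the process fd table
        fd = os.dup(sentinel_fd)
        poller.register(fd, select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)
        return fd

    def unwatch_child(poller, fd):
        poller.unregister(fd)  # succeeds: our duplicate is still open
        os.close(fd)           # only now is the fd really gone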

tuky (Author) commented Jun 14, 2019

Can you share the command you run @tuky for launching the celery worker for this bug report?

Thank you for your investigations. Here you go, @matteius:

python3.6 -m celery worker --loglevel=INFO --app=foo.worker -Ofair --concurrency=1 --logfile=/var/log/celery/foo.log --pidfile=/var/run/celery/foo.pid --hostname=foo@bar

Can you tell from that whether I'm using heartbeats? At least, I am not using the --without-heartbeat option. Should I try running with all 3 options --without-gossip --without-mingle --without-heartbeat?

matteius (Contributor) commented Jun 14, 2019

@tuky It would be worth a try for the sake of science, though after reading more of the code last night I now suspect that the issue is inherent to the SQS support and/or just how it uses epoll. I started working on a code change, similar to my other PR that was merged and linked in here earlier, that would also try to accept these other file descriptor points in the async pool -- hoping to spend some more time on it later today, but it will take time to understand everything going on, as there is a lot. Any chance you can work on a TDD-style unit test that reproduces the ENOENT issue in some way?
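
For reference, a starting point for such a test might look like this (a minimal pytest sketch that provokes the same ENOENT outside of Celery; the test name and structure are illustrative):

    import errno
    import os
    import select

    import pytest

    def test_epoll_unregister_unknown_fd_raises_enoent():
        r, w = os.pipe()
        ep = select.epoll()
        try:
            with pytest.raises(OSError) as excinfo:
                ep.unregister(r)  # open fd, but never registered
            assert excinfo.value.errno == errno.ENOENT
        finally:
            ep.close()
            os.close(r)
            os.close(w)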

Also note (since you asked): my experience with heartbeats is on the RabbitMQ AMQP broker, where I believe the broker is set up to send heartbeats; in that case the worker has to respond within the heartbeat interval or the broker drops the connection -- we run this way, and with the flags --without-gossip --without-mingle, because those protocols were way too chatty and caused some performance bugs back on Celery 3.

tuky (Author) commented Jun 14, 2019

I tried running with --without-gossip --without-mingle --without-heartbeat, but without success. @thedrow I also finally verified that _join_exited_workers is indeed being called from time to time.

I will try to come up with a test similar to the one in https://github.com/celery/celery/pull/5499/files, but I will probably fail because I know too few of the details 🤞

@auvipy auvipy modified the milestones: 4.5, 4.4.0 Jun 27, 2019
auvipy pushed a commit that referenced this issue Jun 29, 2019
…rk (issue #5299) (#5604)

* make astimezone call in localize more safe

make astimezone call in localize more safe; with tests

* Refactor the safety check for FDs and reuse it to add safety to on_poll_start.

* Also handle the FileNotFoundError the same way and make log message generic.

* cleanup pydocstyle audit failures.

* Check in a unit test and a bug fix to my prior commit and another enhancement error check.

* more unit testing made me realize I had misused pop in my prior work here.

* Ahha -- when I refactored I made it so the source data could be a list or dict and did not fully realize it -- fix it so it can do both.

* clean up edge cases with more test cases.

* fixed my test

* Fix test for py2

* There has got to be a better way.

* Update with PR feedback, flake8 and pydocstyle.

* one more spot that would benefit from iterate_file_descriptors_safely

* optimize refactor -- dedupe double check of conditional and reduce lines in my implementation.

* Expand the test coverage of this PR

tuky (Author) commented Jul 5, 2019

I invited you, @matteius, @thedrow and @auvipy, to a private repo which is ready to use, including credentials, for an SQS queue. Thank you, I hope you can reproduce the issue with this.

@auvipy auvipy modified the milestones: 4.4.0, 4.5 Aug 23, 2019
matheusbrat commented Nov 30, 2019

Is this fixed already? Is there any workaround for it?

I think I'm having the same/similar issue without "worker_max_tasks_per_child". It seems the pipe between the workers is gone and it gets stuck.

I'm running python3.7-alpine with:

celery[sqs]==4.4.0rc4 
kombu==4.6.6

It is a simple task with a single parameter, a UUID. max_retries is set to None and retry_backoff=2, with task_acks_late=True and task_acks_on_failure_or_timeout=False.

The task in my case always raises an exception to keep the message on the queue for testing purposes.

/code # ps aux
PID   USER     TIME  COMMAND
    1 root      0:00 {start-celery.sh} /bin/sh /code/docker/start-celery.sh
    8 root      0:01 {celery} /usr/local/bin/python /usr/local/bin/celery -A my_module worker -l debug -c1
   11 root      0:00 {celery} /usr/local/bin/python /usr/local/bin/celery -A my_module worker -l debug -c1
$ strace -fp 11 -s 10000
strace: Process 11 attached
read(8, 
$ ls -l /proc/11/fd/8
lr-x------    1 root     root            64 Nov 30 04:10 /proc/11/fd/8 -> pipe:[23027111]
# (find /proc -type l | xargs ls -l | fgrep 'pipe:[23027111]') 2>/dev/null
lr-x------    1 root     root            64 Nov 30 04:10 /proc/11/fd/8 -> pipe:[23027111]
lr-x------    1 root     root            64 Nov 30 04:10 /proc/11/task/11/fd/8 -> pipe:[23027111]
lr-x------    1 root     root            64 Nov 30 04:10 /proc/8/fd/8 -> pipe:[23027111]
l-wx------    1 root     root            64 Nov 30 04:10 /proc/8/fd/9 -> pipe:[23027111]
lr-x------    1 root     root            64 Nov 30 04:10 /proc/8/task/8/fd/8 -> pipe:[23027111]
l-wx------    1 root     root            64 Nov 30 04:10 /proc/8/task/8/fd/9 -> pipe:[23027111]
$ strace -fp 8 -s 10000
# Normal messages
epoll_ctl(7, EPOLL_CTL_ADD, 10, {EPOLLIN|EPOLLERR|EPOLLHUP, {u32=10, u64=139878494896138}}) = -1 EEXIST (File exists)
mmap(NULL, 262144, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f3815615000
epoll_ctl(7, EPOLL_CTL_DEL, 9, 0x7ffdfeee653c) = -1 ENOENT (No such file or directory)
epoll_pwait(7, [], 1023, 666, NULL, 8)  = 0
epoll_ctl(7, EPOLL_CTL_DEL, 9, 0x7ffdfeee653c) = -1 ENOENT (No such file or directory)
sysinfo({uptime=58265, loads=[112224, 126656, 139264], totalram=24973213696, freeram=880857088, sharedram=1203023872, bufferram=2054041600, totalswap=8258580480, freeswap=7548694528, procs=2268, totalhigh=0, freehigh=0, mem_unit=1}) = 0
epoll_pwait(7, [], 1023, 67, NULL, 8)   = 0
epoll_ctl(7, EPOLL_CTL_DEL, 9, 0x7ffdfeee653c) = -1 ENOENT (No such file or directory)
epoll_pwait(7, [], 1023, 898, NULL, 8)  = 0
epoll_ctl(7, EPOLL_CTL_DEL, 9, 0x7ffdfeee653c) = -1 ENOENT (No such file or directory)
wait4(11, 0x7ffdfeee5e34, WNOHANG, NULL) = 0
epoll_pwait(7, [], 1023, 100, NULL, 8)  = 0
epoll_ctl(7, EPOLL_CTL_DEL, 9, 0x7ffdfeee653c) = -1 ENOENT (No such file or directory)
... same message

(As you can see, the message repeats itself forever.)

@auvipy auvipy modified the milestones: 4.5, 4.4.x Dec 16, 2019
code-haven commented Feb 7, 2020

@tuky I have been dealing with a similar setup: celery with SQS as the broker. We notice that when a worker terminates, the new worker comes online but CPU usage spikes to 100%.

After a lot of trial and error, downgrading Python worked and the CPU spikes disappeared (base Docker image changed from python:3.6 (latest?) to python:3.6.8). I understand this is not a fix; just putting it out here in case it helps anyone, or the investigation itself.

thedrow (Member) commented Feb 11, 2020

@tuky I have been dealing with a similar setup: celery with SQS as the broker. We notice that when a worker terminates, the new worker comes online but CPU usage spikes to 100%.

After a lot of trial and error, downgrading Python worked and the CPU spikes disappeared (base Docker image changed from python:3.6 (latest?) to python:3.6.8). I understand this is not a fix; just putting it out here in case it helps anyone, or the investigation itself.
Thank you @code-haven.
Has something changed between 3.6.8 & 3.6.10 that might affect this?
This is the path to investigate.

Sanyambansal76 commented:

I tried with python:3.6.8-slim (Docker) and celery==4.2.2, but Celery still runs into the same issue when the worker reloads after consuming max_tasks_per_child.

ewdurbin commented Apr 24, 2020

The PyPI infrastructure began seeing this behavior when upgrading from python:3.7.3-slim-stretch to python:3.8.2-slim-buster in https://github.com/pypa/warehouse/pull/7828/files.

We're using SQS with --max-tasks-per-child.

and800 commented May 12, 2020

With python:3.8.2-slim-buster, kombu==4.6.8 and pycurl==7.43.0.2 the issue is always reproducible like this:

  • initialize celery with 1 worker and max tasks per child = 1
  • feed one task to it, so it recreates the worker
  • wait for several minutes

After some stracing, I can see that in my case the file which breaks epoll is not a pipe between processes, but a socket to SQS. So maybe the cause is unrelated to the one the author originally had.

As far as I can see, epoll breaks right after the SQS socket is closed (and kombu does not delete the socket from epoll, which, as we all already know, is wrong). When we start our main loop, pycurl establishes a keep-alive HTTPS connection to SQS. Then kombu sets up a periodic routine which sends a "give me some new messages" HTTP request over that connection. But several minutes after the start, curl suddenly decides to close it. So when kombu regains control inside the CurlClient._handle_socket() callback, it has an already closed file descriptor, which cannot be removed from epoll.

Without --max-tasks-per-child it would still work: epoll holds something like weak references to file objects, so when all strong references are destroyed, the file is also removed from epoll. But in our case new workers are spawned by forking from the main process, which means each worker inherits all open file descriptors from the main one.
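
A standalone demonstration of that fork effect (Linux only; illustrative, not kombu code):

    import os
    import select

    r, w = os.pipe()
    ep = select.epoll()
    ep.register(r, select.EPOLLIN)

    pid = os.fork()
    if pid == 0:
        # the child inherits r/w; a real pool worker keeps them open,
        # which keeps the epoll entry alive from the parent's view
        os._exit(0)

    os.close(r)  # the parent drops its copy of the fd...
    try:
        ep.unregister(r)  # ...and can no longer unregister it
    except OSError as exc:
        print('unregister failed:', exc)  # EBADF, as in the strace output
    os.waitpid(pid, 0)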

and800 commented May 12, 2020

I met this issue about a year ago and managed to work around it in an extremely dumb but extremely effective way: just go to kombu.asynchronous.hub.Hub.create_loop() and insert a sleep() after every N iterations 🙂
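
A rough sketch of the idea (illustrative; the actual fix landed in kombu via celery/kombu#1189):

    import time

    def throttled(loop, every=100, pause=0.1):
        # wrap the hub's loop generator so it yields the CPU after
        # every N iterations instead of spinning hot on a broken fd
        for n, item in enumerate(loop, 1):
            yield item
            if n % every == 0:
                time.sleep(pause)

    # hypothetical usage: loop = throttled(hub.create_loop())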

auvipy (Member) commented May 12, 2020

I met this issue about a year ago and managed to work around it in an extremely dumb but extremely effective way: just go to kombu.asynchronous.hub.Hub.create_loop() and insert a sleep() after every N iterations 🙂

Would you mind sending a PR to kombu?

and800 added a commit to and800/kombu that referenced this issue May 13, 2020
@auvipy auvipy assigned and800 and unassigned georgepsarakis May 13, 2020
and800 added a commit to and800/kombu that referenced this issue May 14, 2020
and800 added a commit to and800/kombu that referenced this issue May 14, 2020
and800 added a commit to and800/kombu that referenced this issue May 14, 2020
auvipy (Member) commented May 15, 2020

The PyPI infrastructure began seeing this behavior when upgrading from python:3.7.3-slim-stretch to python:3.8.2-slim-buster in https://github.com/pypa/warehouse/pull/7828/files.

We're using SQS with --max-tasks-per-child.

Would you mind checking the proposed fix in kombu? celery/kombu#1189

tuky (Author) commented May 15, 2020

This is exciting news. I will check next month whether it fixes everything for us, too.

jeyrce pushed a commit to jeyrce/celery that referenced this issue Aug 25, 2021
…rk (issue celery#5299) (celery#5604)
