
Tasks are not allowed to start subprocesses #1709

Closed

aromanovich opened this issue Nov 29, 2013 · 68 comments

Comments
@aromanovich

Starting with Celery 3.1.0 the processes pool (celery.concurrency.prefork, formerly celery.concurrency.processes) uses daemon processes to perform tasks.

Daemon processes are not allowed to create child processes and, as a result, tasks that use the multiprocessing package do not work:

[2013-11-29 14:27:48,297: ERROR/MainProcess] Task app.add[e5d184c0-471f-4fc4-804c-f760178d4847] raised exception: AssertionError('daemonic processes are not allowed to have children',)
Traceback (most recent call last):
  File "/Users/aromanovich/Envs/celery3.1/lib/python2.7/site-packages/celery/app/trace.py", line 218, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/Users/aromanovich/Envs/celery3.1/lib/python2.7/site-packages/celery/app/trace.py", line 398, in __protected_call__
    return self.run(*args, **kwargs)
  File "/Users/aromanovich/Projects/celery/app.py", line 10, in add
    manager = multiprocessing.Manager()
  File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/__init__.py", line 99, in Manager
    m.start()
  File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 524, in start
    self._process.start()
  File "/usr/local/Cellar/python/2.7.6/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 124, in start
    'daemonic processes are not allowed to have children'
@ask
Contributor

ask commented Dec 2, 2013

This has not changed between 3.0 and 3.1, so I'm not sure why you would get this error now and not before.

@aromanovich
Author

This is how the error can be reproduced.

app.py:

import multiprocessing
from celery import Celery

app = Celery(__name__, broker='amqp://192.168.33.40')
@app.task
def f():
    manager = multiprocessing.Manager()

sendtask.py:

import app

app.f.delay()

I run the worker using the following command: celery worker -A app.app -l debug.

With Celery 3.0.24 the task succeeds:

[2013-12-02 20:43:56,454: INFO/MainProcess] Task app.f[bcaab028-dbec-43a8-9259-ff7c35ff13d0] 
succeeded in 0.0169339179993s: None

With Celery 3.1.5 it does not:

[2013-12-02 20:48:38,946: ERROR/MainProcess] Task app.f[c9f1cdd3-ae38-493e-b7c7-b9636ed473d0] 
raised exception: AssertionError('daemonic processes are not allowed to have children',)

My understanding of the issue is the following: celery.concurrency.prefork.TaskPool uses celery.concurrency.asynpool.AsynPool; AsynPool inherits from billiard.pool.Pool, which spawns daemon worker processes, and AsynPool does not override this behaviour. But you're right, this scheme does not seem to have changed between 3.0 and 3.1, so I'm confused too :)

And it seems that I'm not alone with that problem: http://stackoverflow.com/questions/20149421/threads-in-celery-3-1-5
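
A quick way to confirm the daemon flag from inside a task (just a sketch, assuming billiard's multiprocessing-compatible current_process API; the task name is made up):

from billiard import current_process

@app.task
def whoami():
    p = current_process()
    # under the prefork pool the worker processes report daemon=True
    return (p.name, p.daemon)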

@ask
Contributor

ask commented Dec 2, 2013

One difference is that the worker process is now a subclass of 'Process', where before it used the function argument: Process(target=); maybe there is a difference in default values between these approaches.

@ask
Contributor

ask commented Dec 2, 2013

multiprocessing and old versions of billiard set daemon=True:
https://github.com/celery/billiard/blob/2.7/billiard/pool.py#L904

And it's the same in the latest version:
https://github.com/celery/billiard/blob/3.3/billiard/pool.py#L1039

@ilyastam

ilyastam commented Dec 2, 2013

I think that the task process being a daemon presents a serious limitation for task implementation.
I wrote a task which uses multiprocessing to speed up CPU-bound operations. Everything works fine when I start a worker in a terminal as follows:

celery worker --app=tasks -Q wb -l info --concurrency=1

But when I use the celeryd script to start a worker, I get this exception:
AssertionError: daemonic processes are not allowed to have children

@aromanovich
Author

I figured out what caused the change in the behaviour.
Tasks are run using daemon processes both in 3.0 and 3.1, but until celery/billiard@4c32d2e and celery/billiard@c676b94 the multiprocessing module wasn't aware of that and hence allowed creating subprocesses.

To my understanding, there was a bug prior to version 3.1 (tasks were allowed to create subprocesses, which could leave them orphaned) and now this bug has been fixed.
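
For reference, the check that now fires is the assert in multiprocessing.Process.start (CPython 2.7, abridged here); once billiard installs its daemon-flagged worker process as multiprocessing's current process, this assert triggers:

# multiprocessing/process.py (CPython 2.7), abridged
def start(self):
    assert self._popen is None, 'cannot start a process twice'
    assert not _current_process._daemonic, \
           'daemonic processes are not allowed to have children'
    ...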

@ilyastam

ilyastam commented Dec 3, 2013

The decision to not allow python daemon processes to fork seems rather arbitrary to me. While I recognize the good faith of it, I feel like I should have full control over this behavior if I choose to.

Being bound to one process per task seems to be a serious limitation to me. Thoughts?

@ask
Contributor

ask commented Dec 3, 2013

I wonder why that limitation is there in the first place; a warning I can understand, but outright disallowing it seems silly when you are perfectly able to fork processes using other means.

@ilyastam

ilyastam commented Dec 3, 2013

@ask, would it be possible to initialize the celery worker processes with the daemon flag set to False? Or make this configurable?

@ask
Contributor

ask commented Dec 3, 2013

@ilyastam seems we were commenting at the same time

I agree that it seems like an arbitrary limitation, but I wish I knew the rationale behind adding it in the first place.

This is a well-known pitfall in posix systems, but it's still allowed. You may clean up child processes in a signal handler, though that does not protect you against SIGKILL.

I think we should remove the limitation from billiard, even though that would diverge from the multiprocessing behavior. You can still create child processes using the subprocess module or the low-level fork call, so power users should be able to create child billiard.Process instances.
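
For example, a task can already shell out today regardless of the daemon flag; a minimal sketch (the command is arbitrary):

import subprocess

@app.task
def run_external():
    # children started via subprocess are not affected by multiprocessing's daemon flag
    return subprocess.check_output(['ls', '-l'])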

@ask
Contributor

ask commented Dec 3, 2013

@ilyastam We should be able to remove the raise statement; we don't have to make the processes "non-daemon".

That is, daemon processes will be allowed to create child processes even if they will not be able to reap them,
which is how posix works anyway.

@ask
Contributor

ask commented Dec 3, 2013

Btw, note that this is not a raise, it's an assert statement, which will be removed if python is started with the PYTHONOPTIMIZE envvar or the -O argument.
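
A tiny illustration of that behavior (file name hypothetical):

# assert_demo.py
assert False, 'daemonic processes are not allowed to have children'
print('the assert was optimized away')

# $ python assert_demo.py                  -> AssertionError
# $ python -O assert_demo.py               -> the assert was optimized away
# $ PYTHONOPTIMIZE=1 python assert_demo.py -> the assert was optimized away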

@ask
Contributor

ask commented Dec 3, 2013

billiard 3.3.0.11 is on PyPI, including this change.

@ilyastam

ilyastam commented Dec 3, 2013

@ask thank you. Any idea what version of celery will see this improvement?

@aromanovich
Author

The multiprocessing documentation explicitly states that daemon processes are not allowed to create subprocesses, and explains why. To me, this assert statement looks more like it was put there as a shortcut for raise (people often do that).

This limitation is documented and I don't think that it is a good idea for Celery to silently monkey-patch multiprocessing and take it away. It could lead to really unexpected and harmful consequences.

I can think of the following example (it may seem a bit contrived, though):

import multiprocessing
import time

@app.task
def f():
    p = multiprocessing.Pool(3)
    p.map_async(time.sleep, [1000, 1000, 1000])

Being run as a plain Python function, this code works correctly. But being run as a Celery task (using Celery version 3.0.*), it leaves three subprocesses that will hang forever; when the Celery worker quits, these subprocesses will become orphaned.

@ask
Contributor

ask commented Dec 3, 2013

It doesn't explain why; it just states the unix behavior that you would expect when starting a child-child process. Even though it's an infamous limitation in unix, it doesn't stop people from doing it. This is no different from starting a subprocess.Popen process, or even calling fork() to start a new process. So why should it be illegal?

The way to do your example:

import time

from billiard import Pool
from multiprocessing.util import Finalize

_finalizers = []

@app.task
def f():
    p = Pool(3)
    _finalizers.append(Finalize(p, p.terminate))
    try:
        p.map_async(time.sleep, [1000, 1000, 1000])
        p.close()
        p.join()
    finally:
        p.terminate()

To kill (-9) this you would have to also kill -9 the child processes, but that is something you will have
to consider for all unix processes.

Not that I advocate creating a Pool for every task, but I don't see why users, who know what they're
doing, shouldn't be allowed to start processes from a task.

Also, we don't monkey patch anything; this is a change in billiard only.

@aromanovich
Author

Also, we don't monkey patch anything; this is a change in billiard only.

By "monkey patching" I mean this assignment, which replaces multiprocessing._current_process with an instance of billiard.process.Process: https://github.com/celery/billiard/blob/master/billiard/process.py#L53.

I agree that there is nothing wrong with starting child-child processes if they are handled right (like in your example). My point is that multiprocessing is not written that way and we should not ignore its implementation limitations.

@ask
Contributor

ask commented Dec 9, 2013

@aromanovich It cannot be written any other way; it's not a limitation of multiprocessing, it's a limitation of unix.

It sets _current_process so that the logging module's processName format variable works, and the billiard process object has the same API as the multiprocessing process object, so it's safe to set the current process.

@ask
Contributor

ask commented Dec 9, 2013

And btw, you would have to use billiard for the limitation to be lifted; using multiprocessing will still raise the exception.
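
In other words, something along these lines should work once billiard 3.3.0.11+ is installed, while the multiprocessing equivalent will still raise (a sketch; the target function is made up):

from billiard import Process

def crunch(n):
    return sum(i * i for i in range(n))

@app.task
def f():
    p = Process(target=crunch, args=(10 ** 6,))
    p.start()
    p.join()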

@ghost

ghost commented Jul 23, 2014

This could also be fixed using this approach:
http://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
which would allow users to continue using the multiprocessing module, avoiding this issue:
celery/billiard#99
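
The approach in that answer boils down to a pool whose worker processes are never flagged as daemonic, roughly like this (a sketch against the Python 2.7 multiprocessing used in this thread; class names are arbitrary):

import multiprocessing
import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    # make the 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NoDaemonPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

A task could then create NoDaemonPool(3) instead of multiprocessing.Pool(3), at the cost of having to clean up the children itself.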

@frodopwns

I get this error when calling a @parallel fabric task from within a celery task.

@celery.task
def dostuff():
    execute(fabfile.push_settings, sid=site['sid'])

@parallel
@roles(environment)
def push_settings(sid):
    # do stuff

@xiaods

xiaods commented Jan 26, 2015

@frodopwns use the environment variable
export PYTHONOPTIMIZE=1
to remove this assert. Note that you then need to handle everything (such as cleaning up child processes) yourself.

@frodopwns

@xiaods I think I solved that issue with something like this:

from celery.signals import worker_process_init
import Crypto.Random

@worker_process_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    Crypto.Random.atfork()

@ostrokach

Problem

I have a task which calculates some data and loads a scikit-learn classifier to make predictions based on that data. When I run the task by itself, everything is OK, but when I run it using Celery, I get an error when the task attempts to load the pickled classifier:

[2015-07-17 21:23:51,299: ERROR/MainProcess] Task app.f[329d0da4-2e0e-4e1f-8148-d64f47750b1f] raised unexpected: AttributeError("'Worker' object has no attribute '_config'",)
Traceback (most recent call last):
  File "/home/username/anaconda3/lib/python3.4/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/username/anaconda3/lib/python3.4/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/username/working/playground/celery/app.py", line 11, in f
    clf = pickle.load(open('clf.pickle', 'rb'))
  File "/home/username/anaconda3/lib/python3.4/site-packages/sklearn/ensemble/__init__.py", line 6, in <module>
    from .base import BaseEnsemble
  File "/home/username/anaconda3/lib/python3.4/site-packages/sklearn/ensemble/base.py", line 13, in <module>
    from ..externals.joblib import cpu_count
  File "/home/username/anaconda3/lib/python3.4/site-packages/sklearn/externals/joblib/__init__.py", line 112, in <module>
    from .parallel import Parallel
  File "/home/username/anaconda3/lib/python3.4/site-packages/sklearn/externals/joblib/parallel.py", line 23, in <module>
    from ._multiprocessing_helpers import mp
  File "/home/username/anaconda3/lib/python3.4/site-packages/sklearn/externals/joblib/_multiprocessing_helpers.py", line 25, in <module>
    _sem = mp.Semaphore()
  File "/home/username/anaconda3/lib/python3.4/multiprocessing/context.py", line 81, in Semaphore
    return Semaphore(value, ctx=self.get_context())
  File "/home/username/anaconda3/lib/python3.4/multiprocessing/synchronize.py", line 127, in __init__
    SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)
  File "/home/username/anaconda3/lib/python3.4/multiprocessing/synchronize.py", line 59, in __init__
    kind, value, maxvalue, self._make_name(),
  File "/home/username/anaconda3/lib/python3.4/multiprocessing/synchronize.py", line 117, in _make_name
    return '%s-%s' % (process.current_process()._config['semprefix'],
AttributeError: 'Worker' object has no attribute '_config'

To reproduce

Create an empty classifier and save it as a pickle:

import pickle
from sklearn.ensemble import GradientBoostingClassifier
clf = GradientBoostingClassifier()
pickle.dump(clf, open('clf.pickle', 'wb'))

Create a simple app (app.py):

import pickle
import sklearn
from celery import Celery

app = Celery(__name__, broker='amqp://localhost//')

@app.task
def f():
    print('hello')
    clf = pickle.load(open('clf.pickle', 'rb'))
    print(clf)

Start the celery worker:

celery -A app worker --loglevel=debug

Run the app:

python -c "from app import f; f.delay()"

Error message:

...
AttributeError: 'Worker' object has no attribute '_config'

Solution

I think there should be an option to "monkeypatch" Celery to allow tasks to start sub-processes, especially if such a "feature" existed in the past. Right now, people are simply moving to other frameworks when they encounter this problem: http://stackoverflow.com/questions/27904162/using-multiprocessing-pool-from-celery-task-raises-exception. Here is another example of this error: http://stackoverflow.com/questions/22674950/python-multiprocessing-job-to-celery-task-but-attributeerror.

This issue should be re-opened...

@hxzhao527

hxzhao527 commented Aug 25, 2017

Fortunately, I found this issue while trying to run an ansible playbook in a Celery task.
The method provided by @martinth did not work for me. I printed current_process()._config and got
{'authkey': b"y&e\x8d'\xcb\xd4\r\xd2\x86\x06\xe7\x9e\x14\xaf \xbc\xc4\x95\xa5G\xec&[i\x19\xf3G-\x06\xac\x19", 'semprefix': '/mp', 'daemon': True}.
Then I reassigned the daemon field to False, and it works.

Is there a proper solution or another implementation that allows running multiprocessing in a task?

@VisionUnchange

VisionUnchange commented Sep 13, 2017

@HeartUnchange: we have recently been working hard on a big data project in which we wish to use celery as the distributed component, and with your guidance we were lucky enough to solve the problem. See the task configuration:

@app.task
def handleBigZipfile(filename, nid):
    current_process()._config['daemon'] = False
    logger.info('{} begin handle!'.format(filename))
    handleAll(filename, nid)
    logger.info('{} is done!'.format(filename))

The solution works! We began the project in January 2017 and the prototype is now finished, nine months later. I owe my thanks to you; my thanks are beyond expression!
Would you please describe more about how you figured out the problem? We are eager to know!

@induser

induser commented Sep 29, 2017

Hi,

I have a pretty standard set-up: Django + RabbitMQ + celery-4.0.2 + python-2.7 + CentOS-7

I am trying to spawn a process using the standard python multiprocessing module in celery.

Daemon processes are not allowed to create child processes and, as a result, tasks that use the multiprocessing package are not working.
Command used to run: celery worker -B -A celery_task -l debug
Traceback Logs:


[2017-09-26 23:27:08,838: WARNING/PoolWorker-2] ERROR
[2017-09-26 23:27:08,839: WARNING/PoolWorker-2] Traceback (most recent call last):
[2017-09-26 23:27:08,839: WARNING/PoolWorker-2] File "/home/induser/config.py", line 612, in main
[2017-09-26 23:27:08,840: WARNING/PoolWorker-2] mylog_process = mp.Process(target=test_logger_process, args=(myqueue,))
[2017-09-26 23:27:08,840: WARNING/PoolWorker-2] File "/usr/lib64/python2.7/multiprocessing/process.py", line 98, in __init__
[2017-09-26 23:27:08,841: WARNING/PoolWorker-2] self._authkey = _current_process._authkey
[2017-09-26 23:27:08,841: WARNING/PoolWorker-2] AttributeError: 'Process' object has no attribute '_authkey'

What could be the reason for not spawning the process?
Here is the code:

import multiprocessing as mp
from celery.schedules import crontab
from celery.decorators import periodic_task

@periodic_task(run_every=crontab(minute='*/1'), name='test_process_celery')
def main():
    data = config_read()
    try:
        myqueue = mp.Queue(-1)
        mylog_process = mp.Process(target=test_logger_process, args=(myqueue,))
        mylog_process.start()
        . . .
        . . .
    except Exception as e:
        raise
    finally:
        mylog_process.join()

Thank you.

@auvipy
Member

auvipy commented Dec 19, 2017

Try master and report if it's still an issue.

@Kurara

Kurara commented Mar 13, 2018

It still has the error. I tried to start a subprocess with:

from multiprocessing import Process, Value
import ctypes
import logging

[...]
        result = Value('i', 0)
        text = Value(ctypes.c_char_p, fail_string.encode())
        p = Process(target=reader.find_text_async, args=(result, text, ))
        p.start()
        p.join(5)

        if p.is_alive():
            logging.warning("Manually terminating the process 'find_text_async'")
            p.terminate()

but with the celery master branch it says:

File "/usr/lib/python3.5/multiprocessing/process.py", line 103, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children

EDIT

I replaced multiprocessing with billiard and it works!

from billiard import Process, Value

celery locked this conversation as resolved and limited it to collaborators Mar 13, 2018