Celery does not pickup class based tasks #3744

Closed
george-silva opened this issue Jan 7, 2017 · 11 comments

@george-silva

I'm having a few problems with the new version of Celery. It won't discover class based tasks.

Here's my conf and versions:

Versions

  • Django 1.10
  • Celery: 4.0.2

Installed celery with pip install -U celery then installed redis, pip install redis.

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# celery.py
# coding: utf-8
"""Initial Celery configuration"""
from __future__ import absolute_import
from django.conf import settings
import os
from celery import Celery
from scout.settings.utils import get_env_variable
try:
    setts = get_env_variable('DJANGO_SETTINGS_MODULE')
except:
    setts = 'scout.settings.local'
os.environ.setdefault('DJANGO_SETTINGS_MODULE', setts)

app = Celery('scout')
app.config_from_object('django.conf:settings', namespace='CELERY')
# i have tried this line below with different setups. using lambda, without anything, etc.
app.autodiscover_tasks([a for a in settings.INSTALLED_APPS])

Tasks:

### workflow/tasks.py ###

class BaseTask(Task):

    ignore_result = False
    validation_class = ''
    name = ''
    description = ''

    def run(self, *args, **kwargs):
        controller_id = kwargs.pop('cid', None)
        next_id = kwargs.pop('nsi', None)
        self._load_data(controller_id, next_id)
        return self._run()

    def _load_data(self, controller_id, next_id):

        self.controller = StateController.objects.get(id=controller_id)
        self.previous = self.controller.current_state
        self.next = State.objects.get(id=next_id)

    def _run(self):

        return True

### addresses/tasks.py ###

# coding: utf-8
from workflow.tasks import BaseTask
from celery import current_app


@current_app.task
def foo(x, y):
    return x + y


class AddressMatching(BaseTask):
    name = 'Address Matching'
    description = '''Matches address from sources A and B and constructs
a list of Address Matches for other analysis and manual review.'''
    public = True

    def _run(self, *args, **kwargs):
        print 'address matching'
        return True


class CanonicalMatching(BaseTask):
    name = 'Canonical Matching'
    description = '''Matches addresses from Address Matches
with canonical source of addresses. These canonical sources might be
addresses from official government sources of ADDRESSES (not POIs)
and other sources, like OpenStreetMap'''
    public = True

    def _run(self, *args, **kwargs):
        print 'canon matching'
        return True

Basically I defined a master base task class, that makes sense for us. It does some setup and it will depend on the same stuff. I've imported this BaseTask class and ran the worker.

Steps to reproduce

  1. Set up the project somewhat like this;
  2. Run the worker: celery -A scout worker -l info

Output:

[tasks]
  . addresses.tasks.foo
  . scout.tasks.email_async

[2017-01-07 21:00:28,811: INFO/MainProcess] Connected to redis://localhost:6379/0
[2017-01-07 21:00:28,819: INFO/MainProcess] mingle: searching for neighbors
[2017-01-07 21:00:29,833: INFO/MainProcess] mingle: all alone
[2017-01-07 21:00:29,845: WARNING/MainProcess] /home/george/projetos/.virtualenvs/scout/local/lib/python2.7/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2017-01-07 21:00:29,845: INFO/MainProcess] celery@winterfell ready.

Expected behavior

The class based tasks should also be picked up by celery. When I try to delegate a task to Celery, it will refuse it, because it's not registered. Note that the regular function based task is properly picked up by celery.

[2017-01-07 21:01:31,922: ERROR/MainProcess] Received unregistered task of type 'Address Matching'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
'[[], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (77b)
Traceback (most recent call last):
  File "/home/george/projetos/.virtualenvs/scout/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
    strategy = strategies[type_]
KeyError: 'Address Matching'

@ddemid

ddemid commented Jan 8, 2017

I believe you have to use current_app.Task as a base class to make it registered.

@george-silva
Author

@ddemid hello! Thanks for the help, but no good :(

I've tried two things:

  1. Inherited from base Task class on one of my concrete Tasks;
  2. Changed the BaseTask to inherit from current_app.Task;

Like so:

# coding: utf-8
from celery import Task
from workflow.tasks import BaseTask
from celery import current_app


@current_app.task
def foo(x, y):
    return x + y


@current_app.task
def address_matching(x,y):
    pass


class AddressMatching(Task):
    name = 'Address Matching'
    description = '''Matches address from sources A and B and constructs
a list of Address Matches for other analysis and manual review.'''
    public = True

    def _run(self, *args, **kwargs):
        print 'address matching'
        return True

class BaseTask(current_app.Task):

    ignore_result = False
    validation_class = ''
    name = ''
    description = ''

    def run(self, *args, **kwargs):
        controller_id = kwargs.pop('cid', None)
        next_id = kwargs.pop('nsi', None)
        self._load_data(controller_id, next_id)
        return self._run()

    def _load_data(self, controller_id, next_id):

        self.controller = StateController.objects.get(id=controller_id)
        self.previous = self.controller.current_state
        self.next = State.objects.get(id=next_id)

    def _run(self):

        return True

Still, none of these tasks have been picked up. The only tasks that have been picked up are function based.

[tasks]
  . addresses.tasks.address_matching
  . addresses.tasks.foo
  . scout.tasks.email_async

@george-silva
Author

Just to clarify: I need these to be class based tasks, because there is currently some discovery behavior that I need to support.

The user can choose these tasks and they have some "special" attributes that I need to support. The validation_class, name and description attributes are used by Django Rest Framework.

@rickwargo
Contributor

rickwargo commented Jan 9, 2017

I have had the same problem. You'll need to explicitly register each task. For example, after defining AddressMatching and CanonicalMatching, call:

current_app.tasks.register(AddressMatching())
current_app.tasks.register(CanonicalMatching())

Refer to @ask's comment to #3645.

@george-silva
Author

@rickwargo where are you registering those? How did you configure CELERY_IMPORTS or the autodiscover?

I've tried to do the registering myself, but no dice.

@rickwargo
Contributor

Not using autodiscover. I'm registering after I define the class. Here's an example of a class-based task I use where I am using fragments() as a polymorphic method and defining the list per class.

I am using a custom router (maps between lib.task and myapp) and my imports contain 'lib.task.extract.discovery'.

from lib.task.extract.base import ExtractTask
from myapp.celery import celery_app


class ExtractDiscoveryTask(ExtractTask):
    name = 'myapp.discovery.extract'

    def fragments(self, site):
        return site.discovery_fragments

celery_app.tasks.register(ExtractDiscoveryTask())

@george-silva
Author

Oh, now we're talking :D.

I've managed to do this. We already had a loader that checks all of the BaseTask children, so I just added the new register call to that method.

There is a lot of boilerplate to add these tasks to models in DB, but something like this should work for a global registry.

Check this:

class AvailableTaskLoader(object):

    def _get_subclasses(self):
        task_dict = {}
        for app in settings.INSTALLED_APPS:
            try:
                mod = __import__('%s.%s' % (app, 'tasks'))
            except:
                continue
            members = inspect.getmembers(mod.tasks, predicate=lambda x: inspect.isclass(x) and issubclass(x, BaseTask))
            for m in members:
                # {'foo.bar.Task': 'class <foo.bar.Task>'}
                task_dict['{0}.{1}'.format(m[1].__module__, m[0])] = m[1]
        return task_dict

    def load(self):
        subcls = self._get_subclasses()
        for cls_name, cls in subcls.items():

            current_app.tasks.register(cls)

            if not hasattr(cls, 'public') or not cls.public:
                continue
            full_name = '{0}.{1}'.format(cls.__module__, cls.__name__)
            name = cls.name if cls.name else cls.__name__
            description = cls.description if cls.description else cls.__doc__
            try:
                obj, created = AvailableTask.objects.get_or_create(name=name,
                                                                   klass=full_name,
                                                                   description=description)
                logger.info('AvailableTask %s created successfully.', full_name)
            except Exception as ex:
                logger.warning('Error while creating %s. %s',
                               full_name,
                               ex.message)

        self._prune()

    def _prune(self):
        '''removes all the unnecessary tasks'''
        tasks = set([member for member in self._get_subclasses().keys()])
        existing = set(AvailableTask.objects.all().values_list('klass', flat=True))
        stale = existing - tasks
        for s in stale:
            at = AvailableTask.objects.get(klass=s)
            at.transition_tasks.all().delete()
            at.delete()

Here is the celery start output:

[tasks]
  . Address Matching
  . Base Task
  . Canonical Matching
  . Change State
  . scout.tasks.email_async

👏 👏 👏

@ask perhaps we should add this to the docs? Before migrating to 4.0.2 (I don't remember which version I was using) these tasks were automatically found.
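
For anyone adapting the loader above: a standard-library-only sketch of the discovery step, assuming the same layout of one `tasks` module per app. The function name `find_subclasses` and its parameters are hypothetical. It swaps `__import__` for `importlib.import_module`, because `__import__('a.b.tasks')` returns the top-level package `a`, so the `mod.tasks` lookup only finds the right module for single-level app names. Unlike the loader above, this sketch skips the base class itself, which is a design choice and not something the thread requires.

```python
import importlib
import inspect


def find_subclasses(app_names, base, submodule='tasks'):
    """Map 'module.ClassName' to each subclass of `base` in <app>.<submodule>."""
    found = {}
    for app_name in app_names:
        module_name = '{0}.{1}'.format(app_name, submodule)
        try:
            # import_module returns the leaf module, even for dotted names.
            module = importlib.import_module(module_name)
        except ImportError:
            continue  # this app has no importable tasks module
        for name, cls in inspect.getmembers(module, inspect.isclass):
            if issubclass(cls, base) and cls is not base:
                found['{0}.{1}'.format(cls.__module__, name)] = cls
    return found
```

Each class in the returned dict could then be passed to the registration call used in `load()`.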

@acidjunk

acidjunk commented Feb 9, 2017

Please add it to the docs; I lost a lot of time today while upgrading to celery 4.0.2.

We use class based tasks a lot in our project as we have some basic functionality in them (sort of a Mixin) and all tasks inherit from it; before 4.0.2 it autodiscovered these class based tasks just fine.

Maybe off topic:
There are several blog posts about best practices with class-based tasks, but it seems like Celery 4 has more or less dropped support for them? I found this in the docs:
"This change also means that the abstract attribute of the task no longer has any effect."
Should we stop using (abstract) class based tasks?

I'll have some time next week, to write a short tutorial or to write a chapter in the docs about it.

@ryanhiebert
Contributor

After messing around with this quite a bunch, it's clear to me that you're not intended to use class-based tasks anymore. Always use the decorator approach instead.

I even wrote up a custom loader so that I could keep it the old way, following the instructions in the docs (app.register_task(MyTask())), but then I can't run it unless I've manually set the name on the class, which I was trying to avoid.

Moral of the story: Celery 4 is time to bite the bullet and stop using class-based tasks. It turns out they are confusing to use anyway, since the task instance lives longer than you're likely to expect. If you need a class for your functionality, create a separate class that the task uses instead.

@georgepsarakis
Contributor

Perhaps #3874 is related.

@auvipy
Member

auvipy commented Dec 19, 2017

Closing in favor of #3874; hope it's fixed.

7 participants