
Job is executed before "depends_on" job is finished when using Job.create and q.enqueue_job #1404

Closed
jtfidje opened this issue Jan 27, 2021 · 46 comments · Fixed by #1837

@jtfidje
Contributor

jtfidje commented Jan 27, 2021

Consider the following snippet, with two workers running:

from uuid import uuid4

from redis import Redis
from rq import Queue

# func_1 and func_2 are ordinary task functions defined elsewhere
r = Redis(
    host="localhost",
    port=6379,
    db=0
)

q = Queue(connection=r)

id_1 = str(uuid4())

q.enqueue(func_1, job_id=id_1)
q.enqueue(func_2, depends_on=id_1)

In that snippet, everything works as expected: both jobs are enqueued (one picked up by each worker), but func_2 does not run before func_1 has finished.

Now consider the following snippet, still with two workers:

from uuid import uuid4

from redis import Redis
from rq import Queue
from rq.job import Job

r = Redis(
    host="localhost",
    port=6379,
    db=0
)

q = Queue(connection=r)

id_1 = str(uuid4())
id_2 = str(uuid4())

job_1 = Job.create(func_1, id=id_1, connection=r)
job_2 = Job.create(func_2, id=id_2, depends_on=id_1, connection=r)
q.enqueue_job(job_1)
q.enqueue_job(job_2)

When I run that code, both jobs start immediately, despite the depends_on argument.



Mini bonus question

Why is it necessary to set connection= on the jobs when I use Job.create? Doesn't the Queue instance already know which Redis connection to use? The code fails unless I set connection on every job as well.
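Job.create builds the job independently of any Queue, so it has no queue to infer a connection from. One way to avoid passing the raw Redis handle twice, reusing the snippet above, is to hand it the queue's own client (a minimal sketch, not an answer from the maintainers):

job_1 = Job.create(func_1, id=id_1, connection=q.connection)
job_2 = Job.create(func_2, id=id_2, depends_on=id_1, connection=q.connection)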

@jtfidje jtfidje changed the title Job is executed before "depends_on" job is finished Job is executed before "depends_on" job is finished when using Job.create and q.enqueue_job Feb 2, 2021
@selwin
Collaborator

selwin commented Feb 9, 2021

This PR adds a multi-dependency feature to RQ proper. You should try this instead.

@selwin selwin closed this as completed Feb 9, 2021
@jtfidje
Contributor Author

jtfidje commented Feb 9, 2021

Thanks for answering!

I've looked through the PR, but I can't see how those changes would fix this problem. The only way I managed to fix it was to create a custom Queue class and implement the following function as a replacement for the initial enqueue_job call:

def enqueue_custom(
        self, job: CustomJob, pipeline=None, at_front: bool = False
    ) -> CustomJob:

        # If a _dependent_ job depends on any unfinished job, register all the
        # _dependent_ job's dependencies instead of enqueueing it.
        #
        # `Job#fetch_dependencies` sets WATCH on all dependencies. If
        # WatchError is raised when the pipeline is executed, that means
        # something else has modified either the set of dependencies or the
        # status of one of them. In this case, we simply retry.
        job.set_status(FlowStatus.QUEUED)

        if job._dependency_id:
            with self.connection.pipeline() as pipe:
                while True:
                    try:

                        pipe.watch(job.dependencies_key)

                        dependencies = job.fetch_dependencies(watch=True, pipeline=pipe)

                        pipe.multi()

                        for dependency in dependencies:
                            if (
                                dependency.get_status(refresh=False)
                                != FlowStatus.FINISHED
                            ):
                                job.set_status(FlowStatus.DEFERRED, pipeline=pipe)
                                job.register_dependency(pipeline=pipe)
                                job.save(pipeline=pipe)
                                job.cleanup(ttl=job.ttl, pipeline=pipe)
                                pipe.execute()
                                return job

                        break
                    except WatchError:
                        continue
        return super().enqueue_job(job, pipeline, at_front=at_front)

You have this if-block inside enqueue_call as well, and I couldn't get dependencies to work without it.

@selwin
Collaborator

selwin commented Feb 9, 2021

With the PR merged, you can do queue.enqueue(my_func, depends_on=[job_1, job_2])
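A minimal sketch of that call (func_1, func_2 and my_func stand in for ordinary task functions):

from redis import Redis
from rq import Queue

q = Queue(connection=Redis())

job_1 = q.enqueue(func_1)
job_2 = q.enqueue(func_2)

# my_func only runs once both job_1 and job_2 have finished
q.enqueue(my_func, depends_on=[job_1, job_2])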

@jtfidje
Contributor Author

jtfidje commented Feb 9, 2021

Yes, I understand (and really like this feature!)
But in the docs you give this example:

from rq.job import Job

job = Job.create(count_words_at_url, 'http://nvie.com')
print('Job id: %s' % job.id)
q.enqueue_job(job)

# create a job with a predetermined id
job = Job.create(count_words_at_url, 'http://nvie.com', id='my_job_id')

This way of doing it fits my needs very well, but if I do this, job dependencies do not work. If this is by design then of course I will have to use q.enqueue(...).

Edit:
My functions (i.e. count_words_at_url) are loaded dynamically from a configuration file, and are therefore attached to my Job class at runtime.

@thomasmatecki
Contributor

I think this issue should be reopened, as it appears to still occur (new failing test):

def test_enqueue_job_with_dependency_defers_dependent(self):

I agree with @jtfidje that this is not related to multiple dependencies, though a few folks documented it in #1170.

The crux of the issue is that enqueue_job does not check dependencies at all.
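A sketch of what such a failing test asserts, i.e. the desired behavior (names and fixtures here are illustrative, not the exact test from the linked branch; it assumes the usual RQ test imports such as Queue, Job, JobStatus, say_hello and RQTestCase's self.testconn):

def test_enqueue_job_with_dependency_defers_dependent(self):
    """enqueue_job should defer a job whose dependency has not finished yet."""
    q = Queue(connection=self.testconn)

    parent = Job.create(func=say_hello, connection=q.connection)
    parent.save()
    q.enqueue_job(parent)

    child = Job.create(func=say_hello, connection=q.connection, depends_on=parent)
    child.save()
    q.enqueue_job(child)

    # Desired: the dependent job is deferred rather than queued alongside its parent.
    # Today this fails because enqueue_job skips the dependency check entirely.
    self.assertEqual(child.get_status(), JobStatus.DEFERRED)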

@LukasBommes

I encountered the issue as well and would love to see a fix.

@GitTiago

It seems this issue still stands.
Job creation takes the depends_on parameter, but enqueue_job does nothing with it.
This is in direct contradiction with the docs and the test @thomasmatecki mentioned.

@selwin
Collaborator

selwin commented Feb 23, 2022

queue.enqueue_job() just enqueues the job without doing any dependency checks; it's designed that way. Is there a reason why you don't use queue.enqueue() instead?

@thomasmatecki
Contributor

I think the behavior of enqueue_job is at issue here. It is counterintuitive that enqueue_job would not respect a job's dependencies while enqueue and enqueue_call do. We could document this, but it does seem like an inconsistent API (allowing a job to be created with dependencies, then ignoring them in enqueue_job).

What do folks think of changing the behavior of enqueue_job to be consistent with enqueue? Keep in mind this is potentially a breaking change for some users.

@caffeinatedMike
Contributor

caffeinatedMike commented Feb 25, 2022

@selwin I fully agree with and support @thomasmatecki's point about aligning enqueue_job to be consistent with enqueue and enqueue_call. This is also preventing the use of enqueue_many, due to its reliance on enqueue_job in the list generator for jobs.

My reason for constructing the Job instances beforehand is so that my tasks (which I submit in successive batches) are queued as close together as possible. I also prefer Job.create for its object-oriented approach. I can create my batch of jobs and sit on them for a moment while I do a few other pre-flight actions, then submit them (currently in rapid succession via enqueue_job). It wasn't until I stumbled upon this thread that I realized my dependencies were not behaving in an orderly fashion. So, I'm stuck waiting for a fix (hopefully this can be addressed in this issue).

@caffeinatedMike
Contributor

caffeinatedMike commented Feb 25, 2022

Wouldn't moving the following snippet from enqueue_call into a standalone function, and applying it to all the aforementioned methods, provide consistency across the API and resolve this issue?

    job = self.setup_dependencies(
        job,
        pipeline=pipeline
    )
    # If we do not depend on an unfinished job, enqueue the job.
    if job.get_status(refresh=False) != JobStatus.DEFERRED:
        return self.enqueue_job(job, pipeline=pipeline, at_front=at_front)
    return job
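Roughly, that standalone helper might look like this (a sketch only; enqueue_with_dependencies is a hypothetical name, not existing API):

from rq.job import JobStatus


def enqueue_with_dependencies(queue, job, pipeline=None, at_front=False):
    """Defer the job if any dependency is unfinished, otherwise enqueue it."""
    job = queue.setup_dependencies(job, pipeline=pipeline)
    # If we do not depend on an unfinished job, enqueue the job.
    if job.get_status(refresh=False) != JobStatus.DEFERRED:
        return queue.enqueue_job(job, pipeline=pipeline, at_front=at_front)
    return job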

@selwin selwin reopened this Feb 25, 2022
@selwin
Collaborator

selwin commented Feb 25, 2022

The intended design for enqueue_job() was to just push a job into a queue. I do understand the confusion, though; in hindsight we should have made this an internal API, _enqueue_job().

I'm ok with refactoring enqueue_job() to support all the functionalities supported by enqueue() provided that:

  1. The APIs remain backward compatible
  2. All the enqueue logic is centralized in one place to keep things tidy.

I'm open to accepting a PR for this. In the meantime, I can update the docs to clarify what enqueue_job() does and does not do.

@jtfidje
Contributor Author

jtfidje commented Feb 25, 2022

> @selwin I fully agree with and support @thomasmatecki's point about aligning enqueue_job to be consistent with enqueue and enqueue_call. This is also preventing the use of enqueue_many, due to its reliance on enqueue_job in the list generator for jobs.
>
> My reason for constructing the Job instances beforehand is so that my tasks (which I submit in successive batches) are queued as close together as possible. I also prefer Job.create for its object-oriented approach. I can create my batch of jobs and sit on them for a moment while I do a few other pre-flight actions, then submit them (currently in rapid succession via enqueue_job). It wasn't until I stumbled upon this thread that I realized my dependencies were not behaving in an orderly fashion. So, I'm stuck waiting for a fix (hopefully this can be addressed in this issue).

I "fixed" this by creating a custom class that inherits from the RQ one and then just patch the function with the dependency code. I've done this for a couple of things so I run my own custom Worker, Queue and Job classes 🙂

@caffeinatedMike
Contributor

@jtfidje Would you consider contributing the "fix" via a PR? I'm sure it would be greatly appreciated by all those affected by this issue, myself included.

@jtfidje
Contributor Author

jtfidje commented Feb 26, 2022

Sure :) Let me try to take what @selwin said into account and see what I can come up with.

@jtfidje
Contributor Author

jtfidje commented Feb 26, 2022

@selwin I'm having trouble setting up my environment to run the tests. Could you please go over the docs and see if something has changed or is missing?

@selwin
Collaborator

selwin commented Feb 27, 2022

What errors did you run into?

@jtfidje
Contributor Author

jtfidje commented Feb 27, 2022

> What errors did you run into?

I followed the docs and did this:

vagrant init ubuntu/trusty64
vagrant up
vagrant ssh -- "sudo apt-get -y install redis-server python-dev python-pip"
vagrant ssh -- "sudo pip install --no-input redis hiredis mock"
vagrant ssh -- "(cd /vagrant; ./run_tests)"

Now I guess that run_tests has to be copied in from somewhere, but if I run vagrant ssh -- "(cd /vagrant; ./run_tests)" it fails because the script does not exist.

@caffeinatedMike
Contributor

caffeinatedMike commented Mar 1, 2022

@jtfidje I'm attempting a fix for this as well (here's a link to my branch). I used the two tests mentioned earlier in this thread with my changes, but the second test still fails. For some reason the second job gets stuck in the DEFERRED state.

Would you be up to having a look? @selwin your input is welcome and appreciated as well.

pytest test_queue.py::TestQueue::test_enqueue_job_with_dependency_executes_both

________________________________________________________________ TestQueue.test_enqueue_job_with_dependency_executes_both ________________________________________________________________ 

self = <tests.test_queue.TestQueue testMethod=test_enqueue_job_with_dependency_executes_both>

    def test_enqueue_job_with_dependency_executes_both(self):
        """Jobs are enqueued only when their dependencies are finished."""
        from redis import Redis
        q = Queue(connection=Redis.from_url("redis://"))

        job_a = Job.create(func=say_hello, connection=q.connection)
        job_a.save()
        job = q.enqueue_job(job_a)
        self.assertEqual(job.get_status(), JobStatus.QUEUED)

        job_b = Job.create(func=say_hello, connection=q.connection, depends_on=job_a)
        job_b.save()
        job = q.enqueue_job(job_b)

        from rq.worker import SimpleWorker
        w = SimpleWorker([q], connection=q.connection)
        w.work(burst=True)

        self.assertEqual(job_a.get_status(), JobStatus.FINISHED)
>       self.assertEqual(job_b.get_status(), JobStatus.FINISHED)
E       AssertionError: 'deferred' != <JobStatus.FINISHED: 'finished'>

test_queue.py:543: AssertionError
================================================================================ short test summary info ================================================================================= 
FAILED test_queue.py::TestQueue::test_enqueue_job_with_dependency_executes_both - AssertionError: 'deferred' != <JobStatus.FINISHED: 'finished'>
=================================================================================== 1 failed in 0.47s ====================================================================================

@caffeinatedMike
Contributor

@jtfidje How have your efforts fared?

@jtfidje
Contributor Author

jtfidje commented Mar 8, 2022

Hi, sorry! I haven't looked at it since I couldn't get the tests to run. Maybe you could walk me through your setup and how you run the tests? That would be a great help :)

@caffeinatedMike
Contributor

caffeinatedMike commented Mar 11, 2022

@selwin I believe (I have yet to confirm, though) this might be a bigger issue than originally thought. You state:

> queue.enqueue_job() just enqueues the job without doing any dependency checks; it's designed that way. Is there a reason why you don't use queue.enqueue() instead?

Which is a troubling thought when reviewing the Scheduler's logic for enqueueing jobs once they reach their set time. Going off of what you stated previously, does this mean that jobs with dependencies won't respect/wait on their dependencies when scheduled?

@caffeinatedMike
Contributor

caffeinatedMike commented Mar 11, 2022

@jtfidje I opted to use WSL on Windows to develop. It's much simpler. Just as a heads-up, you'll always have 2 failing tests when running the suite through WSL Ubuntu (I'm using 18.04). These can be safely ignored, as they pass when run by GitHub Actions. See below:

(linux-venv) ubuntu@MICHAELH-ZB:/mnt/c/users/mhill/pycharmprojects/rq$ pytest tests
======================================================================== test session starts =========================================================================
platform linux -- Python 3.6.9, pytest-7.0.1, pluggy-1.0.0
rootdir: /mnt/c/users/mhill/pycharmprojects/rq
plugins: cov-3.0.0
collected 341 items

tests/test_callbacks.py .........                                                                                                                              [  2%]
tests/test_cli.py ..............................                                                                                                               [ 11%]
tests/test_commands.py ...                                                                                                                                     [ 12%]
tests/test_connection.py .....                                                                                                                                 [ 13%]
tests/test_decorator.py ...............                                                                                                                        [ 18%]
tests/test_fixtures.py ..                                                                                                                                      [ 18%]
tests/test_helpers.py .                                                                                                                                        [ 19%]
tests/test_job.py ...............................................................................                                                              [ 42%]
tests/test_queue.py ..................................................                                                                                         [ 56%]
tests/test_registry.py ...........................                                                                                                             [ 64%]
tests/test_retry.py ......                                                                                                                                     [ 66%]
tests/test_scheduler.py ...............s...                                                                                                                    [ 72%]
tests/test_sentry.py ...                                                                                                                                       [ 73%]
tests/test_serializers.py .                                                                                                                                    [ 73%]
tests/test_timeouts.py .                                                                                                                                       [ 73%]
tests/test_utils.py ...............                                                                                                                            [ 78%]
tests/test_worker.py ..............ss..................s.s..................sssss..FsF.ss...                                                                   [ 98%]
tests/test_worker_registration.py ....                                                                                                                         [100%]
============================================================================== FAILURES ==============================================================================
________________________________________________________ TestWorkerSubprocess.test_run_scheduled_access_self _________________________________________________________

self = <tests.test_worker.TestWorkerSubprocess testMethod=test_run_scheduled_access_self>

    @skipIf('pypy' in sys.version.lower(), 'often times out with pypy')
    def test_run_scheduled_access_self(self):
        """Schedule a job that schedules a job, then run the worker as subprocess"""
        q = Queue()
        job = q.enqueue(schedule_access_self)
        subprocess.check_call(['rqworker', '-u', self.redis_url, '-b'])
        registry = FinishedJobRegistry(queue=q)
>       self.assertTrue(job in registry)
E       AssertionError: False is not true

tests/test_worker.py:1281: AssertionError
------------------------------------------------------------------------ Captured stdout call ------------------------------------------------------------------------
18:54:33 Worker rq:worker:cf5d63964c7f4b98b9018086b4ebc374: started, version 1.10.1
18:54:33 Subscribing to channel rq:pubsub:cf5d63964c7f4b98b9018086b4ebc374
18:54:33 *** Listening on default...
18:54:33 Cleaning registries for queue: default
18:54:33 default: tests.test_worker.schedule_access_self() (29441f48-7551-4b82-bc9d-a2f81b2d858c)
18:54:33 Worker rq:worker:cf5d63964c7f4b98b9018086b4ebc374: done, quitting
18:54:33 Unsubscribing from channel rq:pubsub:cf5d63964c7f4b98b9018086b4ebc374
------------------------------------------------------------------------ Captured stderr call ------------------------------------------------------------------------
WARNING:root:Import error for 'tests.test_worker'
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/utils.py", line 141, in import_attribute
    module = importlib.import_module(module_name)
  File "/usr/lib/python3.6/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 994, in _gcd_import
  File "<frozen importlib._bootstrap>", line 971, in _find_and_load
  File "<frozen importlib._bootstrap>", line 955, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "./tests/test_worker.py", line 22, in <module>
    import pytest
ModuleNotFoundError: No module named 'pytest'
18:54:33 Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/worker.py", line 1061, in perform_job
    rv = job.perform()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 821, in perform
    self._result = self._execute()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 844, in _execute
    result = self.func(*self.args, **self.kwargs)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 238, in func
    return import_attribute(self.func_name)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/utils.py", line 157, in import_attribute
    attribute_owner = getattr(module, attribute_owner_name)
AttributeError: module 'tests' has no attribute 'test_worker'
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/worker.py", line 1061, in perform_job
    rv = job.perform()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 821, in perform
    self._result = self._execute()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 844, in _execute
    result = self.func(*self.args, **self.kwargs)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 238, in func
    return import_attribute(self.func_name)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/utils.py", line 157, in import_attribute
    attribute_owner = getattr(module, attribute_owner_name)
AttributeError: module 'tests' has no attribute 'test_worker'
ERROR:rq.worker:Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/worker.py", line 1061, in perform_job
    rv = job.perform()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 821, in perform
    self._result = self._execute()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 844, in _execute
    result = self.func(*self.args, **self.kwargs)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 238, in func
    return import_attribute(self.func_name)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/utils.py", line 157, in import_attribute
    attribute_owner = getattr(module, attribute_owner_name)
AttributeError: module 'tests' has no attribute 'test_worker'
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/worker.py", line 1061, in perform_job
    rv = job.perform()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 821, in perform
    self._result = self._execute()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 844, in _execute
    result = self.func(*self.args, **self.kwargs)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 238, in func
    return import_attribute(self.func_name)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/utils.py", line 157, in import_attribute
    attribute_owner = getattr(module, attribute_owner_name)
AttributeError: module 'tests' has no attribute 'test_worker'
_____________________________________________________ HerokuWorkerShutdownTestCase.test_handle_shutdown_request ______________________________________________________

self = <tests.test_worker.HerokuWorkerShutdownTestCase testMethod=test_handle_shutdown_request>, mock_logger_info = <MagicMock name='info' id='140135331765776'>

    @mock.patch('rq.worker.logger.info')
    def test_handle_shutdown_request(self, mock_logger_info):
        """Mutate HerokuWorker so _horse_pid refers to an artificial process
        and test handle_warm_shutdown_request"""
        w = HerokuWorker('foo')

        path = os.path.join(self.sandbox, 'shouldnt_exist')
        p = Process(target=create_file_after_timeout_and_setsid, args=(path, 2))
        p.start()
        self.assertEqual(p.exitcode, None)
        time.sleep(0.1)

        w._horse_pid = p.pid
        w.handle_warm_shutdown_request()
        p.join(2)
        # would expect p.exitcode to be -34
>       self.assertEqual(p.exitcode, -34)
E       AssertionError: 0 != -34

tests/test_worker.py:1359: AssertionError
====================================================================== short test summary info =======================================================================
FAILED tests/test_worker.py::TestWorkerSubprocess::test_run_scheduled_access_self - AssertionError: False is not true
FAILED tests/test_worker.py::HerokuWorkerShutdownTestCase::test_handle_shutdown_request - AssertionError: 0 != -34
======================================================= 2 failed, 326 passed, 13 skipped in 170.18s (0:02:50) ========================================================
(linux-venv) ubuntu@MICHAELH-ZB:/mnt/c/users/mhill/pycharmprojects/rq$

@caffeinatedMike
Contributor

@selwin I can confirm there is definitely a bug with the scheduler in regard to dependencies.

Script to reproduce

from datetime import datetime, timedelta
from redis import Redis
from rq import Queue
from rq.registry import ScheduledJobRegistry

dt = datetime.now() + timedelta(minutes=1)
q = Queue(connection=Redis.from_url('redis://'))
r = ScheduledJobRegistry(queue=q, connection=q.connection)

j1 = q.enqueue_at(dt, "time.sleep", args=(30,), description="Long task before short task")
j2 = q.enqueue_at(dt, print, args=("Hello World",), description="Short task dependent on long task", depends_on=j1)

assert j1 in r
assert j2 in r

With 2 workers running, both tasks are executed simultaneously instead of j2 waiting for j1 to finish.
[attachment: dependency-issue-with-scheduler]

@selwin
Collaborator

selwin commented Mar 12, 2022

@caffeinatedMike yeah, it definitely looks like a bug. If a job is scheduled with a dependency, it should always go to DeferredJobRegistry instead of ScheduledJobRegistry.
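In terms of the reproduction script above, the expected behavior would look roughly like this (a sketch of the intended outcome, not what currently happens):

from rq.registry import DeferredJobRegistry, ScheduledJobRegistry

scheduled = ScheduledJobRegistry(queue=q, connection=q.connection)
deferred = DeferredJobRegistry(queue=q, connection=q.connection)

assert j1 in scheduled       # the independent job is scheduled as usual
assert j2 in deferred        # the dependent job should wait on j1 instead
assert j2 not in scheduled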

You found so many bugs it's starting to make me look bad :(

@selwin
Collaborator

selwin commented Mar 12, 2022

@caffeinatedMike obviously I was joking about the last bit. Keep up the good work 😄

@caffeinatedMike
Contributor

caffeinatedMike commented Mar 12, 2022

@selwin sorry 😅

The good news for this bug is that it all seems to tie back to enqueue_job. Solve that, and scheduling (and all the enqueue methods mentioned above) should work again.

The problem is that this scheduling issue, plus a few others, is blocking a big work-related project of mine that's otherwise ready for production 😔

Blockers for me:

@caffeinatedMike
Contributor

@selwin Can you have a look at my current work on this issue at https://github.com/caffeinatedMike/rq/tree/enqueue_job-depends_on-fix? I'm really struggling to resolve this. The current test results are abysmal.

@jtfidje
Contributor Author

jtfidje commented Mar 15, 2022

@caffeinatedMike I just pulled your repo and tried running the tests (just noticed the run-in-docker thing).
There are a lot more than two failing tests? Or have you made more changes since your message to me? :)

@caffeinatedMike
Contributor

caffeinatedMike commented Mar 15, 2022

@jtfidje If you review the commit history, you'll see I made a bunch of changes yesterday (see my comment above); they're causing a bunch of failures because I'm trying different tweaks to find a solution.

If the dependency issues aren't resolved soon I might just have to cut bait and opt for a different package.

@jtfidje
Contributor Author

jtfidje commented Mar 15, 2022

Aah, yeah, sorry, I see 😃

Would it be possible for you to use my workaround from the top of this thread as well, until a proper fix is pushed? 😃

@caffeinatedMike
Contributor

@jtfidje Can you share the entire workaround? I only saw that you shared one custom class.

@OlegZv
Contributor

OlegZv commented Apr 6, 2022

Just wanted to throw my +1 in here. I'm looking to do exactly this (scheduled jobs with dependencies) and would benefit from this fix! Thank you!

@caffeinatedMike
Contributor

caffeinatedMike commented Apr 6, 2022

@OlegZv If you're pressed for time, this can be implemented with relative ease using Celery. I managed to switch over in a matter of days (coming from zero experience with Celery).

@spietras

spietras commented Apr 6, 2022

I "fixed" this by creating a custom class that inherits from the RQ one and then just patch the function with the dependency code. I've done this for a couple of things so I run my own custom Worker, Queue and Job classes slightly_smiling_face

I guess it's something along the lines of:

from rq import Queue, scheduler, worker
from rq.job import JobStatus


class PatchedQueue(Queue):
    def setup_dependencies(self, job, pipeline=None):
        if len(job._dependency_ids) == 0 and pipeline is not None and pipeline.explicit_transaction:
            return job
        return super().setup_dependencies(job, pipeline)

    def enqueue_job(self, job, pipeline=None, at_front=False):
        job = self.setup_dependencies(job, pipeline=pipeline)
        if job.get_status(refresh=False) != JobStatus.DEFERRED:
            return super().enqueue_job(job, pipeline, at_front)
        return job


class PatchedScheduler(scheduler.RQScheduler):
    # Side effect of class definition: monkey-patches the scheduler module
    # so it enqueues via PatchedQueue
    scheduler.Queue = PatchedQueue


class PatchedWorker(worker.Worker):
    # Side effect of class definition: monkey-patches the worker module
    # so it starts the PatchedScheduler
    worker.RQScheduler = PatchedScheduler

This workaround works for me if you only use queues and workers from here. I know it's not pretty and I'm not 100% sure it works in all cases. You don't really need PatchedScheduler and PatchedWorker classes, but it just somehow looked better to me.

@caffeinatedMike
Contributor

@spietras Would you be open to creating a PR and testing against the existing tests?

@spietras

spietras commented Apr 6, 2022

I tried adding these changes to Queue and ran the tests locally; 17 of them failed, so this method certainly doesn't work in all cases. I can't really debug it much further right now.

@lore-preoptima

Hi all,
sorry for the question. I have this issue using Queue.enqueue(..., depends_on=...). I see the issue is still open; is there any other method I can use instead to enqueue a job that is dependent on a list of other jobs, or does this issue affect every method?

Thanks

b-pos465 added a commit to climate-v/nc2zarr-webapp that referenced this issue May 10, 2022
…not wait as 'queue.enqueue_job' was used instead of 'queue.enqueue'. The issue with the counterintuitive API is documented in the following 'GitHub' issue: rq/rq#1404.
@thomasmatecki
Contributor

@caffeinatedMike is there a separate issue for this #1404 (comment) problem?

@ccrvlh
Collaborator

ccrvlh commented Feb 2, 2023

Specifically regarding the scheduler, as mentioned here by @caffeinatedMike:

I'm looking at this for the very first time, so please correct me if I got something wrong.
My impression is that the issue is the priority between depends_on and enqueued_at.

When doing this:

j1 = q.enqueue_at(dt, "time.sleep", args=(5,), description="Long task before short task")
j2 = q.enqueue_at(dt, print, args=("Hello World",), description="Short task dependent on long task", depends_on=j1)

There's somewhat of a conflict: should you run the job at the time the scheduler was told to run it (enqueue_at), or should you enqueue the job based on the dependency tree? Currently, we seem to be respecting the scheduled time.

If we just enqueue the dependent job directly, not actually sending it to the scheduler, like so:

j1 = q.enqueue_at(dt, "time.sleep", args=(5,), description="Long task before short task")
j2 = q.enqueue(print, args=("Hello World",), description="Short task dependent on long task", depends_on=j1)

Now this works as expected: job2 will run after job1.
So there's not really a way to make this consistent: you either choose to respect the dt, or you choose to respect the dependency.

@caffeinatedMike
Contributor

@thomasmatecki It's been a while since I've checked in on this package, since having to switch over to Celery, so I'm not sure. Or, more accurately, I don't remember.

@lowercase00 there should be nothing wrong with using enqueue_at with a dependency. Logically, I would think that once the enqueue_at time arrives, that is when the depends_on statuses are checked. If they aren't done, the behavior would be equivalent to calling enqueue from that point on until a ready state is reached, waiting on the dependencies to finish.

@ccrvlh
Collaborator

ccrvlh commented Feb 3, 2023

@caffeinatedMike sure, I understand your point and do think there's room for improvement, but even in that scenario, say you have a 1-hour job (extrapolating to make a point), you would lose the scheduled time. That's what I meant by saying that there's no way to keep both guarantees.

@caffeinatedMike
Contributor

@lowercase00 you definitely raise a good point! I think we could likely model a dependency policy in line with how APScheduler handles your scenario (see "Missed job executions and coalescing").

@ccrvlh
Collaborator

ccrvlh commented Feb 3, 2023

That's a very interesting approach indeed. It does seem more complex (from a brief overview of the docs; my experience with APScheduler is limited) than just having the dependency tree be the priority, and making the priority an argument with which one could define whether the scheduled time or the dependency tree is of the essence.

For now, perhaps just changing this behavior + better docs explaining the potential conflict would be enough.

@ccrvlh
Collaborator

ccrvlh commented Feb 3, 2023

Thinking about this a little more, it seems what's making things confusing is the fact that the queue takes a lot of responsibility for the job's setup (dependencies).

What if setup_dependencies were a method of the job? It actually only changes the job's state and has little to do with the queue. Then, when using the queue to enqueue anything (be it a job with enqueue_job or a func with enqueue_call), the job would carry a flag is_dependencies_setup, and the queue would check that to know whether it still needs to set up dependencies.

That way the queue API would remain stable, it would be easier to keep the behavior consistent, and one may argue it would make more sense, since this method only changes the job's state and does nothing to the queue.

@selwin thoughts?

@selwin
Collaborator

selwin commented Feb 25, 2023

Sorry, I just saw this message.

I'm ok with moving some dependency logic to the Job class if it has little or nothing to do with queue logic. We could even create a separate dependencies.py to split the complex logic into chunks that are easier to maintain.

The issue with moving this logic to the Job class is that job.py is ~1.5k lines vs queue.py at ~1.3k lines, so it's not like job.py has nothing to worry about ;)
