Job is executed before "depends_on" job is finished when using Job.create and q.enqueue_job #1404
This PR adds a multi-dependency feature to RQ proper. You should try this instead.
Thanks for answering! I've looked through the PR, but I can't see how those changes would fix this problem. The only way I managed to fix this was to create a custom Queue class and implement the following function as a replacement for the original:

    def enqueue_custom(
        self, job: CustomJob, pipeline=None, at_front: bool = False
    ) -> CustomJob:
        # If a _dependent_ job depends on any unfinished job, register all the
        # _dependent_ job's dependencies instead of enqueueing it.
        #
        # `Job#fetch_dependencies` sets WATCH on all dependencies. If
        # WatchError is raised when the pipeline is executed, that means
        # something else has modified either the set of dependencies or the
        # status of one of them. In this case, we simply retry.
        job.set_status(FlowStatus.QUEUED)
        if job._dependency_id:
            with self.connection.pipeline() as pipe:
                while True:
                    try:
                        pipe.watch(job.dependencies_key)
                        dependencies = job.fetch_dependencies(watch=True, pipeline=pipe)
                        pipe.multi()
                        for dependency in dependencies:
                            if (
                                dependency.get_status(refresh=False)
                                != FlowStatus.FINISHED
                            ):
                                job.set_status(FlowStatus.DEFERRED, pipeline=pipe)
                                job.register_dependency(pipeline=pipe)
                                job.save(pipeline=pipe)
                                job.cleanup(ttl=job.ttl, pipeline=pipe)
                                pipe.execute()
                                return job
                        break
                    except WatchError:
                        continue
        return super().enqueue_job(job, pipeline, at_front=at_front)

You have this if-block inside
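As an aside, the WATCH/retry pattern used in the snippet above can be illustrated without Redis at all. The following is a hypothetical stand-in (a toy versioned store; none of these names come from rq or redis-py) showing why the loop retries on WatchError: the transaction is re-attempted until no concurrent modification slips in between the read and the write.

```python
class ConflictError(Exception):
    """Stands in for redis.WatchError in this toy model."""
    pass

class VersionedStore:
    """Toy optimistic-concurrency store: a write fails if the
    version has changed since it was read (like WATCH + MULTI/EXEC)."""
    def __init__(self):
        self.data = {}
        self.version = 0

    def read(self):
        # Like WATCH: remember the version we saw.
        return self.version, dict(self.data)

    def write(self, seen_version, updates):
        # Like EXEC: abort if someone else wrote in the meantime.
        if seen_version != self.version:
            raise ConflictError
        self.data.update(updates)
        self.version += 1

def update_with_retry(store, key, fn):
    """Retry the read-modify-write until it commits cleanly, mirroring
    the `while True: ... except WatchError: continue` loop above."""
    while True:
        try:
            version, snapshot = store.read()
            new_value = fn(snapshot.get(key, 0))
            store.write(version, {key: new_value})
            return new_value
        except ConflictError:
            continue
```

With no concurrent writers the first attempt succeeds; if another writer bumps the version between `read()` and `write()`, the loop simply starts over, just as the snippet restarts after `WatchError`.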
With the PR merged, you can do
Yes, I understand (and really like this feature!)

    from rq.job import Job

    job = Job.create(count_words_at_url, 'http://nvie.com')
    print('Job id: %s' % job.id)
    q.enqueue_job(job)

    # create a job with a predetermined id
    job = Job.create(count_words_at_url, 'http://nvie.com', id='my_job_id')

This way of doing it fits my needs very well, but if I do this, job dependencies do not work. If this is by design then of course I will have to use

Edit:
I think this issue should be reopened, as it appears to still occur (new failing test): Line 530 in 9bef2aa

I agree with @jtfidje that this is not related to multiple dependencies, though a few folks documented it in #1170. The crux of the issue is that

I encountered the issue as well and would love to see a fix.

It seems this issue still stands
I think the behavior of

What do folks think of changing the behavior of

@selwin I fully agree and support @thomasmatecki's point with aligning

My reason for constructing the
Wouldn't moving the following snippet from

    job = self.setup_dependencies(
        job,
        pipeline=pipeline
    )
    # If we do not depend on an unfinished job, enqueue the job.
    if job.get_status(refresh=False) != JobStatus.DEFERRED:
        return self.enqueue_job(job, pipeline=pipeline, at_front=at_front)
    return job
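The gating behavior that snippet implements can be sketched in isolation (plain Python; `FakeJob` and the status values here are illustrative stand-ins, not rq's actual classes): a job is enqueued only once every dependency has finished, and is deferred otherwise.

```python
FINISHED, QUEUED, DEFERRED = "finished", "queued", "deferred"

class FakeJob:
    """Minimal stand-in for an rq Job, tracking only id, deps and status."""
    def __init__(self, job_id, depends_on=()):
        self.id = job_id
        self.depends_on = list(depends_on)
        self.status = None

def enqueue_job(job, jobs_by_id, queue):
    """Queue the job only if all of its dependencies are finished;
    mark it deferred otherwise. This is the check the thread argues
    the queue's job-enqueueing path should always perform."""
    if any(jobs_by_id[d].status != FINISHED for d in job.depends_on):
        job.status = DEFERRED
        return job
    job.status = QUEUED
    queue.append(job)
    return job
```

The bug in the thread is precisely the case where this check is skipped: a dependent job lands in the queue while its parent is still running.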
The intended design for

I'm ok with refactoring
I'm open to accepting a PR for this. In the meantime, I can update the docs to clarify what

I "fixed" this by creating a custom class that inherits from the RQ one and then just patched the function with the dependency code. I've done this for a couple of things, so I run my own custom Worker, Queue and Job classes 🙂

@jtfidje Would you consider contributing the "fix" via a PR? I'm sure it would be greatly appreciated by all those affected by this issue, myself included.

Sure :) Let me try to take what @selwin said into account and see what I can come up with.

@selwin I'm having trouble setting up my environment in order to run tests. Could you please go over the docs and see if something has changed / is missing?

What errors did you run into?
I followed the docs and did this:

Now I guess that
@jtfidje I'm attempting a fix for this as well (here's a link to my branch). I used the two tests mentioned earlier in this thread with my changes, but the second test still fails. For some reason the second job gets stuck in the

Would you be up to having a look? @selwin your input is welcome and appreciated as well.
________________________________________________________________ TestQueue.test_enqueue_job_with_dependency_executes_both ________________________________________________________________

self = <tests.test_queue.TestQueue testMethod=test_enqueue_job_with_dependency_executes_both>

    def test_enqueue_job_with_dependency_executes_both(self):
        """Jobs are enqueued only when their dependencies are finished."""
        from redis import Redis
        q = Queue(connection=Redis.from_url("redis://"))
        job_a = Job.create(func=say_hello, connection=q.connection)
        job_a.save()
        job = q.enqueue_job(job_a)
        self.assertEqual(job.get_status(), JobStatus.QUEUED)
        job_b = Job.create(func=say_hello, connection=q.connection, depends_on=job_a)
        job_b.save()
        job = q.enqueue_job(job_b)
        from rq.worker import SimpleWorker
        w = SimpleWorker([q], connection=q.connection)
        w.work(burst=True)
        self.assertEqual(job_a.get_status(), JobStatus.FINISHED)
>       self.assertEqual(job_b.get_status(), JobStatus.FINISHED)
E       AssertionError: 'deferred' != <JobStatus.FINISHED: 'finished'>

test_queue.py:543: AssertionError
================================================================================ short test summary info =================================================================================
FAILED test_queue.py::TestQueue::test_enqueue_job_with_dependency_executes_both - AssertionError: 'deferred' != <JobStatus.FINISHED: 'finished'>
=================================================================================== 1 failed in 0.47s ====================================================================================
@jtfidje How have your efforts fared?

Hi, sorry! I haven't looked at it after I couldn't get the tests to run. Maybe you could walk me through your setup and how you run the tests? That would be of great help :)
@selwin I believe (I have yet to confirm, though) this might be a bigger issue than originally thought. You state

Which is a troubling thought when reviewing the
@jtfidje I opted to utilize WSL on Windows to develop. It's much simpler. Just as a heads-up, you'll always have 2 failing tests when running tests through WSL Ubuntu (I'm using 18.04). These can be safely ignored as they pass when run by GitHub Actions. See below:

(linux-venv) ubuntu@MICHAELH-ZB:/mnt/c/users/mhill/pycharmprojects/rq$ pytest tests
======================================================================== test session starts =========================================================================
platform linux -- Python 3.6.9, pytest-7.0.1, pluggy-1.0.0
rootdir: /mnt/c/users/mhill/pycharmprojects/rq
plugins: cov-3.0.0
collected 341 items
tests/test_callbacks.py ......... [ 2%]
tests/test_cli.py .............................. [ 11%]
tests/test_commands.py ... [ 12%]
tests/test_connection.py ..... [ 13%]
tests/test_decorator.py ............... [ 18%]
tests/test_fixtures.py .. [ 18%]
tests/test_helpers.py . [ 19%]
tests/test_job.py ............................................................................... [ 42%]
tests/test_queue.py .................................................. [ 56%]
tests/test_registry.py ........................... [ 64%]
tests/test_retry.py ...... [ 66%]
tests/test_scheduler.py ...............s... [ 72%]
tests/test_sentry.py ... [ 73%]
tests/test_serializers.py . [ 73%]
tests/test_timeouts.py . [ 73%]
tests/test_utils.py ............... [ 78%]
tests/test_worker.py ..............ss..................s.s..................sssss..FsF.ss... [ 98%]
tests/test_worker_registration.py .... [100%]
============================================================================== FAILURES ==============================================================================
________________________________________________________ TestWorkerSubprocess.test_run_scheduled_access_self _________________________________________________________
self = <tests.test_worker.TestWorkerSubprocess testMethod=test_run_scheduled_access_self>
@skipIf('pypy' in sys.version.lower(), 'often times out with pypy')
def test_run_scheduled_access_self(self):
"""Schedule a job that schedules a job, then run the worker as subprocess"""
q = Queue()
job = q.enqueue(schedule_access_self)
subprocess.check_call(['rqworker', '-u', self.redis_url, '-b'])
registry = FinishedJobRegistry(queue=q)
> self.assertTrue(job in registry)
E AssertionError: False is not true
tests/test_worker.py:1281: AssertionError
------------------------------------------------------------------------ Captured stdout call ------------------------------------------------------------------------
18:54:33 Worker rq:worker:cf5d63964c7f4b98b9018086b4ebc374: started, version 1.10.1
18:54:33 Subscribing to channel rq:pubsub:cf5d63964c7f4b98b9018086b4ebc374
18:54:33 *** Listening on default...
18:54:33 Cleaning registries for queue: default
18:54:33 default: tests.test_worker.schedule_access_self() (29441f48-7551-4b82-bc9d-a2f81b2d858c)
18:54:33 Worker rq:worker:cf5d63964c7f4b98b9018086b4ebc374: done, quitting
18:54:33 Unsubscribing from channel rq:pubsub:cf5d63964c7f4b98b9018086b4ebc374
------------------------------------------------------------------------ Captured stderr call ------------------------------------------------------------------------
WARNING:root:Import error for 'tests.test_worker'
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/utils.py", line 141, in import_attribute
module = importlib.import_module(module_name)
File "/usr/lib/python3.6/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 994, in _gcd_import
File "<frozen importlib._bootstrap>", line 971, in _find_and_load
File "<frozen importlib._bootstrap>", line 955, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "./tests/test_worker.py", line 22, in <module>
import pytest
ModuleNotFoundError: No module named 'pytest'
18:54:33 Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/worker.py", line 1061, in perform_job
rv = job.perform()
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 821, in perform
self._result = self._execute()
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 844, in _execute
result = self.func(*self.args, **self.kwargs)
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 238, in func
return import_attribute(self.func_name)
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/utils.py", line 157, in import_attribute
attribute_owner = getattr(module, attribute_owner_name)
AttributeError: module 'tests' has no attribute 'test_worker'
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/worker.py", line 1061, in perform_job
rv = job.perform()
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 821, in perform
self._result = self._execute()
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 844, in _execute
result = self.func(*self.args, **self.kwargs)
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 238, in func
return import_attribute(self.func_name)
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/utils.py", line 157, in import_attribute
attribute_owner = getattr(module, attribute_owner_name)
AttributeError: module 'tests' has no attribute 'test_worker'
ERROR:rq.worker:Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/worker.py", line 1061, in perform_job
rv = job.perform()
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 821, in perform
self._result = self._execute()
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 844, in _execute
result = self.func(*self.args, **self.kwargs)
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 238, in func
return import_attribute(self.func_name)
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/utils.py", line 157, in import_attribute
attribute_owner = getattr(module, attribute_owner_name)
AttributeError: module 'tests' has no attribute 'test_worker'
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/worker.py", line 1061, in perform_job
rv = job.perform()
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 821, in perform
self._result = self._execute()
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 844, in _execute
result = self.func(*self.args, **self.kwargs)
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/job.py", line 238, in func
return import_attribute(self.func_name)
File "/home/ubuntu/.local/lib/python3.6/site-packages/rq/utils.py", line 157, in import_attribute
attribute_owner = getattr(module, attribute_owner_name)
AttributeError: module 'tests' has no attribute 'test_worker'
_____________________________________________________ HerokuWorkerShutdownTestCase.test_handle_shutdown_request ______________________________________________________

self = <tests.test_worker.HerokuWorkerShutdownTestCase testMethod=test_handle_shutdown_request>, mock_logger_info = <MagicMock name='info' id='140135331765776'>

    @mock.patch('rq.worker.logger.info')
    def test_handle_shutdown_request(self, mock_logger_info):
        """Mutate HerokuWorker so _horse_pid refers to an artificial process
        and test handle_warm_shutdown_request"""
        w = HerokuWorker('foo')
        path = os.path.join(self.sandbox, 'shouldnt_exist')
        p = Process(target=create_file_after_timeout_and_setsid, args=(path, 2))
        p.start()
        self.assertEqual(p.exitcode, None)
        time.sleep(0.1)
        w._horse_pid = p.pid
        w.handle_warm_shutdown_request()
        p.join(2)
        # would expect p.exitcode to be -34
>       self.assertEqual(p.exitcode, -34)
E       AssertionError: 0 != -34

tests/test_worker.py:1359: AssertionError
====================================================================== short test summary info =======================================================================
FAILED tests/test_worker.py::TestWorkerSubprocess::test_run_scheduled_access_self - AssertionError: False is not true
FAILED tests/test_worker.py::HerokuWorkerShutdownTestCase::test_handle_shutdown_request - AssertionError: 0 != -34
======================================================= 2 failed, 326 passed, 13 skipped in 170.18s (0:02:50) ========================================================
(linux-venv) ubuntu@MICHAELH-ZB:/mnt/c/users/mhill/pycharmprojects/rq$
@selwin I can confirm there is definitely a bug with the scheduler in regards to dependencies. Script to reproduce:

    from datetime import datetime, timedelta

    from redis import Redis
    from rq import Queue
    from rq.registry import ScheduledJobRegistry

    dt = datetime.now() + timedelta(minutes=1)

    q = Queue(connection=Redis.from_url('redis://'))
    r = ScheduledJobRegistry(queue=q, connection=q.connection)

    j1 = q.enqueue_at(dt, "time.sleep", args=(30,), description="Long task before short task")
    j2 = q.enqueue_at(dt, print, args=("Hello World",), description="Short task dependent on long task", depends_on=j1)

    assert j1 in r
    assert j2 in r

With 2 workers running, both tasks are executed simultaneously instead of j2 waiting for j1 to finish.
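The reported behavior comes down to how due jobs are moved off the scheduler. Here is a toy model of the two possible behaviors (assumed semantics in plain Python, not rq's actual implementation): a scheduler that enqueues every due job without consulting `depends_on` starts both jobs at once, while one that checks dependencies would defer j2.

```python
def move_due_jobs(due_jobs, finished, respect_dependencies):
    """due_jobs: list of (job_id, dependency_ids). Returns (queued, deferred).

    With respect_dependencies=False every due job is enqueued immediately,
    which matches the bug observed above: j1 and j2 start at the same time.
    With respect_dependencies=True, a job whose dependencies have not all
    finished is deferred instead of enqueued.
    """
    queued, deferred = [], []
    for job_id, deps in due_jobs:
        if respect_dependencies and any(d not in finished for d in deps):
            deferred.append(job_id)
        else:
            queued.append(job_id)
    return queued, deferred

due = [("j1", []), ("j2", ["j1"])]
# Observed (buggy) behavior: both run simultaneously.
assert move_due_jobs(due, finished=set(), respect_dependencies=False) == (["j1", "j2"], [])
# Expected behavior: j2 waits for j1.
assert move_due_jobs(due, finished=set(), respect_dependencies=True) == (["j1"], ["j2"])
```

The repro script above hits the first branch: both scheduled jobs become due at `dt` and are enqueued together, so two workers pick them up in parallel.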
@caffeinatedMike yeah, it definitely looks like a bug. If a job is scheduled with a dependency, it should always go to

You found so many bugs it's starting to make me look bad :(
@caffeinatedMike obviously I was joking about the last bit. Keep up the good work 😄
@selwin sorry 😅 The good news for this bug is it all seems to tie back to

The problem is this scheduling issue, plus a few others, are blocking a big work-related project of mine that's otherwise ready for production 😔 Blockers for me:
@selwin Can you have a look at my current work on this issue at https://github.com/caffeinatedMike/rq/tree/enqueue_job-depends_on-fix? I'm really struggling to resolve this. The current test results are abysmal.

@caffeinatedMike I just pulled your repo and tried running the tests (just noticed the run-in-docker thing).

@jtfidje if you review the commit history, I made a bunch of changes yesterday (see my comment above) and it's causing a bunch of failures because I'm trying a bunch of different tweaks to find a solution. If the dependency issues aren't resolved soon I might just have to cut bait and opt for a different package.

Aah yeah, sorry, I see 😃 Is it not possible for you as well to use my workaround from the top of this thread until a proper fix is pushed? 😃

@jtfidje can you share the entire workaround? I only saw that you shared one custom class

Just wanted to throw my +1 in here. Looking to do exactly this - have a dependency of scheduled jobs - and would benefit from this fix! Thank you!

@OlegZv - If you're pressed for time, this can be implemented with relative ease using Celery. I managed to switch over in a matter of days (coming from having zero experience with Celery).
I guess it's something along the lines of:

    from rq import Queue, scheduler, worker
    from rq.job import JobStatus

    class PatchedQueue(Queue):
        def setup_dependencies(self, job, pipeline=None):
            if len(job._dependency_ids) == 0 and pipeline is not None and pipeline.explicit_transaction:
                return job
            return super().setup_dependencies(job, pipeline)

        def enqueue_job(self, job, pipeline=None, at_front=False):
            job = self.setup_dependencies(job, pipeline=pipeline)
            if job.get_status(refresh=False) != JobStatus.DEFERRED:
                return super().enqueue_job(job, pipeline, at_front)
            return job

    class PatchedScheduler(scheduler.RQScheduler):
        scheduler.Queue = PatchedQueue

    class PatchedWorker(worker.Worker):
        worker.RQScheduler = PatchedScheduler

This workaround works for me if you only use queues and workers from here. I know it's not pretty and I'm not 100% sure it works in all cases. You don't really need
@spietras Would you be open to creating a PR and testing against the existing tests?

I tried to add these changes to

Hi all,

Thanks

…not wait as 'queue.enqueue_job' was used instead of 'queue.enqueue'. The issue with the counterintuitive API is documented in the following 'GitHub' issue: rq/rq#1404.

@caffeinatedMike is there a separate issue for this #1404 (comment) problem?
Specifically regarding the scheduler, as mentioned here by @caffeinatedMike: I'm looking at this for the very first time, so please correct me if I got something wrong. When doing this:

    j1 = q.enqueue_at(dt, "time.sleep", args=(5,), description="Long task before short task")
    j2 = q.enqueue_at(dt, print, args=("Hello World",), description="Short task dependent on long task", depends_on=j1)

There's somewhat of a conflict: should you run the job at the time the scheduler told the job to run

If we just enqueue the dependent job, not actually sending it to the scheduler, like so:

    j1 = q.enqueue_at(dt, "time.sleep", args=(5,), description="Long task before short task")
    j2 = q.enqueue(print, args=("Hello World",), description="Short task dependent on long task", depends_on=j1)

Now this works as expected: job2 will run after job1.
@thomasmatecki It's been a while since I've checked in on this package since having to switch over to Celery, so I'm not sure. Or, more accurately, I don't remember. @lowercase00 there should be nothing wrong with using

@caffeinatedMike sure, I understand your point and do think there's room for improvement, but even in that scenario, say you have a 1-hour job (extrapolating to make a point), you would lose the scheduled time. That's what I meant by saying that there's no way to keep both guarantees.

@lowercase00 you definitely raise a good point! I think we could likely model a dependency policy in line with how

That's a very interesting approach indeed. It does seem more complex (from a brief overview of the docs; my experience with APScheduler is limited) than just having the dependency tree be the

For now, perhaps just changing this behavior + better docs explaining the potential conflict would be enough.

Thinking about this a little more, it seems what's making things confusing is the fact that the queue takes a lot of responsibility in the jobs' setup (dependencies). What if we had

That way the queue API would remain stable, it would be easier to make a consistent behavior, and one may argue it would make more sense, since this method is just changing the job state; it does nothing to the queue. @selwin thoughts?
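One way to picture that split (a hypothetical sketch only; `SketchJob`, `SketchQueue` and `run_job` are made-up names, not a proposed rq API): the job owns its dependency bookkeeping, and the queue merely places jobs that report themselves ready.

```python
class SketchJob:
    """The job decides its own state from its dependencies' statuses."""
    def __init__(self, deps=(), statuses=None):
        self.deps = list(deps)
        # statuses: shared mapping of dependency id -> status string
        self.statuses = statuses if statuses is not None else {}
        self.status = None

    def setup_dependencies(self):
        # Job-level concern: defer itself while anything upstream is unfinished.
        if any(self.statuses.get(d) != "finished" for d in self.deps):
            self.status = "deferred"
        else:
            self.status = "ready"
        return self

class SketchQueue:
    """The queue's placement logic becomes trivial and consistent:
    it never inspects dependencies itself."""
    def __init__(self):
        self.jobs = []

    def run_job(self, job):
        if job.setup_dependencies().status == "ready":
            self.jobs.append(job)
        return job
```

Under this split, the "enqueue" path behaves the same no matter who calls it (user code, the scheduler, or the worker finishing a parent job), because deciding readiness lives in one place.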
Sorry, I just saw this message. I'm ok with moving some dependencies logic to

The issue with moving this logic to the
Consider the following snippet below and two running workers:

In that snippet, everything works as expected; both jobs are enqueued (one on each worker) but func_2 does not run before func_1 is finished.

Now consider the following snippet, still with two workers:

When I run that code, both jobs are started immediately, despite setting the `depends_on` argument.

Mini bonus question

Why is it necessary to set `connection=` on the jobs when I use `Job.create`? Doesn't the `Queue` instance know what Redis connection to use already? The code fails without me setting `connection` on all the jobs as well.