Add new QueuedJobRegistry to catch jobs dropped by workers before marked as started #1568

base: master

Conversation
Codecov Report

```diff
@@            Coverage Diff             @@
##           master    #1568      +/-   ##
==========================================
+ Coverage   95.59%   95.67%   +0.08%
==========================================
  Files          46       46
  Lines        7061     7195     +134
==========================================
+ Hits         6750     6884     +134
  Misses        311      311
```

Continue to review full report at Codecov.
Thanks for the PR. I think at any one time, a job can only exist in one registry. When a job is moved to StartedJobRegistry, it should be removed from QueuedJobRegistry. When a job is canceled, it should also be removed from QueuedJobRegistry.
@selwin It is when you enqueue, it's just up a level in the call stack: Line 913 in 8fd8de0. I didn't include it at the same level of the call stack since the …
@selwin I didn't include this here since there is a bug-fix PR I also have up: canceling assumes the job is in the failed registry, so if a job were elsewhere in a different registry, it would also break. I think it makes sense to make this change in this PR after we merge #1564.
I meant when a job is popped off the queue and moved to StartedJobRegistry.
@selwin As mentioned above, I am removing it from QueuedJobRegistry here: Line 913 in 8fd8de0
I think what @joshcoden is trying to say is that prepare_job_execution calls …
```python
if job.enqueued_at < front_timestamp:
    self.requeue(job)
```
Once `job.enqueued_at >= front_timestamp`, the condition will remain True for the rest of the jobs in the QueuedJobRegistry, so you no longer need to iterate through them and can break out of the loop early (assuming `get_job_ids` returns a list in ascending order); see the sketch below.
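A minimal sketch of that early exit, assuming this PR's `requeue` and `_get_front_queue_timestamp` helpers and that `get_job_ids` returns ids in ascending score order:

```python
front_timestamp = self._get_front_queue_timestamp()
for job_id in self.get_job_ids():
    job = self.job_class.fetch(job_id, connection=self.connection)
    if job.enqueued_at >= front_timestamp:
        # Every remaining job was enqueued at or after the queue's front
        # job, so it is still sitting in the queue: stop scanning.
        break
    self.requeue(job)
```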
@alella The user can manually specify the score when calling `add` on the registry. While no such calls will exist in the code itself, a user could specify a `ttl` that does not align with the `enqueued_at` time:
Lines 64 to 72 in 4711080
```python
def add(self, job, ttl=0, pipeline=None, xx=False):
    """Adds a job to a registry with expiry time of now + ttl, unless it's -1 which is set to +inf"""
    score = ttl if ttl < 0 else current_timestamp() + ttl
    if score == -1:
        score = '+inf'
    if pipeline is not None:
        return pipeline.zadd(self.key, {job.id: score}, xx=xx)
    return self.connection.zadd(self.key, {job.id: score}, xx=xx)
```
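For instance (a hypothetical call; `registry` stands for any registry instance), passing an explicit `ttl` decouples the sorted-set score from the job's `enqueued_at`:

```python
# Hypothetical usage: the score becomes current_timestamp() + 3600, which
# has no relation to job.enqueued_at, so ordering by score no longer
# mirrors enqueue order.
registry.add(job, ttl=3600)
```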
@alella @joshcoden thanks for the explanation. It is indeed working as expected. I misread this because the code around this part is not symmetric. Can you create a …? With this, we can potentially also clean up …
This already led to race-condition bugs, fixed in PR #1550.
```python
else:
    raise InvalidJobOperationError(
        "Queued job {} has no enqueue_at value!".format(front_job.id)
    )
```
This could not happen, right? So I think we can skip this check.
```python
for job_id in job_ids:
    # If job was enqueued AFTER the front of the queue it must have already been dequeued.
    # This is faster than seeing if the job is in the queue directly.
    front_timestamp = self._get_front_queue_timestamp()
```
We can move this out of the loop so we don't keep fetching a new timestamp for every single job processed; see the sketch below.
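A sketch of the suggested hoist, reusing this PR's helpers:

```python
# Fetch the front-of-queue timestamp once, before the loop, instead of
# issuing a Redis round-trip per job.
front_timestamp = self._get_front_queue_timestamp()
for job_id in job_ids:
    job = self.job_class.fetch(job_id, connection=self.connection)
    if job.enqueued_at < front_timestamp:
        self.requeue(job)
```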
```python
    It is not defined in cleanup since we don't want this being called
    every time count or get_job_ids is called.
    """
    job_ids = self.get_job_ids()
    for job_id in job_ids:
```
This could be expensive if we have lots of jobs in QueuedJobRegistry, and sometimes people have millions of jobs enqueued. I wonder if we have a way to only check a subset of the jobs; see the sketch below.
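One hypothetical way to bound the cost, assuming (as in rq's other registries) that `self.key` names a sorted set scored by timestamp: scan only the oldest entries on each pass. `get_oldest_job_ids` and the `limit` default are illustrative, not part of this PR:

```python
def get_oldest_job_ids(self, limit=1000):
    """Return at most `limit` job ids, oldest (lowest score) first.

    ZRANGE returns members in ascending score order, so the jobs most
    likely to be stuck come back first and the rest are skipped.
    """
    return [job_id.decode('utf-8') for job_id in
            self.connection.zrange(self.key, 0, limit - 1)]
```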
```python
                             connection=queue.connection,
                             job_class=queue.job_class,
                             serializer=queue.serializer)
registry.requeue_stuck_jobs()
```
I'm not sure if we should automatically requeue stuck jobs because:

- This operation could be heavy, depending on the number of jobs you have in the registry
- If requeued, jobs that are time sensitive could lead to unwanted outcomes
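One hypothetical way to address both concerns (a sketch, not part of this PR) is to leave the scan off by default and let operators opt in, reusing the constructor call shown above:

```python
import os

from redis import Redis
from rq import Queue

# QueuedJobRegistry is the class this PR introduces; the import path is
# assumed to match rq's other registries.
from rq.registry import QueuedJobRegistry

redis = Redis()
queue = Queue(connection=redis)

# Hypothetical opt-in flag: heavy registries and time-sensitive jobs are
# untouched unless the operator explicitly enables the requeue pass.
if os.environ.get('RQ_REQUEUE_STUCK_JOBS') == '1':
    registry = QueuedJobRegistry(queue.name,
                                 connection=queue.connection,
                                 job_class=queue.job_class,
                                 serializer=queue.serializer)
    registry.requeue_stuck_jobs()
```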
@joshcoden any plans to continue working on this?
Fixes #1553