Add new QueuedJobRegistry to catch jobs dropped by workers before marked as started #1568

Draft · wants to merge 6 commits into base: master

Conversation

@joshcoden (Contributor)

Fixes #1553

codecov bot commented Sep 24, 2021

Codecov Report

Merging #1568 (cb9fc61) into master (e71fcb9) will increase coverage by 0.08%.
The diff coverage is 99.28%.


@@            Coverage Diff             @@
##           master    #1568      +/-   ##
==========================================
+ Coverage   95.59%   95.67%   +0.08%     
==========================================
  Files          46       46              
  Lines        7061     7195     +134     
==========================================
+ Hits         6750     6884     +134     
  Misses        311      311              
Impacted Files Coverage Δ
rq/registry.py 97.15% <96.96%> (-0.07%) ⬇️
rq/job.py 98.20% <100.00%> (+0.01%) ⬆️
rq/queue.py 94.26% <100.00%> (+0.35%) ⬆️
rq/worker.py 88.66% <100.00%> (+0.01%) ⬆️
tests/test_job.py 100.00% <100.00%> (ø)
tests/test_queue.py 100.00% <100.00%> (ø)
tests/test_registry.py 100.00% <100.00%> (ø)
tests/test_worker.py 97.52% <100.00%> (+<0.01%) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

joshcoden marked this pull request as ready for review on September 24, 2021 at 18:37
@selwin (Collaborator) commented Sep 28, 2021

Thanks for the PR.

I think at any one time, a job can only exist in one registry. When a job is moved to StartedJobRegistry, it should also be removed from QueuedJobRegistry in a single pipeline call. Do you mind making this change?

When a job is canceled, it should also be moved from QueuedJobRegistry to CanceledJobRegistry.

@joshcoden (Contributor, Author):

> I think at any one time, a job can only exist in one registry. When a job is moved to StartedJobRegistry, it should also be removed from QueuedJobRegistry in a single pipeline call. Do you mind making this change?

@selwin It is when you enqueue; it's just up a level in the call stack:

rq/worker.py, line 913 (commit 8fd8de0):

    job.queued_job_registry.remove(job, pipeline=pipeline)

I didn't include it at the same level of the call stack since the job.heartbeat method is also called in other places, like on job callbacks, where we don't want to remove the job from a registry it shouldn't be in.

> When a job is canceled, it should also be moved from QueuedJobRegistry to CanceledJobRegistry.

@selwin I didn't include this here since I also have a bug-fix PR up: canceling currently assumes the job is in the failed registry, so a job sitting in a different registry would also break. I think it makes sense to make this change in this PR after we merge #1564.

@selwin (Collaborator) commented Sep 28, 2021

> It is when you enqueue; it's just up a level in the call stack:

I meant when a job is popped off the queue and moved to StartedJobRegistry (when it's being worked on), it should also be removed from QueuedJobRegistry. As it is now, when a job is being worked on it exists in both QueuedJobRegistry and StartedJobRegistry.

@joshcoden (Contributor, Author) commented Sep 28, 2021

> It is when you enqueue; it's just up a level in the call stack:

> I meant when a job is popped off the queue and moved to StartedJobRegistry (when it's being worked on), it should also be removed from QueuedJobRegistry. As it is now, when a job is being worked on it exists in both QueuedJobRegistry and StartedJobRegistry.

@selwin As mentioned above, I am removing it from QueuedJobRegistry before adding it to StartedJobRegistry within the same pipeline. In prepare_job_execution I remove it from the QueuedJobRegistry before calling heartbeat, which is where the job gets added to StartedJobRegistry; please see the code link in the comment above (#1568 (comment)):

rq/worker.py, line 913 (commit 8fd8de0):

    job.queued_job_registry.remove(job, pipeline=pipeline)

@alella (Contributor) commented Sep 28, 2021

| fn_call                      | queue | QueuedJobRegistry | StartedJobRegistry | parent worker | forked worker |
|------------------------------|-------|-------------------|--------------------|---------------|---------------|
| enqueue                      | job   | job               | -                  | -             | -             |
| dequeue_job_and_maintain_ttl | -     | job               | -                  | active        | doesn't exist |
| perform_job(job, queue)      | -     | job               | -                  | inactive      | active        |
| prepare_job_execution(job)   | -     | -                 | job                | inactive      | active        |

I think what @joshcoden is trying to say is that prepare_job_execution calls job.queued_job_registry.remove and job.heartbeat in the same pipeline. job.heartbeat in turn calls started_job_registry.add using the same pipeline. So the job never exists in both QueuedJobRegistry and StartedJobRegistry at the same time.
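
For reference, a minimal sketch of the flow described above, assuming the names used in this thread (prepare_job_execution, job.queued_job_registry, job.heartbeat); the actual implementation lives in the PR branch and may differ:

    from rq.utils import utcnow

    def prepare_job_execution(self, job, heartbeat_ttl):
        """Sketch only: both registry updates are queued on one Redis pipeline."""
        with self.connection.pipeline() as pipeline:
            # Drop the job from QueuedJobRegistry on the pipeline...
            job.queued_job_registry.remove(job, pipeline=pipeline)
            # ...then heartbeat() adds it to StartedJobRegistry using the same
            # pipeline, so both changes land together when the pipeline executes.
            job.heartbeat(utcnow(), heartbeat_ttl, pipeline=pipeline)
            pipeline.execute()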

Comment on lines +370 to +371:

    if job.enqueued_at < front_timestamp:
        self.requeue(job)

alella (Contributor):

If job.enqueued_at >= front_timestamp, the condition would remain True for the rest of the jobs in the QueuedJobRegistry, so you would no longer need to iterate through them (assuming get_job_ids returns a list in ascending order).
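
A sketch of the suggested short-circuit (assuming get_job_ids() returns ids ordered by enqueue time, as noted above; _get_front_queue_timestamp, requeue and requeue_stuck_jobs are the names used in this PR's diff):

    def requeue_stuck_jobs(self):
        """Sketch: stop scanning once jobs are at or after the front of the queue."""
        front_timestamp = self._get_front_queue_timestamp()
        for job_id in self.get_job_ids():
            job = self.job_class.fetch(job_id, connection=self.connection,
                                       serializer=self.serializer)
            if job.enqueued_at >= front_timestamp:
                # Every remaining job was enqueued at or after the front of the
                # queue, so none of them can have been dropped: stop here.
                break
            self.requeue(job)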

@joshcoden (Contributor, Author) replied Sep 28, 2021:

@alella The user can manually specify the score when calling add on the registry. While no such calls will exist in the library code itself, a user could specify a ttl that does not align with the enqueued_at time:

rq/registry.py, lines 64 to 72 (commit 4711080):

    def add(self, job, ttl=0, pipeline=None, xx=False):
        """Adds a job to a registry with expiry time of now + ttl, unless it's -1 which is set to +inf"""
        score = ttl if ttl < 0 else current_timestamp() + ttl
        if score == -1:
            score = '+inf'
        if pipeline is not None:
            return pipeline.zadd(self.key, {job.id: score}, xx=xx)
        return self.connection.zadd(self.key, {job.id: score}, xx=xx)
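
To illustrate the mismatch being described (hypothetical usage; registry, job and other_job are placeholders):

    # The sorted-set score comes from the ttl argument, not from job.enqueued_at,
    # so a caller can produce scores that say nothing about enqueue order.
    registry.add(job, ttl=3600)        # score = current_timestamp() + 3600
    registry.add(other_job, ttl=-1)    # score = '+inf'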

joshcoden and others added 2 commits September 28, 2021 11:21
Co-authored-by: Ashoka Lella <alella@users.noreply.github.com>
@selwin (Collaborator) commented Sep 29, 2021

@alella @joshcoden thanks for the explanation.

It is indeed working as expected. I misread this because the code around this part is not symmetric. Can you create a job.move_to_started_job_registry() method that moves the job from QueuedJobRegistry into StartedJobRegistry? It will be cleaner this way.

With this, we can potentially also clean up job.heartbeat(xx) because right now it does two jobs:

  1. Adding job to started job registry
  2. Updating the timestamp in started job registry

This has already led to race-condition bugs, which were fixed in PR #1550.
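
A rough sketch of what the suggested helper could look like (purely illustrative: started_job_registry is assumed to be a Job property analogous to queued_job_registry, and the signature is invented here):

    def move_to_started_job_registry(self, ttl, pipeline=None):
        """Sketch: move this job from QueuedJobRegistry to StartedJobRegistry atomically."""
        pipe = pipeline if pipeline is not None else self.connection.pipeline()
        self.queued_job_registry.remove(self, pipeline=pipe)
        self.started_job_registry.add(self, ttl, pipeline=pipe)
        if pipeline is None:
            # Only execute if we created the pipeline ourselves; otherwise the
            # caller decides when the queued commands run.
            pipe.execute()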

Comment on lines +388 to +391:

    else:
        raise InvalidJobOperationError(
            "Queued job {} has no enqueue_at value!".format(front_job.id)
        )

Collaborator:

This could not happen, right? So I think we can skip this check.

    for job_id in job_ids:
        # If job was enqueued AFTER the front of the queue it must have already been dequeued
        # This is faster than seeing if the job is in the queue directly.
        front_timestamp = self._get_front_queue_timestamp()

Collaborator:

We can move this out of the loop so we don't keep fetching a new timestamp for every single job processed.
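
A minimal illustration of the change being asked for (same names as the quoted snippet):

    # Read the front-of-queue timestamp once, before the loop, instead of once per job.
    front_timestamp = self._get_front_queue_timestamp()
    for job_id in job_ids:
        # ... existing per-job comparison against front_timestamp ...
        pass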

    It is not defined in cleanup since we don't want this being called everytime count or get_job_ids is called
    """
    job_ids = self.get_job_ids()
    for job_id in job_ids:

Collaborator:

This could be expensive if we have lots of jobs in QueuedJobRegistry, and sometimes people have millions of jobs enqueued. I wonder if we have a way to only check a subset of the jobs.
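
One possible direction, purely as an illustration and not something this PR does: cap how many entries each maintenance pass examines. The base registry's get_job_ids() accepts a start/end slice of the underlying sorted set, so only the oldest N ids need to be pulled per call:

    MAX_JOBS_PER_PASS = 1000  # arbitrary cap, for illustration only

    # Only fetch the oldest MAX_JOBS_PER_PASS ids instead of the whole registry;
    # the rest of the loop can stay as in the diff.
    job_ids = self.get_job_ids(0, MAX_JOBS_PER_PASS - 1)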

            connection=queue.connection,
            job_class=queue.job_class,
            serializer=queue.serializer)
    registry.requeue_stuck_jobs()

Collaborator:

I'm not sure if we should automatically requeue stuck jobs, because:

  1. This operation could be heavy depending on the number of jobs you have in the registry.
  2. If requeued, time-sensitive jobs could lead to unwanted outcomes.

joshcoden marked this pull request as draft on September 29, 2021 at 15:00
@ccrvlh (Collaborator) commented Jan 17, 2023

@joshcoden any plans to continue working on this?

Successfully merging this pull request may close this issue: rq jobs go into a bad state when redis hits connection limit.
4 participants