Multithread Worker #1803
Conversation
Agree, we'll definitely label this as beta when first introduced. Another approach that I think is better, and builds on the robustness of the existing worker implementations, is the concept of having a worker pool.
So this would be a bit more similar to what Celery does, in which you have a single worker and you just pass the concurrency level.

EDIT: Reading again and looking at the code, it seems this would work with multiprocessing, but I'm not sure about threads, async and gevent, since the pool manages the full worker, right? So this looks like a prefork model in which the supervisor runs workers, instead of a worker that spawns processes.
It's actually the inverse, so we don't need to rewrite any worker code (or very minimal, hopefully). Let's say we start the worker pool with a CLI command; the pool would then do something like:

```python
for _ in range(n):
    worker = Worker()
    worker.start()  # This would run in a separate process, like the scheduler:
                    # https://github.com/rq/rq/blob/master/rq/worker.py#L668
```

The good part about this concept is that we can keep the worker simple, because it only has to deal with dequeueing and executing its own jobs and keeping track of its own results etc., which is what it has been doing for the past 10 years.

Edit: this implementation also leaves room for someone to implement their own multi-threaded worker.
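To make the idea concrete, here's a minimal sketch of that supervisor model; `WorkerPool`, `num_workers` and `run_worker` are illustrative names (RQ has no such class at this point), and each child runs a completely unmodified `Worker`:

```python
import multiprocessing

from redis import Redis
from rq import Queue, Worker


def run_worker(queue_names):
    # Each child process builds its own connection and a plain, unmodified Worker.
    connection = Redis()
    queues = [Queue(name, connection=connection) for name in queue_names]
    Worker(queues, connection=connection).work()


class WorkerPool:
    """Hypothetical supervisor: spawns N regular workers, each in its own process."""

    def __init__(self, queue_names, num_workers=2):
        self.queue_names = queue_names
        self.num_workers = num_workers
        self.processes = []

    def start(self):
        for _ in range(self.num_workers):
            process = multiprocessing.Process(target=run_worker, args=(self.queue_names,))
            process.start()
            self.processes.append(process)
        for process in self.processes:
            process.join()
```

Under this split, monitoring, restarts and signal handling would live in the pool, while each worker keeps its existing dequeue/execute logic untouched.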
Got it, makes sense after reading the code more carefully.
Doesn't this make it harder to pass configuration options to the worker? One thing to keep in mind is the datastore; that's something to pay attention to, especially in documentation, since this can get confusing.
That all being said, I think this effort is parallel (although related) to the concurrency implementation of other methods (namely threads, async and gevent).
I guess whether the wrapper over the existing worker (the worker pool) is the best long-term approach is still an open question; it may be worth reorganizing the codebase to make other concurrency implementations easier to build.
This is a fairly different topic, but a very relevant one nonetheless. I understand your point here, although I might have a slightly different view. I guess my main concern is that the bit-by-bit approach favors backwards compatibility over anything else, which greatly reduces flexibility. As this is a +10y codebase, it's very hard to make it evolve.

A polished and stable solution necessarily needs time: being exposed to battle, and being tested and used in a lot of different scenarios by different people. Having "simple" versions out there (beta) is super relevant to gain this experience and expertise.

I tend to favor a path where 1.x is maintained while v2 is being developed in beta. Two good examples of this approach are…
It shouldn't, because the worker pool can pass all configuration options to the underlying worker class.
I agree, I'm open to PRs that rearrange or rework worker methods to make developing other kinds of concurrency implementations easier. This is something that I want to support. A few people have maintained gevent-powered workers.
I'm open to re-organizing the codebase to make it more structured, but it has to be done in PR chunks that I can actually review. I also understand that there's a pattern of using the…
I totally agree, and realize that I tend to make decisions that prioritize stability and slow evolution of things.
Yes, this is an unfortunate side effect and I'm aware of it. But I'm in favor of rearranging methods/functions such that improvements can still be made by subclassing RQ classes.
This is a good idea, but I currently don't have the time and bandwidth to support two different codebases :(
@selwin truly appreciate your frankness and openness (and patience :)) on this! Implementation-wise, I was thinking about this over the last couple of days, and I wonder what your thoughts are.
I guess the first option is easier to develop, since not many other arguments/params are needed and the architecture remains mostly the same. The second option is more natural/easier for the end user. What are your thoughts? Also, what minimum set of features would you consider for a new worker? By limiting the feature set of a new worker (e.g. the…
This test is generic and doesn't really test the ThreadPoolWorker. It doesn't make much sense for this to be tested in every worker.
Some more tests with dependencies. One way I can think of to fix this: before actually breaking the work loop on the main thread, we could have an array of tuples with each job and its future, as sketched below. This shouldn't happen when not in burst mode, since there's always going to be a next loop cycle, so the…
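A sketch of what that could look like, assuming the `_current_jobs` list of `(job, future)` tuples from this PR (the helper name is made up):

```python
from concurrent.futures import wait


def _drain_pool_before_burst_exit(self) -> bool:
    """Block until all in-flight jobs finish; return True if anything was
    still running, since finished jobs may have unblocked dependents that
    warrant one more pass over the queue before breaking the loop."""
    futures = [future for _, future in self._current_jobs]
    if not futures:
        return False
    wait(futures)  # concurrent.futures.wait blocks until all complete
    return True
```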
This seems to work - some more polishing needed. Tests are passing locally, but it seems something is wrong on the CI; will need to take a better look at this. I'll mark this as ready for review just so we can work on top of a feedback loop. @selwin something to keep in mind: the current architecture for tests was made for a single worker, so to test other workers basically the whole test suite has to be replicated - lots of room for trouble here. We might want to think of a more dynamic way of testing different worker classes in the future.
I haven't had the time to look at this PR closely; for a PR this ambitious, I think we'll need time to iterate. If you think there are a few chunks of logic that should be broken down into separate functions/methods, let me know. We can make a separate PR to get that function/method into master first so it's easier to maintain this PR. I'll try to take a look at this in a few days.
Codecov Report

```
@@            Coverage Diff             @@
##           master    #1803      +/-   ##
==========================================
+ Coverage   95.02%   95.52%   +0.50%
==========================================
  Files          51       51
  Lines        7997     8915     +918
==========================================
+ Hits         7599     8516     +917
- Misses        398      399       +1
```

☔ View full report at Codecov.
@selwin want to take another look at this to see what other changes you would like?
Yes, I'd like to take a more in-depth look when I have more time. Thanks for getting this PR this far - expect more changes as we get this into shape ;) This will be a very big step forward for RQ.
Sure, no worries, happy to contribute. Let me know when you want to get back to this. Will have more workers coming soon :)

EDIT: BTW, I'll leave this as a draft while we work on it.
```python
    with self._lock:
        self._idle_threads += operation

    def __wait_for_slot(self, wait_interval: float = 0.25):
```
Is there some kind of push-based mechanism instead of polling for free workers?
Not that I could find; the `ThreadPoolExecutor` is a pretty thin wrapper - the only push-based mechanics I saw were related to a single future with its `done_callback`.
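For what it's worth, a push-based scheme can be built around the executor with a plain `threading.Semaphore` sized to the pool: `acquire()` blocks the producer until a slot is free, and the future's `add_done_callback` releases it. A sketch (not what this PR does today):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

pool_size = 4
executor = ThreadPoolExecutor(max_workers=pool_size)
slots = threading.Semaphore(pool_size)


def submit(fn, *args):
    slots.acquire()  # blocks until a slot is free: no sleep/poll loop needed
    future = executor.submit(fn, *args)
    # Release the slot from the worker thread the moment the job finishes.
    future.add_done_callback(lambda f: slots.release())
    return future
```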
```python
        super(ThreadPoolWorker, self).__init__(*args, **kwargs)

    @property
    def is_pool_full(self):
```
I think it would be better if we changed this to `number_of_available_horses()` instead of a simple boolean.
```python
                time.sleep(wait_interval)
                continue

    def __check_pending_dependents(self) -> bool:
```
I think we need to restructure `work()` in such a way that this is not needed.
The reason I went down this path is that in this case things don't happen sequentially, so we need a global way of handling dependencies (thread Y can't work unless thread X has finished) - this is not a challenge on the `Worker` or the `SimpleWorker`. I need to think further about how to group some sections inside `work()`.
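Roughly, the global check has to consult the in-flight jobs under the lock; a sketch assuming this PR's `_current_jobs` and `_lock`, and RQ's internal `Job._dependency_ids` (the helper name is made up):

```python
def _has_running_dependency(self, job) -> bool:
    """True if any dependency of `job` is still being worked on by another
    thread, meaning this job must keep waiting before being submitted."""
    dependency_ids = set(job._dependency_ids)
    with self._lock:
        return any(
            running_job.id in dependency_ids and not future.done()
            for running_job, future in self._current_jobs
        )
```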
On a more meta perspective: since we already have the work horse terminology, I think we should stick to that terminology and call the threads horses.
```python
        return bool(completed_jobs)

    def execute_job(self, job, queue):
```
I think the next step would be to make `execute_job()` on the main `Worker` class follow this pattern. After executing the job on a separate process, it should return to the main loop and wait for the job to finish before dequeueing the next job.
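If I understand the suggestion, the loop would take roughly this shape (a sketch, not the current `Worker` code; `wait_for_horse_to_finish` is a made-up name for whatever does the blocking):

```python
def work_loop(self):
    while not self._stop_requested:
        result = self.dequeue_job_and_maintain_ttl(timeout=None)
        if result is None:
            break
        job, queue = result
        self.execute_job(job, queue)     # would now start the horse and return
        self.wait_for_horse_to_finish()  # made-up name: block until the horse exits
```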
I'll try to find some time to work on this sometime in the next few days; I think this will be quite complicated.
```python
        self.executor = ThreadPoolExecutor(max_workers=self.threadpool_size, thread_name_prefix="rq_workers_")
        self._idle_threads = self.threadpool_size
        self._lock = threading.Lock()
        self._current_jobs: List[Tuple['Job', 'Future']] = []  # type: ignore
```
We should also bring `_current_jobs` to the main worker class.
A few comments that are relevant to multiple different review comments. There are three main reasons why I don't think going with the horse terminology is a good idea.

The first is that it's not obvious: is a thread a horse? How about a worker that runs each job on a separate process (using multiprocessing, for example)? An async worker running an event loop? A gevent worker? This term is historically used for forked processes (specifically using `os.fork()`).

Harder to maintain: not every worker will have the same concepts/structure as the current one. The best example is the… By fitting things from other workers into the main worker, we would most likely cause bloat and less predictability in the current worker implementation - which makes the worker harder to maintain (less freedom for new worker implementations, by "demanding" that they follow the main worker class pattern/signature). Another place where this appears is the…

Premature optimization: this is the very first implementation of a logically different worker. Fitting everything this new worker uses into the old one can be seen as optimization at a time when we don't actually know that much about the requirements for new workers (in my mind a…).

Another good example is the dependency logic: a multithread worker needs to check for dependencies outside a specific thread - to avoid a thread working on a job when it actually should be waiting for its dependency to finish (on a different thread). This is not a challenge on the `Worker` or the `SimpleWorker`.

Lastly, the example of changing `execute_job()`: this worker has ~200 LOC while the original worker has 1.4k LOC - we are actually not rewriting that much at all. I'm not sure it's worth walking a long, complicated, tricky and risky path to try to reduce those 200 to, say, 150 - at least not in the first iteration of a new implementation.
We need to have a term to describe the capability of having multiple processes/threads/workers inside a single worker.
I understand the concern, but in my experience maintaining RQ (and other projects), breaking things into several smaller method blocks actually helps with the stability of other implementations that subclass the main worker class. There have been several attempts at making multi-threaded (for lack of a better term) workers which override the entire `work()` method. If we were to have multiple worker models in RQ's core library, we'll need to be able to maintain them :).
I agree with you that the…
I share your concern, which is why I'm taking this really slowly, one step at a time. It would be good if we could have multiple worker models ready, but I think this will be a big effort, so I'm doing what I can based on the implementation that we have. At the very least, monitoring tools like django-rq (and even RQ's built-in CLI) will need several predefined APIs to…
This needs to change. A worker in…
To be honest I don't really see any need for that right now. I do understand this may be relevant in the future (mostly for monitoring purposes), but for now I was just thinking of calling the processing unit of that specific worker what it actually is: a thread. This is important anyway, so we can have any single job-processing unit as a…
Not sure I follow. What I meant was that if horse-related methods are not used by everyone, it would make more sense to have the "base" worker (the one everyone inherits from) be as thin as possible. In practice that would mean switching: the…
To be honest I never quite understood why; I personally used a slightly updated version of the…
That's true. However, I also think it's fair that we accept that not necessarily all features will be implemented for all workers every time, especially during the first iterations - Celery is a good benchmark here; in their docs they explicitly mention which features each worker pool supports.
That's a perfect approach, but arguably a really high standard for the very first iteration of the first worker that implements concurrency: having to implement generic monitoring capabilities. But let's pursue it; it's definitely a great ambition to have.
You mean the…? Let me get back to your specific review comments so we can make some progress and iterate.
This is a good idea; we can refactor things out to a base worker class.
Well, nothing prevents other people from writing their own worker implementations. That's what I'm trying to figure out now, bit by bit :).
Yes, we should keep the logic simple: …
I took a more thorough look at the implementation so far. My feedback: …
Great, we can start looking at this in a new PR, leaving all common methods to the base worker class.
Most definitely yes. My point here is that we could implement the protocol step by step, mainly because we are going to learn other things when implementing new workers. So my idea was to do that incrementally, instead of trying to figure out the full protocol right now.
I don't remember exactly why I chose that specific implementation. From what I can remember, with a concurrent worker there was a scenario where two threads were working in parallel when they actually shouldn't have been (because of a dependency). Let me take a look at this again; I need to refresh my memory on the dependency/queueing/deferring logic.
Sure, I'll work on that in a separate PR then.
Ideas on this implementation? The way this works for the thread is native, since the callback is attached to the future.
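Concretely, the attachment point looks something like this (a sketch; `_change_idle_counter` stands in for whatever the counter method is called in this PR):

```python
def _submit_job(self, job, queue):
    self._change_idle_counter(-1)  # a thread is now busy
    future = self.executor.submit(self.perform_job, job, queue)
    self._current_jobs.append((job, future))
    # Runs in the thread that executed the job, right after it finishes.
    future.add_done_callback(lambda f: self._change_idle_counter(+1))
```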
That's fine also; I don't see the downside of having the info pre-calculated (thinking of monitoring), but it's not really that relevant, so we can remove that as well.
Yes, I think we're aligned on this one. We're already doing this in a step-by-step fashion as this PR continues to evolve :). We have three different worker models (the forking `Worker`, the `SimpleWorker` and the `ThreadPoolWorker`).
We'll try to sketch out the protocols based on these three models and evolve them as needed. The…
Yes, I think so. Maybe its signature can be…
As an aside, if we can land this PR: `ProcessPoolExecutor` has very similar APIs, so we'll actually get a process pool worker almost for free.
Exactly, that was one of the reasons I went with the executor approach. And actually the logic is very similar for a process pool worker.
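For reference, both pools share the `concurrent.futures.Executor` interface, which is why the swap is, in principle, mechanical (a tiny stdlib-only sketch; job pickling across processes would still need to be sorted out):

```python
from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":  # required for process pools on some platforms
    # Same submit()/Future/add_done_callback() surface as ThreadPoolExecutor,
    # so most of the ThreadPoolWorker logic could carry over unchanged.
    executor = ProcessPoolExecutor(max_workers=4)
    future = executor.submit(pow, 2, 10)
    print(future.result())  # 1024
    executor.shutdown()
```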
Just so that we don't work on the same things: I'm working on bringing…
Unless you're already working on this - let me know and I'll work on something else.
It would be really nice to have this. We have been trying to use…
There are relevant limitations to the `ThreadPoolWorker`; here's a non-exhaustive list:

- Dependencies might not behave as expected
- Job registries may not be reliable (especially with jobs that timed out)
The biggest thing I like about rq is its job reliability, as it maintains the different job registries correctly. Is it possible to have the same level of reliability with the thread pool worker? We process a Kafka queue and hand-create RQ tasks to process those events. Reliability is super important, as we don't want to lose tasks without knowing about it.
Thanks for the feedback - definitely something to take into account. I've been working on a different branch, since there were other relevant changes to allow a more robust design; I'll work on this as well. I do imagine that there will be limitations though, especially in the first couple of versions. Hopefully nothing that will hurt reliability... we'll see.
Appreciate it @ccrvlh. We're in the process of switching from Celery to RQ for the reliability factor.

BTW, I pulled this branch down and tried the thread pool with a task timing out, and was able to consistently get a retry on it. Anything specific you can point out where you have seen this issue? (Just curious.)
It's been a while since I worked on this... I'm mostly working on the v2 branch now (which has this worker setup), but there's still a lot to do. I'll try to run some tests with it this weekend.
If you can share the link to the branch, I would appreciate it - just want to see and learn. I tried searching this repo but was not able to find it.

UPDATE: I found it under your fork. Thanks!
Sure! It's on my fork only; you might be able to find it from this repo (or from my profile). I'm on mobile, but I'll post the link here when I get to the PC.
@ccrvlh just curious about the timeline for this feature, or v2 in general, since we want to roll out RQ in prod and multithreading is critical for us. Thanks!
Also, can we try to release this with 2.0 under an experimental flag? Based on my tests, things looked good.

cc @selwin
@gyandeeps like I mentioned, I'll only merge this in slowly, one bit at a time to make sure we don't mess anything up.
RQ version 2.0 also changed how jobs are enqueued so this branch is probably stale.
Is there any update on this? It would be a really nice feature.
This is a (very) rough sketch of what a multithreaded worker may look like.

The way this is currently implemented: the main worker process is responsible for dequeuing jobs and monitoring the pool. If the pool has a free slot, it dequeues a job and submits it to the executor; if it doesn't, it waits. It uses the `TimerDeathPenalty` to enforce the timeout, and receives a `pool_size` argument (Python's default is CPU * 5; we could adopt something similar).

To monitor the thread pool, it uses a simple counter that is protected by a `Lock`, decremented (the attribute is `idle_pool_slot`) as soon as a job occupies a thread, and incremented (the free slots) as soon as the job finishes (through a `done_callback`). The logic was built on top of an `idle_pool`; we could easily do it the other way round as well, with a `running_threads` counter compared against `max_workers`.

I thought this implementation was the simplest (no need to spin up threads manually and kill them one by one), and the one that required the least amount of customization of the main worker code (we like simple 😀). It would be great if I was wrong and there's an even simpler way!
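Condensed into a self-contained sketch, the loop plus the counter bookkeeping described above looks roughly like this (simplified from the actual diff; `dequeue` and `perform_job` stand in for the worker's real methods):

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ThreadPoolWorkerSketch:
    """Illustrative only: dequeue a job whenever the pool has an idle slot."""

    def __init__(self, pool_size: int = 4):
        self.executor = ThreadPoolExecutor(max_workers=pool_size)
        self._idle_threads = pool_size
        self._lock = threading.Lock()

    def work(self, dequeue, perform_job, wait_interval: float = 0.25):
        while True:
            if self._idle_threads == 0:
                time.sleep(wait_interval)  # pool full: wait for a slot to free up
                continue
            job = dequeue()                # burst mode: None means the queue is drained
            if job is None:
                break
            with self._lock:
                self._idle_threads -= 1    # a thread is now occupied
            future = self.executor.submit(perform_job, job)
            # Free the slot from the worker thread as soon as the job finishes.
            future.add_done_callback(self._release_slot)
        self.executor.shutdown(wait=True)

    def _release_slot(self, future):
        with self._lock:
            self._idle_threads += 1
```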
The only reason the `work()` method was customized was the waiting it does while checking for a free slot in the pool; other than that, it's a copy/paste of the original. Same for the `_shutdown()` method: it's only there to actually shut down the `executor`.

I copied the Worker tests and ran all of them using the `ThreadpoolWorker`. Currently, three of them fail: (1) timeout moves job to failed registry, (2) race condition on dependencies and (3) result lifespan. I didn't spend time looking into why.

Concurrency has been a big topic for quite a while now. At the moment both the `SimpleWorker` and the `Worker` are very mature, but to be honest, it's going to take quite a while before a `ThreadpoolWorker` becomes as robust and mature as the existing ones (or any other new worker).

However, this topic has been hanging for close to 10 years now. I would favor a simplistic `beta` release approach, in which not all features are necessarily implemented (dependencies, custom handlers, etc.) and tested. I think this would greatly help us understand the issues with this worker, the room for improvement and ways to make it as solid as the other ones - we could use warnings and be explicit about its experimental nature.

This can be extremely useful for I/O-bound workloads.

I would love comments, contributions, critiques, and anything else that could help us move forward with adding native concurrency support to RQ - leaving as draft so we can discuss on top of it.
Related to #45 #404 #734 #1804
Fixes #1804