
How to properly shutdown, mid-process, a multithreaded graph? #155

Open
DaveMtl opened this issue Jun 18, 2021 · 7 comments

Comments


DaveMtl commented Jun 18, 2021

Greetings,

First off, great library. We are making a PyQt app to handle data processing, and flowpipe has been immensely useful.

I'd like to know if you have any advice on how to properly stop an app while a graph is running in threading mode.

Right now we are getting the following exception:
    raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown

My first idea was to add a flag inside the _evaluate_threaded function to quit when it's set to True. But doing it this way doesn't let my nodes close properly: one might be in the middle of writing a file, which would corrupt it.

I could wait for the graph to finish before quitting, but some of our processes can take a long time, and that wouldn't be great for the user. Or maybe I could broadcast a message to raise an exception in all nodes so they can quit gracefully.

I wonder if you have any advice on how to handle that.

Thanks!

Collaborator

neuneck commented Jun 19, 2021

Hi Dave, I'm glad you like flowpipe!

My first instinct is to go for a threading.Event and have all nodes react to it in whatever manner they need (e.g. closing files before quitting). This might require some re-engineering of your nodes, though.
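
A minimal sketch of the Event idea, assuming a shared flag that nodes poll between units of work (the `shutdown_event` name and the `write_records` helper are hypothetical, not part of flowpipe; in a real node the check would live inside its compute method):

```python
import io
import threading

# Hypothetical shared shutdown flag; flowpipe itself does not provide one.
# The GUI thread would call shutdown_event.set() when the app is closing.
shutdown_event = threading.Event()

def write_records(stream, records, stop_event):
    """Write records to an open stream, checking the stop flag between
    records, so a shutdown finishes the current record cleanly instead
    of cutting the output off mid-write."""
    written = 0
    for record in records:
        if stop_event.is_set():
            break  # stop at a safe boundary; no half-written record
        stream.write(record + "\n")
        written += 1
    return written
```

Each node decides where its own safe stopping points are, which is exactly the per-node flexibility you'd want: a file-writing node checks between records, while a cheap node might not check at all.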

An easier-to-implement solution would be to track the event in _evaluate_threaded and shut down the ThreadPoolExecutor via the executor.shutdown(wait=True, cancel_futures=True) method (https://docs.python.org/3.9/library/concurrent.futures.html#concurrent.futures.Executor.shutdown; note that cancel_futures requires Python 3.9+). This will block until the running nodes finish, though, which is what you wanted to avoid.
I don't know how brutal executor.shutdown() is in cancelling running threads. I believe all it does is continue the control flow in the main thread, leaving any currently running threads to their own devices. So even in such a situation I don't think you'd end up with corrupted files. Do test this a bit on some toy samples before trusting it, though; I'm really not 100% sure of Python's behavior here.
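
A toy sketch of that behavior, independent of flowpipe (`slow_task` stands in for a node's work; requires Python 3.9+ for cancel_futures). Running tasks complete normally, while queued ones never start:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
done = []

def slow_task(tag):
    # Stand-in for a node's compute: signal that we started, then work.
    started.set()
    time.sleep(0.2)
    done.append(tag)
    return tag

executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(slow_task, "first")   # picked up by the worker
pending = executor.submit(slow_task, "second")  # waits in the queue
started.wait()  # make sure "first" is actually running before shutting down

# Blocks until "first" finishes; "second" is cancelled before it starts.
executor.shutdown(wait=True, cancel_futures=True)
```

So a task that is mid-write when shutdown is called still runs to completion, which is why this shouldn't corrupt files; only work that hasn't begun gets dropped.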

The pull request at #154 might be interesting for you. It revamps the way the various evaluation modes are implemented, making it easier to extend or modify how a Graph gets computed. The functionality is there; I just haven't gotten around to updating the documentation / code samples for it. Seeing that users want to modify the evaluation behavior gives me more motivation to complete that soon, since that's exactly what the feature is supposed to enable.

Author

DaveMtl commented Jun 21, 2021

Hi neuneck,

I'll have a look at threading.Event; re-engineering our nodes shouldn't be too bad at this point in our project. I like that each node can have its own reaction to a shutdown event: some can quit at any time, while others are more sensitive to an early shutdown.

#154 is something we've been pondering lately. For example, how to begin another run of the graph, to process another file, while the current one is still running, in order to maximize CPU usage.

Thanks for the reply!


neuneck commented Jun 21, 2021

Let me (us) know how/what worked for you in the end. I think an interruption feature and/or a timeout on Graph evaluation would be very useful.


neuneck commented Jun 21, 2021

About starting another run of the Graph - is it an option to set up a second graph?

What we're doing to run multiple graphs in parallel is create each graph from a factory function (e.g. make_graph(file_path)) and then run those graphs in parallel (implicitly in different greenlets in our case, since they get created and executed inside gunicorn workers, but threads or processes should work just the same).
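
A rough sketch of that factory pattern, with a stub standing in for the real Graph (here make_graph returns a plain callable rather than a flowpipe Graph, and the "processed ..." result is invented for illustration):

```python
from concurrent.futures import ThreadPoolExecutor

def make_graph(file_path):
    """Hypothetical factory: in the real app this would build and return
    a fresh flowpipe Graph wired up to process `file_path`."""
    def evaluate():
        # Stand-in for something like graph.evaluate(mode="threading").
        return f"processed {file_path}"
    return evaluate

paths = ["a.csv", "b.csv", "c.csv"]

# Each file gets its own independent graph; the pool runs them in parallel.
with ThreadPoolExecutor() as pool:
    results = list(pool.map(lambda p: make_graph(p)(), paths))
```

Because every run gets a fresh graph, there is no shared mutable state between files, which is what makes this safe to parallelize with threads, processes, or greenlets alike.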


DaveMtl commented Jun 21, 2021

In our case it's not that straightforward, because the files are handled by just another node inside the graph. We have multiple nodes to handle different types of files. Depending on the process we need to run, we put the appropriate "file node" at the beginning of the pipeline and run the pipeline for as long as the file node still has files to process.

That said, we could spawn a new graph as long as the file node isn't done, and initialize it with an iteration number so it knows which file to process. Food for thought...


neuneck commented Jun 29, 2021

Hi @DaveMtl
Were you able to get the behavior you wanted? If so, I'd be interested to hear about your solution.


DaveMtl commented Jun 29, 2021

Hi @neuneck,
For the parallel graphs, I added the concept of a master node. Before starting a graph I check whether a master node is in the graph. Then, after the graph finishes, I check whether the master node is done; if it isn't, I start the graph again.
Since the app also works in console mode, I can also run multiple instances of the app in parallel on a cluster, where each instance processes just one file.
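
That re-run loop could be sketched roughly like this (FileMaster, is_done, and next_index are all made-up names for illustration, and the inner callable stands in for building and evaluating the real graph):

```python
class FileMaster:
    """Toy stand-in for the master node: holds the queue of files and
    reports when every file has been handed out."""
    def __init__(self, files):
        self.files = list(files)
        self.index = 0

    def is_done(self):
        return self.index >= len(self.files)

    def next_index(self):
        i = self.index
        self.index += 1
        return i

def run_with_master(make_graph_fn, master):
    """Hypothetical driver: keep rebuilding and running the graph until
    the master node reports it has consumed all its files."""
    runs = 0
    while not master.is_done():
        graph = make_graph_fn(master.next_index())
        graph()  # stand-in for graph.evaluate()
        runs += 1
    return runs

processed = []
master = FileMaster(["a.csv", "b.csv"])
runs = run_with_master(lambda i: (lambda: processed.append(i)), master)
```

The iteration number passed to the factory is what lets each fresh graph know which file to pick up, matching the "initialize it with an iteration number" idea above.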

I haven't implemented the stop function yet; I'll let you know what I end up doing.
