We read every piece of feedback and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Concept:
from concurrent.futures import CancelledError, Executor, Future, ProcessPoolExecutor
import queue
from threading import Thread
from time import sleep

from pympipool.shared import cancel_items_in_queue


def get_callback(future_external):
    """Build a done-callback that mirrors an internal future onto ``future_external``.

    Args:
        future_external: The ``Future`` handed to the user by ``submit``. It is
            expected to be in the running state when the callback fires.

    Returns:
        A one-argument callable suitable for ``Future.add_done_callback``.
    """

    def callback(fn):
        if fn.cancelled():
            # A future that is already running refuses cancel(); fall back to
            # an exception so that anyone waiting on it is released.
            if not future_external.cancel():
                future_external.set_exception(CancelledError())
            return
        error = fn.exception()
        if error is None:
            future_external.set_result(fn.result())
        else:
            # Forward the failure; calling fn.result() here would raise inside
            # the callback and leave the external future pending forever.
            future_external.set_exception(error)

    return callback


def refresh(running_dict, todo_lst, max_workers, exe):
    """Start as many waiting tasks as the free cores allow, in FIFO order.

    Args:
        running_dict: Maps each external future that has been started to the
            number of cores it occupies. Updated in place.
        todo_lst: Waiting task dictionaries with the keys ``"fn"``, ``"args"``,
            ``"kwargs"``, ``"future"`` and ``"cores"``. Updated in place.
        max_workers: Total number of cores that may be in use at once.
        exe: Executor providing ``submit`` for the actual execution.
    """
    # Prune in place: rebinding the name would leave the caller's dictionary
    # untouched and the core accounting would never persist between calls.
    for finished in [future for future in running_dict if future.done()]:
        del running_dict[finished]
    cores_available = max_workers - sum(running_dict.values())

    while todo_lst:
        cores = todo_lst[0]["cores"]
        if cores_available < cores <= max_workers:
            # The head task fits in principle but has to wait for free cores.
            break
        task_dict = todo_lst.pop(0)
        fn_external = task_dict["future"]
        if not fn_external.set_running_or_notify_cancel():
            continue  # cancelled by the user while it was waiting
        if cores > max_workers:
            # Such a task could never start and would block the list forever.
            fn_external.set_exception(
                ValueError(
                    f"Task requests {cores} cores but only {max_workers} exist."
                )
            )
            continue
        try:
            fn_internal = exe.submit(
                task_dict["fn"], *task_dict["args"], **task_dict["kwargs"]
            )
        except Exception as error:
            # Report a failed submission through the future instead of
            # killing the scheduling thread.
            fn_external.set_exception(error)
            continue
        running_dict[fn_external] = cores
        cores_available -= cores
        fn_internal.add_done_callback(get_callback(future_external=fn_external))


def execute_task_dict(task_dict, todo_lst):
    """Interpret one message taken from the future queue.

    Args:
        task_dict: Either a task (contains ``"fn"`` and ``"future"``) or a
            shutdown request (``"shutdown"`` is truthy).
        todo_lst: List that receives accepted tasks.

    Returns:
        ``True`` if a task was queued, ``False`` for a shutdown request.

    Raises:
        ValueError: If the message is neither a task nor a shutdown request.
    """
    # Both keys are required; refresh() reads them unconditionally.
    if "fn" in task_dict and "future" in task_dict:
        todo_lst.append(task_dict)
        return True
    if task_dict.get("shutdown"):
        return False
    raise ValueError("Unrecognized Task in task_dict: ", task_dict)


def background_execution(future_queue, exe_args, exe_kwargs, sleep_interval=0.1):
    """Run the scheduling loop until a shutdown message is received.

    Args:
        future_queue: ``queue.Queue`` delivering task and shutdown messages.
        exe_args: Positional arguments for ``ProcessPoolExecutor``.
        exe_kwargs: Keyword arguments for ``ProcessPoolExecutor``.
        sleep_interval: Seconds to wait for a new message before the waiting
            tasks are re-examined.
    """
    todo_lst = []
    running_dict = {}
    with ProcessPoolExecutor(*exe_args, **exe_kwargs) as exe:
        # NOTE: _max_workers is a private attribute of ProcessPoolExecutor.
        max_workers = exe._max_workers
        while True:
            try:
                # Blocking with a timeout replaces get_nowait() + sleep(), so
                # a burst of submissions is not throttled to one per interval.
                task_dict = future_queue.get(timeout=sleep_interval)
            except queue.Empty:
                pass
            else:
                keep_running = execute_task_dict(
                    task_dict=task_dict, todo_lst=todo_lst
                )
                future_queue.task_done()
                if not keep_running:
                    if task_dict.get("cancel_futures"):
                        # Tasks already taken from the queue are invisible to
                        # the caller, so they are cancelled here.
                        for waiting in todo_lst:
                            waiting["future"].cancel()
                        todo_lst.clear()
                    # Tasks accepted before the shutdown must still be started,
                    # otherwise their futures would never complete.
                    while todo_lst:
                        refresh(
                            running_dict=running_dict,
                            todo_lst=todo_lst,
                            max_workers=max_workers,
                            exe=exe,
                        )
                        if todo_lst:
                            sleep(sleep_interval)
                    future_queue.join()
                    break
            refresh(
                running_dict=running_dict,
                todo_lst=todo_lst,
                max_workers=max_workers,
                exe=exe,
            )


class MultiProcessPoolExecutor(Executor):
    """Executor whose tasks each declare how many cores they occupy.

    Tasks are forwarded through a queue to a background thread, which submits
    them to a ``ProcessPoolExecutor`` once enough cores are free. All
    constructor arguments are passed on to ``ProcessPoolExecutor``.
    """

    def __init__(self, *args, **kwargs):
        self._future_queue = queue.Queue()
        self._process = Thread(
            target=background_execution,
            kwargs={
                "exe_args": args,
                "exe_kwargs": kwargs,
                "future_queue": self._future_queue,
            },
        )
        self._process.start()

    def submit(self, fn, *args, cores=1, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` and return a ``Future`` for it.

        Args:
            fn: Callable to execute.
            *args: Positional arguments for ``fn``.
            cores: Number of cores the task occupies while it runs.
            **kwargs: Keyword arguments for ``fn``.

        Raises:
            RuntimeError: If the executor has already been shut down.
        """
        if self._future_queue is None:
            raise RuntimeError("cannot schedule new futures after shutdown")
        f = Future()
        self._future_queue.put(
            {"fn": fn, "args": args, "kwargs": kwargs, "future": f, "cores": cores}
        )
        return f

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Stop the background thread; repeated calls are no-ops.

        Args:
            wait: Block until the background thread has finished.
            cancel_futures: Cancel tasks that have not been started yet.
        """
        if self._future_queue is None:
            # Already shut down, e.g. explicit shutdown() inside a with-block.
            return
        if cancel_futures:
            # Presumably cancels the futures still waiting in the queue;
            # confirm against pympipool.shared.
            cancel_items_in_queue(que=self._future_queue)
        self._future_queue.put(
            {"shutdown": True, "wait": wait, "cancel_futures": cancel_futures}
        )
        if wait:
            self._process.join()
            self._future_queue.join()
        self._process = None
        self._future_queue = None
Example:
from multi import MultiProcessPoolExecutor


def test_addition(a, b):
    """Return the sum of ``a`` and ``b``."""
    total = a + b
    return total


if __name__ == "__main__":
    # Submit a single addition and print its result once it is available.
    with MultiProcessPoolExecutor() as exe:
        future = exe.submit(test_addition, 1, 2)
        outcome = future.result()
        print(outcome)
The text was updated successfully, but these errors were encountered:
No branches or pull requests
Concept:
Example:
The text was updated successfully, but these errors were encountered: