Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] For testing it would be great to have an executor which supports tasks with more than one core #252

Open
jan-janssen opened this issue Jan 23, 2024 · 0 comments

Comments

@jan-janssen
Copy link
Member

jan-janssen commented Jan 23, 2024

Concept:

from concurrent.futures import Executor, Future, ProcessPoolExecutor
import queue
from threading import Thread
from time import sleep

from pympipool.shared import cancel_items_in_queue


def get_callback(future_external):
    """Build a done-callback that mirrors an internal future onto ``future_external``.

    Args:
        future_external: The ``Future`` handed to the user. It receives the
            outcome (result, exception or cancellation) of the internal future
            the returned callback is attached to.

    Returns:
        A callable accepting the finished internal future, suitable for
        ``Future.add_done_callback``.
    """
    def callback(fn):
        if fn.cancelled():
            # cancel() refuses to cancel a future that is already running, so
            # fall back to an explicit CancelledError to avoid leaving the
            # external future pending forever.
            if not future_external.cancel():
                from concurrent.futures import CancelledError

                future_external.set_exception(CancelledError())
            return
        error = fn.exception()
        if error is not None:
            # Forward the failure instead of letting fn.result() raise inside
            # the callback, which would leave the external future unresolved.
            future_external.set_exception(error)
        else:
            future_external.set_result(fn.result())
    return callback


def refresh(running_dict, todo_lst, max_workers, exe):
    """Release cores of finished tasks and start queued tasks that now fit.

    Tasks are started strictly in FIFO order: if the task at the head of
    ``todo_lst`` needs more cores than are currently free, nothing behind it is
    started either.

    Args:
        running_dict: Maps the external future of each running task to the
            number of cores it occupies. Updated in place so the caller keeps
            an accurate view between calls.
        todo_lst: List of task dictionaries (keys ``fn``, ``args``, ``kwargs``,
            ``future``, ``cores``) waiting to be started. Started or cancelled
            tasks are removed from the front.
        max_workers: Total number of cores that may be occupied at once.
        exe: Executor whose ``submit`` method is used to start the tasks.
    """
    # Mutate the dictionary in place; rebinding the name would leave the
    # caller's dictionary untouched and lose the core accounting.
    for finished in [fut for fut in running_dict if fut.done()]:
        del running_dict[finished]
    cores_available = max_workers - sum(running_dict.values())
    # ">=" so that a task needing exactly the free cores is started.
    while todo_lst and cores_available >= todo_lst[0]["cores"]:
        task_dict = todo_lst.pop(0)
        fn_external = task_dict["future"]
        # Returns False when the user cancelled the future while it was
        # queued; such a task is dropped without being submitted.
        if fn_external.set_running_or_notify_cancel():
            running_dict[fn_external] = task_dict["cores"]
            cores_available -= task_dict["cores"]
            fn_internal = exe.submit(task_dict["fn"], *task_dict["args"], **task_dict["kwargs"])
            fn_internal.add_done_callback(get_callback(future_external=fn_external))


def execute_task_dict(task_dict, todo_lst):
    """Classify a message taken from the queue and enqueue it if it is a task.

    Args:
        task_dict: Either a task dictionary containing both ``fn`` and
            ``future``, or a shutdown message ``{"shutdown": True, ...}``.
        todo_lst: List of pending tasks; task dictionaries are appended to it.

    Returns:
        ``True`` if ``task_dict`` was a task and has been appended to
        ``todo_lst``, ``False`` if it was a shutdown request.

    Raises:
        ValueError: If ``task_dict`` is neither a complete task nor a shutdown
            request.
    """
    # Both keys are required: a task without a future (or without a function)
    # cannot be started later, so reject it here instead of queueing it.
    if "fn" in task_dict and "future" in task_dict:
        todo_lst.append(task_dict)
        return True
    if task_dict.get("shutdown"):
        return False
    raise ValueError(f"Unrecognized Task in task_dict: {task_dict}")


def background_execution(future_queue, exe_args, exe_kwargs, sleep_interval=0.1):
    """Poll ``future_queue`` and run the received tasks on a ``ProcessPoolExecutor``.

    The loop moves task dictionaries from the queue into a local backlog and
    calls ``refresh`` once per cycle to start whatever fits into the free
    cores. After a shutdown message the queue is no longer read, but the
    backlog is still worked off before the function returns, so futures of
    already accepted tasks are not left pending forever.

    Args:
        future_queue: ``queue.Queue`` delivering task dictionaries and the
            shutdown message.
        exe_args: Positional arguments forwarded to ``ProcessPoolExecutor``.
        exe_kwargs: Keyword arguments forwarded to ``ProcessPoolExecutor``.
        sleep_interval: Seconds to sleep between two polling cycles.
    """
    todo_lst = []
    running_dict = {}
    shutdown_requested = False
    with ProcessPoolExecutor(*exe_args, **exe_kwargs) as exe:
        # NOTE: _max_workers is a private attribute of ProcessPoolExecutor; it
        # is read here because the resolved default is not exposed publicly.
        max_workers = exe._max_workers
        while True:
            if not shutdown_requested:
                try:
                    task_dict = future_queue.get_nowait()
                except queue.Empty:
                    pass
                else:
                    keep_running = execute_task_dict(task_dict=task_dict, todo_lst=todo_lst)
                    future_queue.task_done()
                    if not keep_running:
                        future_queue.join()
                        shutdown_requested = True
            refresh(running_dict=running_dict, todo_lst=todo_lst, max_workers=max_workers, exe=exe)
            if shutdown_requested:
                if not todo_lst:
                    # Leaving the with-block waits for the submitted tasks.
                    break
                if not running_dict:
                    # Nothing occupies a core and the head of the backlog was
                    # still not started, so it never will be: cancel it rather
                    # than waiting forever, then retry with the next task.
                    todo_lst.pop(0)["future"].cancel()
                    continue
            sleep(sleep_interval)


class MultiProcessPoolExecutor(Executor):
    """Executor which lets each submitted task reserve more than one core.

    Tasks are put on an internal queue and picked up by a background thread
    running ``background_execution``, which forwards them to a
    ``ProcessPoolExecutor`` created from the constructor arguments.
    """

    def __init__(self, *args, **kwargs):
        """Start the background thread.

        Args:
            *args: Positional arguments forwarded to ``ProcessPoolExecutor``.
            **kwargs: Keyword arguments forwarded to ``ProcessPoolExecutor``.
        """
        self._future_queue = queue.Queue()
        self._process = Thread(
            target=background_execution,
            kwargs={
                "exe_args": args,
                "exe_kwargs": kwargs,
                "future_queue": self._future_queue,
            },
        )
        self._process.start()

    def submit(self, fn, *args, cores=1, **kwargs):
        """Queue ``fn(*args, **kwargs)`` for execution.

        Args:
            fn: Callable to execute.
            *args: Positional arguments for ``fn``.
            cores: Number of cores the task reserves while it is running.
            **kwargs: Keyword arguments for ``fn``.

        Returns:
            A ``Future`` representing the queued task.

        Raises:
            RuntimeError: If the executor has already been shut down.
        """
        if self._future_queue is None:
            raise RuntimeError("cannot schedule new futures after shutdown")
        f = Future()
        self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f, "cores": cores})
        return f

    def shutdown(self, wait=True, *, cancel_futures=False):
        """Ask the background thread to stop; calling it again is a no-op.

        Args:
            wait: If true, block until the background thread has terminated
                and every queue item has been acknowledged.
            cancel_futures: If true, pass the queue to
                ``cancel_items_in_queue`` before sending the shutdown message.
        """
        if self._future_queue is None:
            # Already shut down, e.g. explicit shutdown() followed by the
            # implicit one when leaving a with-block.
            return
        if cancel_futures:
            cancel_items_in_queue(que=self._future_queue)
        self._future_queue.put({"shutdown": True, "wait": wait})
        if wait:
            self._process.join()
            self._future_queue.join()
        self._process = None
        self._future_queue = None

Example:

from multi import MultiProcessPoolExecutor

def test_addition(a, b):
    """Return ``a + b``; serves as the task submitted in the example below."""
    return a + b


if __name__ == "__main__":
    # Submit one task with the default core count and print its result; the
    # with-block shuts the executor down on exit.
    with MultiProcessPoolExecutor() as exe:
        future = exe.submit(test_addition, 1, 2)
        result = future.result()
        print(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant