
nested multiprocessing in notebooks with mpire #29

Open
sergpolly opened this issue Dec 14, 2023 · 2 comments

@sergpolly
Member

Consider demonstrating an example of parallel execution of some of the cooltools API functions for multiple samples, i.e. when an API function itself uses multiprocessing and we want to run it for several samples at once in the notebook ...

If one has a big multicore system (16 real cores or more), it is easy to run several CLI tasks in parallel for multiple samples, where each task itself uses several cores, i.e. is multiprocessed (a rough sketch of that pattern is below). Very often such multiprocessed operations do not scale well beyond 8-12 processes, so it is indeed more "economical" to process multiple samples at once with fewer cores each.
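For reference, this is roughly what the CLI pattern looks like. This is a rough sketch, not from the issue: the sample names are made up, output handling is omitted, and the exact cooltools invocation and flags should be checked against cooltools expected-cis --help for your version.

import subprocess

samples = ["sampleA", "sampleB", "sampleC", "sampleD"]  # hypothetical sample names

# launch one CLI job per sample, each restricted to a modest number of cores ...
procs = [
    subprocess.Popen(
        ["cooltools", "expected-cis", "--nproc", "8", f"{sample}.mcool::resolutions/25000"]
    )
    for sample in samples
]

# ... and wait for all of them to finish
for proc in procs:
    proc.wait()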

Now, what if we want to achieve the same in the notebook? It is not trivial, because multiprocess does not allow nesting the way we typically use it out of the box (a minimal sketch of the failure is right below). It can, however, easily be done with MPIRE https://github.com/sybrenjansen/mpire , which allows running multiple multiprocessed tasks in parallel and has an API very similar to multiprocess itself ... Check out the mpire test below:
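For context (not from the original issue), here is roughly what the out-of-the-box failure looks like with the standard library's multiprocessing; the multiprocess fork behaves the same way by default, and _inner / _outer are just placeholder names:

import multiprocessing as mp

def _inner(x):
    return x * x

def _outer(xs):
    # workers of a regular Pool are daemonic, and daemonic processes are not
    # allowed to start children, so creating an inner Pool here raises
    # "AssertionError: daemonic processes are not allowed to have children"
    with mp.Pool(2) as inner_pool:
        return inner_pool.map(_inner, xs)

if __name__ == "__main__":
    with mp.Pool(2) as outer_pool:
        outer_pool.map(_outer, [[1, 2], [3, 4]])  # the error propagates back here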

mpire test:

import cooltools
from mpire import WorkerPool

# clrs: a dictionary of several coolers, keyed by sample name
exp_kwargs = dict(view_df=hg38_arms, nproc=12)

def _job(sample):
    _clr = clrs[sample]
    _exp = cooltools.expected_cis(_clr, **exp_kwargs)
    return (sample, _exp)

# have to use daemon=False, because _job is multiprocessing-based already ...
# trying to run 8 samples in parallel, each using 12 processes - 8*12 = 96
with WorkerPool(n_jobs=8, daemon=False) as wpool:
    results = wpool.map(_job, list(clrs), progress_bar=True)

# sort out the results ...
exps = {sample: _exp for sample, _exp in results}

# this takes ~1 min for 16 coolers @ 25kb on a 56-core system (112 threads)

one-by-one using a ton of cores per task:

exp_kwargs = dict(view_df=hg38_arms, nproc=112)
exps = {}
for sample, _clr in clrs.items():
    print(f"calculating expected for {sample} ...")
    exps[sample] = cooltools.expected_cis(_clr, **exp_kwargs)

# this takes > 2 mins, and shows no time improvement beyond nproc=32 ...

This has limited applicability - only projects with many samples and people with big workstations - but when those two criteria are both met, the speedup is very much appreciated.

@sergpolly
Member Author

Had issues myself using start_method="fork" on the outermost WorkerPool - it was crashing shamelessly ...
Here is an example of how to use start_method="spawn" or "forkserver" (it is also important to add use_dill=True, otherwise pickling starts failing ...):

def _job(data_pack, sample):
    # unpack shared data ...
    clr_dict, view_df = data_pack
    # define cooler to work on ...
    _clr = clr_dict[sample]
    # import inside the job, since spawn/forkserver workers start fresh
    from cooltools.sandbox.obs_over_exp_cooler import expected_full
    # calculate full expected (cis + trans)
    _exp_full = expected_full(
        _clr,
        view_df=view_df,
        smooth_cis=False,
        aggregate_trans=True,
        expected_column_name="expected",
        nproc=8,
    )
    return (sample, _exp_full)

# have to use daemon=False, because _job is multiprocessing-based already ...
with WorkerPool(
    n_jobs=16,
    shared_objects=(clrs_samples_dict, hg38_arms),
    daemon=False,
    start_method='forkserver', # or spawn ...
    use_dill=True
) as wpool:
    results = wpool.map(
        _job,
        list(clrs_samples_dict.keys()),
        progress_bar=True
    )

# sort out the results ...
exps_full_dict = {sample: _exp for sample, _exp in results}

The key here is that spawn is not "as aware" of global variables as fork, so one has to pass them explicitly (e.g. via shared_objects) and do imports inside each task ... (small illustration below)
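Roughly (not from the issue - lookup, _job_fork and _job_spawn are made-up names, just to contrast what each start method sees):

from mpire import WorkerPool

lookup = {"a": 1, "b": 2}  # defined in the notebook's main process

def _job_fork(key):
    # with start_method="fork" (the default on Linux) the worker inherits the
    # parent's memory, so notebook-level globals like lookup are simply visible
    return lookup[key]

def _job_spawn(shared_lookup, key):
    # with "spawn"/"forkserver" the worker starts from a fresh interpreter:
    # data comes in explicitly via shared_objects (first argument),
    # and any imports have to happen inside the job itself
    return shared_lookup[key]

with WorkerPool(
    n_jobs=2,
    shared_objects=lookup,
    start_method="spawn",
    use_dill=True,  # so that the notebook-defined function can be serialized
) as wp:
    out = wp.map(_job_spawn, ["a", "b"])  # -> [1, 2]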

@sergpolly
Member Author

Since nested multiprocessing is still experimental, it is better to keep track of related issues and such on the mpire side: sybrenjansen/mpire#105
