ENH: parallel support in .apply #13111

@jreback

Description

xref #5751

questions from SO.
mrocklin's nice example of using .apply

So here is an example of how to do a parallel apply using dask. This could be baked into .apply() in pandas by the following signature enhancement:

current:

DataFrame.apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, 
                args=(), **kwds)

proposed:

DataFrame.apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, 
                engine=None, chunksize=None, args=(), **kwds)

where engine='dask' (or 'numba' at some point) are possibilities.
chunksize would map directly to npartitions and default to the number of cores if not specified.
Further, engine could be a meta object like Dask(scheduler='multiprocessing') to support other options one would commonly pass (chunksize could also move inside that object rather than remain a separate keyword).
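A minimal sketch of how such dispatch could look. Note this is hypothetical: parallel_apply is a stand-in name, not the actual pandas implementation, and the dask branch is only outlined:

```python
import os
import pandas as pd

def parallel_apply(df, func, axis=0, engine=None, chunksize=None, **kwds):
    """Hypothetical dispatch for an engine= keyword on DataFrame.apply."""
    if engine is None:
        # default: fall back to the existing serial implementation
        return df.apply(func, axis=axis, **kwds)
    if engine == 'dask':
        import dask.dataframe as dd
        # chunksize maps to npartitions, defaulting to the core count
        npartitions = chunksize or os.cpu_count()
        ddf = dd.from_pandas(df, npartitions=npartitions)
        return ddf.apply(func, axis=axis, **kwds).compute()
    raise ValueError(f"unknown engine {engine!r}")

df = pd.DataFrame({'A': range(10)})
result = parallel_apply(df, lambda x: x + 1, engine=None)
```

With engine=None this is exactly the current behavior, so existing code would be unaffected.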

impl and timings:

from functools import partial
import pandas as pd
import dask
import dask.dataframe as dd
from dask import threaded, multiprocessing
from time import sleep

pd.__version__
dask.__version__

def make_frame(N):
    # N rows, one integer column
    return pd.DataFrame({'A': range(N)})

def slow_func(x):
    # simulate an expensive per-row computation
    sleep(0.5)
    return x

df = make_frame(40)

# regular pandas apply: one row at a time, single-threaded
def f1(df):
    return df.apply(slow_func, axis=1)

# dask apply: split into 8 partitions, compute with the given scheduler
# (get= / columns= are the dask API of this era; newer versions use
# meta= and .compute(scheduler=...))
def f2(df, get):
    ddf = dd.from_pandas(df, npartitions=8, sort=False)
    return ddf.apply(slow_func, columns=df.columns, axis=1).compute(get=get)

f1 = partial(f1, df)
f2_threaded = partial(f2, df, threaded.get)
f2_multi = partial(f2, df, multiprocessing.get)

result1 = f1()
result2 = f2_threaded()
result3 = f2_multi()
In [18]: result1.equals(result2)
Out[18]: True

In [19]: result1.equals(result3)
Out[19]: True

In [20]: %timeit -n 1 -r 1 f1()
1 loop, best of 1: 20.6 s per loop

In [21]: %timeit -n 1 -r 1 f2_threaded()
1 loop, best of 1: 3.03 s per loop

In [22]: %timeit -n 1 -r 1 f2_multi()
1 loop, best of 1: 3.07 s per loop
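The timings line up with back-of-the-envelope arithmetic: 40 rows at 0.5 s each is 20 s serially, while 8 partitions of 5 rows each bound the parallel version at about 2.5 s plus scheduler overhead. A quick check:

```python
n_rows, per_row, npartitions = 40, 0.5, 8

serial = n_rows * per_row                          # every row in sequence
parallel_floor = (n_rows / npartitions) * per_row  # partitions run concurrently

print(serial)          # 20.0 s, matching the ~20.6 s measured
print(parallel_floor)  # 2.5 s, matching the ~3 s measured (rest is overhead)
```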

Now for some caveats.

People often want to parallelize a poor implementation. Generally you should proceed through the following steps first:

  • get your problem correct; optimizing incorrect results is useless
  • profile, profile, profile. This is always the first thing to do
  • use built-in pandas / numpy vectorized routines
  • use cython or numba on the user-defined function
  • .apply is always the last choice
  • if it's still not enough, parallelization

You always want to make code simpler, not more complex. It's hard to know a priori where the bottlenecks are. People think .apply is something magical; it's NOT, it's JUST A FOR LOOP. The problem is that people tend to throw in the kitchen sink, which is a terrible idea.
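To make the "use vectorized routines before .apply" point concrete, here is an illustrative comparison (my example, not from the issue): both lines compute the same result, but the vectorized form avoids the Python-level loop entirely:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({'A': np.arange(100_000)})

# .apply: a Python for loop calling the function once per element
via_apply = df['A'].apply(lambda x: x * 2 + 1)

# vectorized: a single C-level operation over the whole column
vectorized = df['A'] * 2 + 1

assert via_apply.equals(vectorized)
```

On a column this size the vectorized version is typically orders of magnitude faster, with no parallelism involved at all.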

Ok my 2c about optimizing things.

For parallelization to actually matter, the function you are computing should take a non-trivial amount of time relative to things like:

  • iteration costs of the loop
  • serialization time (especially with multiprocessing / distributed computing)
  • does the function release the GIL? If not, threading will probably not help much
  • development resources (your time)

If these criteria are met, then sure give it a try.
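If those criteria are met, even the standard library is enough to try it. A sketch of a chunked, threaded apply using only concurrent.futures (the threaded_apply name and chunking scheme are mine, not a pandas or dask API; it only pays off when the function releases the GIL or is I/O-bound):

```python
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

def threaded_apply(df, func, nchunks=4):
    """Apply func row-wise over nchunks contiguous pieces of df in a thread pool."""
    step = -(-len(df) // nchunks)  # ceiling division for chunk size
    chunks = [df.iloc[i:i + step] for i in range(0, len(df), step)]
    with ThreadPoolExecutor(max_workers=nchunks) as pool:
        parts = pool.map(lambda chunk: chunk.apply(func, axis=1), chunks)
    # contiguous chunks + concat preserve the original row order
    return pd.concat(parts)

df = pd.DataFrame({'A': range(8), 'B': range(8)})
result = threaded_apply(df, lambda row: row['A'] + row['B'])
```

For a GIL-holding pure-Python function like this one, the pool buys you nothing; that is exactly the point of the criteria above.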

I think providing pandas a first-class way to parallelize things, even though people will just naively use it, is probably not a bad thing.

Further extensions to this are: to_dask() (returning a dask.dataframe to the user directly), and engine='dask' syntax for .groupby().
