xref #5751
Questions from SO.

mrocklin's nice example of using `.apply`:
So here is an example of how to do a parallel apply using dask. This could be baked into `.apply()` in pandas via the following signature enhancement:
current:

```python
DataFrame.apply(self, func, axis=0, broadcast=False, raw=False, reduce=None,
                args=(), **kwds)
```

proposed:

```python
DataFrame.apply(self, func, axis=0, broadcast=False, raw=False, reduce=None,
                engine=None, chunksize=None, args=(), **kwds)
```
where `engine='dask'` (or `'numba'` at some point) are possibilities. `chunksize` would map directly to `npartitions` and default to the number of cores if not specified.

Further, `engine` could be allowed to be a meta object like `Dask(scheduler='multiprocessing')` to support other options one would commonly pass (`chunksize` could also move inside that object instead of being a separate keyword).
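As a rough sketch of what `engine='dask'` / `chunksize` could do under the hood: split the frame into partitions and apply the function to each chunk in parallel. This is a stdlib stand-in for illustration only — a `ThreadPoolExecutor` plays the role of dask's scheduler, and `parallel_apply` / `npartitions` are names assumed here, not real pandas API.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd

def parallel_apply(df, func, npartitions=4):
    # split positionally into roughly equal chunks (chunksize -> npartitions)
    bounds = np.linspace(0, len(df), npartitions + 1, dtype=int)
    chunks = [df.iloc[lo:hi] for lo, hi in zip(bounds[:-1], bounds[1:])]
    # apply the row-wise function to each chunk on its own worker
    with ThreadPoolExecutor(max_workers=npartitions) as ex:
        parts = list(ex.map(lambda c: c.apply(func, axis=1), chunks))
    return pd.concat(parts)

df = pd.DataFrame({'A': range(10)})
out = parallel_apply(df, lambda row: row['A'] * 2)
```

The result is identical to a plain `df.apply(func, axis=1)`; only the scheduling differs.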
impl and timings:

```python
from functools import partial
from time import sleep

import pandas as pd
import dask
import dask.dataframe as dd
from dask import threaded, multiprocessing

pd.__version__
dask.__version__

def make_frame(N):
    return pd.DataFrame({'A': range(N)})

def slow_func(x):
    sleep(0.5)
    return x

df = make_frame(40)

# regular apply
def f1(df):
    return df.apply(slow_func, axis=1)

# dask apply
def f2(df, get):
    ddf = dd.from_pandas(df, npartitions=8, sort=False)
    return ddf.apply(slow_func, columns=df.columns, axis=1).compute(get=get)

f1 = partial(f1, df)
f2_threaded = partial(f2, df, threaded.get)
f2_multi = partial(f2, df, multiprocessing.get)

result1 = f1()
result2 = f2_threaded()
result3 = f2_multi()
```
```python
In [18]: result1.equals(result2)
Out[18]: True

In [19]: result1.equals(result3)
Out[19]: True

In [20]: %timeit -n 1 -r 1 f1()
1 loop, best of 1: 20.6 s per loop

In [21]: %timeit -n 1 -r 1 f2_threaded()
1 loop, best of 1: 3.03 s per loop

In [22]: %timeit -n 1 -r 1 f2_multi()
1 loop, best of 1: 3.07 s per loop
```
Now for some caveats.

People often want to parallelize a poor implementation. Generally you should proceed through the following steps first:
- get your problem correct; optimizing incorrect results is useless
- profile, profile, profile; this is always the first thing to do
- use built-in pandas / numpy vectorized routines
- use `cython` or `numba` on the user-defined function; `.apply` is always the last choice
- if it's still not enough, parallelization
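The "vectorize first" step above in practice: the same computation written as a row-wise `.apply` (a Python-level for loop) and as a single vectorized column expression. The vectorized form gives identical results and is typically orders of magnitude faster.

```python
import pandas as pd

df = pd.DataFrame({'A': range(1000)})

# row-at-a-time Python loop
looped = df.apply(lambda row: row['A'] * 2 + 1, axis=1)

# one vectorized column expression -- same result, far cheaper per element
vectorized = df['A'] * 2 + 1

assert looped.equals(vectorized)
```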
You always want to make code simpler, not more complex. It's hard to know a priori where the bottlenecks are. People think `.apply` is some magical thing; it's NOT, it's JUST A FOR LOOP. The problem is people tend to throw in the kitchen sink, which is just a terrible idea.
Ok my 2c about optimizing things.
In order for parallelization to actually matter, the function you are computing should take a non-trivial amount of time relative to things like:

- the iteration costs of the loop
- serialization time (especially if using multiprocessing / distributed computing)
- whether the function releases the GIL; if not, threading will probably not help much
- development resources (your time)

If these criteria are met, then sure, give it a try.
I think providing pandas a first-class way to parallelize things, even though people will often just naively use it, is probably not a bad thing.
Further extensions to this are: `to_dask()` (return a `dask.dataframe` to the user directly), and `engine='dask'` syntax for `.groupby()`.
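What `engine='dask'` on `.groupby()` might amount to: dispatch each group's user function to a separate worker. Again a stdlib stand-in for illustration — `ThreadPoolExecutor` plays the scheduler role, and `parallel_groupby_apply` is an assumed name, not a real pandas or dask API.

```python
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

def parallel_groupby_apply(df, by, func, max_workers=4):
    # materialize (key, group) pairs, then map func over the groups in parallel
    keys, groups = zip(*df.groupby(by))
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        results = list(ex.map(func, groups))
    return pd.Series(results, index=pd.Index(keys, name=by))

df = pd.DataFrame({'key': ['a', 'a', 'b', 'b'], 'val': [1, 2, 3, 4]})
sums = parallel_groupby_apply(df, 'key', lambda g: g['val'].sum())
```

The same caveats as above apply: this only pays off when the per-group function is expensive relative to scheduling and serialization overhead.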