A minimal dummy example:
I have a DataFrame df:
> df
       para0  para1   para2
0  17.439020   True    high
1  19.757758   True    high
2  12.434424   True  medium
3  14.789654   True     low
4  14.131464  False    high
5   9.900233   True    high
6  10.977869  False     low
7   8.004251   True  medium
8  11.468420  False     low
9  12.764453  False    high
in which each row consists of a collection of parameters for a function
foobar:
def foobar(r):
    """r is a row of df; it does something, and it takes a long time."""
    if r.para1:
        x = r.para2
    else:
        x = 'low'
    return int(r.para0), (r.Index + 13) % 3 == 0, x
I would like to apply foobar to each row of df, collect its
results, and store these together with their respective parameters in a,
well, DataFrame.
My (current) solution:
df['count'] = 0
df['valid'] = False
df['outpt'] = ''

def wrapper(r, df):
    c, v, o = foobar(r)
    df.at[r.Index, 'count'] = c
    df.at[r.Index, 'valid'] = v
    df.at[r.Index, 'outpt'] = o

for r in df.itertuples():
    wrapper(r, df)
This yields:
> df
       para0  para1   para2  count  valid   outpt
0  17.439020   True    high   17.0  False    high
1  19.757758   True    high   19.0  False    high
2  12.434424   True  medium   12.0   True  medium
3  14.789654   True     low   14.0  False     low
4  14.131464  False    high   14.0  False     low
5   9.900233   True    high    9.0   True    high
6  10.977869  False     low   10.0  False     low
7   8.004251   True  medium    8.0  False  medium
8  11.468420  False     low   11.0   True     low
9  12.764453  False    high   12.0  False     low
Here is my question:
In real life, the function foobar is
computationally expensive and takes approximately 20-30 minutes to run; df
typically has between 100 and 2000 rows. I have access to a machine with
eight cores, and since foobar depends only on the row currently being
processed and on nothing else, it should be trivial to run these
computations in parallel.
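For context, the kind of parallel map described above could be sketched as follows (the function `process_row` and the helper `parallel_apply` are illustrative stand-ins, not part of the original code): each row is converted to a plain dict so it pickles cleanly, the pure per-row function is mapped over a worker pool, and the results are joined back onto the DataFrame.

```python
from multiprocessing import Pool

import pandas as pd

def process_row(row):
    # Stand-in for foobar: depends only on its own row, passed as a plain dict.
    x = row['para2'] if row['para1'] else 'low'
    return int(row['para0']), (row['Index'] + 13) % 3 == 0, x

def parallel_apply(df, n_workers=8):
    # Plain dicts pickle cleanly, unlike the namedtuple class from itertuples().
    rows = [r._asdict() for r in df.itertuples()]
    with Pool(n_workers) as pool:
        results = pool.map(process_row, rows)
    out = pd.DataFrame(results, columns=['count', 'valid', 'outpt'],
                       index=df.index)
    return df.join(out)
```

Because `process_row` writes nothing back into the shared df (it returns its result instead), the workers need no shared state at all.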
It would also be nice if, when something goes wrong (say, someone accidentally turns off the machine), it were not necessary to start everything from the beginning, i.e., rows that have already been processed could be skipped.
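A common pattern for this crash-resilience wish (the file name and helper names below are assumptions, not part of the original code) is to append each finished row to a results file immediately and, on restart, skip every index that already appears there; a minimal sketch:

```python
import os

import pandas as pd

RESULTS_FILE = 'results.csv'  # hypothetical checkpoint file

def load_done_indices(path=RESULTS_FILE):
    # Indices of rows finished in an earlier run; empty set on a fresh start.
    if not os.path.exists(path):
        return set()
    return set(pd.read_csv(path, index_col=0).index)

def append_result(idx, count, valid, outpt, path=RESULTS_FILE):
    # Persist one finished row immediately; write the header only once.
    row = pd.DataFrame({'count': [count], 'valid': [valid], 'outpt': [outpt]},
                       index=[idx])
    row.to_csv(path, mode='a', header=not os.path.exists(path))
```

The main loop would then start with something like `if r.Index in done: continue`.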
How can I do this?
My attempt with multiprocessing unfortunately failed:

from multiprocessing import Pool

pool = Pool(3)
results = []
for r in df.itertuples():
    results += [pool.apply_async(wrapper, (r, df))]
With:
> results[0].get()
…
/usr/lib/python3.5/multiprocessing/reduction.py in dumps(cls, obj, protocol)
48 def dumps(cls, obj, protocol=None):
49 buf = io.BytesIO()
---> 50 cls(buf, protocol).dump(obj)
51 return buf.getbuffer()
52
PicklingError: Can't pickle <class 'pandas.core.frame.Pandas'>: attribute lookup Pandas on pandas.core.frame failed
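The traceback points at the obstacle: itertuples() yields instances of a namedtuple class created on the fly (pandas.core.frame.Pandas), and, at least on the pandas version shown, such dynamically created classes cannot be pickled, which multiprocessing requires when sending arguments to workers. Converting each row to something picklable first, e.g. with _asdict(), is a usual workaround; a minimal sketch:

```python
import pickle

import pandas as pd

df = pd.DataFrame({'a': [1, 2]})
row = next(df.itertuples())  # instance of the dynamically created 'Pandas' class
plain = row._asdict()        # a plain (ordered) dict: picklable
blob = pickle.dumps(plain)   # succeeds, unlike pickling `row` in the traceback
```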
Here is how I created the toy DataFrame:
import pandas as pd
import numpy as np

df = pd.DataFrame({
    'para0': pd.Series(
        np.random.gamma(12, size=10),
        dtype=float),
    'para1': pd.Series(
        [(True, False)[i] for i in np.random.randint(0, 2, 10)],
        dtype=bool),
    'para2': pd.Categorical(
        [('low', 'medium', 'high')[i] for i in np.random.randint(0, 3, 10)],
        ordered=True),
})
