
A toy example:

I have a DataFrame df:

> df

       para0  para1   para2
0  17.439020   True    high
1  19.757758   True    high
2  12.434424   True  medium
3  14.789654   True     low
4  14.131464  False    high
5   9.900233   True    high
6  10.977869  False     low
7   8.004251   True  medium
8  11.468420  False     low
9  12.764453  False    high

in which each row consists of a collection of parameters for a function foobar:

def foobar(r):
    """ r is a row of df, does something, and it takes a long time"""
    if r.para1:
        x = r.para2
    else:
        x = 'low'
    return int(r.para0), (r.Index+13)%3 == 0, x

I would like to apply foobar to each row of df, collect its results, and store them together with their respective parameters in a, well, DataFrame.

My (current) solution:

df['count'] = 0
df['valid'] = False
df['outpt'] = ''

def wrapper(r, df):
    c, v, o = foobar(r)
    df.ix[r.Index,'count'] = c
    df.ix[r.Index,'valid'] = v
    df.ix[r.Index,'outpt'] = o

for r in df.itertuples():
    wrapper(r, df)

This yields:

> df
       para0  para1   para2  count  valid   outpt
0  17.439020   True    high   17.0  False    high
1  19.757758   True    high   19.0  False    high
2  12.434424   True  medium   12.0   True  medium
3  14.789654   True     low   14.0  False     low
4  14.131464  False    high   14.0  False     low
5   9.900233   True    high    9.0   True    high
6  10.977869  False     low   10.0  False     low
7   8.004251   True  medium    8.0  False  medium
8  11.468420  False     low   11.0   True     low
9  12.764453  False    high   12.0  False     low

Here is my question:

In real life, the function foobar is computationally expensive and takes approximately 20-30 min to run, and df typically has between 100 and 2000 rows. I have access to a machine with eight cores, and since foobar depends only on the row currently being processed and on nothing else, it should be trivial to run these computations in parallel.

It would also be nice if, when something goes wrong (say, someone accidentally turns off the machine), it were not necessary to start everything from the beginning, i.e., if rows that have already been processed could be skipped.

How can I do this?


My attempt with multiprocessing unfortunately failed:

from multiprocessing import Pool

pool = Pool(3)
results = []

for r in df.itertuples():
    results += [pool.apply_async(wrapper, r, df)]

With:

> results[0].get()
…
/usr/lib/python3.5/multiprocessing/reduction.py in dumps(cls, obj, protocol)
     48     def dumps(cls, obj, protocol=None):
     49         buf = io.BytesIO()
---> 50         cls(buf, protocol).dump(obj)
     51         return buf.getbuffer()
     52

PicklingError: Can't pickle <class 'pandas.core.frame.Pandas'>: attribute lookup Pandas on pandas.core.frame failed

Here is how I created the toy DataFrame:

import pandas as pd
import numpy as np

df = pd.DataFrame({
    'para0' : pd.Series(
        np.random.gamma(12,size=10),
        dtype=float),  # np.float was removed from recent NumPy releases
    'para1' : pd.Series(
        [(True,False)[i] for i in np.random.randint(0,2,10)],
        dtype=bool),  # likewise for np.bool
    'para2' : pd.Categorical(
        [('low','medium','high')[i] for i in np.random.randint(0,3,10)],
        ordered=True),
    })

3 Answers

2

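I don't know whether it helps, but try passing plain lists instead of the itertuples objects, which are what fails to pickle.

I mean something like this:

df_list = [list(x) for x in df.itertuples()]
for r in df_list:
    results += [pool.apply_async(wrapper, args=(r, df))]

Each r is then [Index, para0, para1, para2], so wrapper and foobar have to read the fields by position (r[0], r[1], ...) instead of by attribute.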

1

If you want to keep the row as a dictionary you can use to_dict. Here is a working example (using starmap because additional parameters are being passed to the function):

from multiprocessing import Pool
import pandas as pd
from itertools import repeat

def test(df_row, otherparam):
    print(df_row, otherparam)
    return True

if __name__ == '__main__':
    df = pd.DataFrame({'a': [0, 1, 2], 'b':[1, 2, 3], 'c':[10, 20, 30]})
    df.set_index('a', inplace=True)
    pool = Pool(processes=2)
    it = df.reset_index().to_dict(orient='records')
    results = pool.starmap(test, zip(it, repeat(3)))
    print(results)

Output:

{'a': 0, 'b': 1, 'c': 10} 3
{'a': 1, 'b': 2, 'c': 20} 3
{'a': 2, 'b': 3, 'c': 30} 3
[True, True, True]


1

The error says directly that pickle cannot serialize an object of type pandas.core.frame.Pandas, which is the type of the rows yielded by df.itertuples():

>>> type(next(df.itertuples()))
pandas.core.frame.Pandas
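To check which row representations survive pickling, here is a minimal sketch. The can_pickle helper and the two-row frame are hypothetical and exist only for illustration. On the asker's setup the namedtuple row is the one that fails, while a plain tuple or a dict built from the same row goes through.

```python
import pickle

import pandas as pd

df = pd.DataFrame({"para0": [17.4, 19.7], "para1": [True, False]})
row = next(df.itertuples())  # namedtuple of type pandas.core.frame.Pandas


def can_pickle(obj):
    """Return True if obj can be serialized with the standard pickle module."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


# The namedtuple is the object the traceback complains about.
print("namedtuple row:", can_pickle(row))
# Plain containers built from the same row carry the same values.
print("plain tuple   :", can_pickle(tuple(row)))
print("dict          :", can_pickle(row._asdict()))
```

Converting each row to a tuple or a dict before handing it to the pool is therefore enough to stay with the standard library.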

Solution №1

Instead of the standard multiprocessing module (which uses pickle under the hood), you can use the pathos.multiprocessing module, which serializes with dill.

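# pip install pathos
from pathos.multiprocessing import Pool

# Assumes wrapper ends with `return df`, so each task hands back its own copy.
with Pool() as p:
    async_results = [p.apply_async(wrapper, args=(r, df)) for r in df.itertuples()]
    # Collect inside the with block, because leaving it shuts the pool down.
    results = [res.get() for res in async_results]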

OK, but let's look at the results:

results[0]

      para0    para1    para2   count   valid   outpt
0   9.451356    False   medium  9.0     False   low
1   10.818135   False   low     NaN      NaN    NaN
2   13.438129   True    low     NaN      NaN    NaN
3   11.517698   False   medium  NaN      NaN    NaN
4   7.415294    False   low     NaN      NaN    NaN
5   14.500403   False   low     NaN      NaN    NaN
6   16.283561   True    medium  NaN      NaN    NaN
7   10.402704   True    medium  NaN      NaN    NaN
8   8.890628    True    medium  NaN      NaN    NaN
9   8.103542    False   medium  NaN      NaN    NaN
... ... ... ... ... ... ...

results[1]
       para0    para1   para2   count   valid   outpt
0   10.652145   False   medium   NaN     NaN    NaN
1   13.026500   False   high     13.0    False  low
2   8.030650    True    low      NaN     NaN    NaN
3   13.638145   False   low      NaN     NaN    NaN
4   12.118411   True    low      NaN     NaN    NaN
... ... ... ... ... ... ...

All of this happens because wrapper modifies df on every call. Each task receives its own pickled copy of the original df and changes only that copy, so every returned frame has exactly one row filled in, and the df in the parent process is never updated. Mutating a shared object from the workers like this is bad practice and leads to bugs that are hard to spot.
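To see the copy semantics in isolation, here is a minimal sketch that mimics how an argument reaches a worker, namely by pickling and unpickling it. send_to_worker is a hypothetical helper, not part of multiprocessing or pathos, and the small frame is made up for the example.

```python
import pickle

import pandas as pd


def send_to_worker(obj):
    """Mimic argument passing to a worker process: pickle, then unpickle."""
    return pickle.loads(pickle.dumps(obj))


df = pd.DataFrame({"para0": [17.4, 19.7], "count": [0, 0]})

# The "worker" gets an independent copy and edits only that copy.
worker_df = send_to_worker(df)
worker_df.at[0, "count"] = 1

print(worker_df["count"].tolist())  # the copy was changed
print(df["count"].tolist())         # the parent's frame was not
```

This is why the results have to be returned from the worker and merged in the parent process.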

Solution №2

What if we drop the wrapper function altogether? You only want to collect the results of foobar in a dataframe and attach them to the original dataframe df, so let's do exactly that. We will still use the Pool class from pathos.multiprocessing.

def foobar(r):
    """ r is a row of df, does something, and it takes a long time"""
    if r.para1:
        x = r.para2
    else:
        x = 'low'
    return {'count':int(r.para0), 'valid': (r.Index+13)%3 == 0, 'outpt':x}

# Create one task per row.
tasks = list(df.itertuples())
with Pool() as p:
    results = pd.DataFrame(p.map(foobar, tasks))

results
    count   valid   outpt
0   10     False    low
1   13     False    low
2   8       True    low
3   13     False    low
4   12     False    low
... ... ... ...
995 15      True    low
996 12     False    low
997 5      False    high
998 13      True    low
999 13     False    medium

and then:

result_df = pd.concat([df, results], axis=1)
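One caveat with this concat: axis=1 aligns on the index, and a frame built from the p.map output gets a fresh 0..n-1 index. That matches the toy df, but it would not match a df with a custom index. A minimal sketch of the difference, using made-up values:

```python
import pandas as pd

# A frame with a non-default index, and results carrying a fresh 0..n-1 index.
df = pd.DataFrame({"para0": [17.4, 19.7, 12.4]}, index=[10, 20, 30])
results = pd.DataFrame({"count": [17, 19, 12]})

# Aligning on the index pairs nothing up: the rows are stacked, with NaN gaps.
misaligned = pd.concat([df, results], axis=1)

# Reusing df's index restores the row-by-row correspondence.
results.index = df.index
aligned = pd.concat([df, results], axis=1)

print(misaligned)
print(aligned)
```

This relies on p.map returning the results in the same order as the tasks, which it does.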

Solution №3

Great, it works. But can we get around the pickle problem with the standard multiprocessing library? Remember that the problem lies in serializing the objects from df.itertuples(), which happens because we pass them to a method of the Pool class (apply_async, map, etc.). So let's call df.itertuples() inside the wrapper function instead. We only need to add one line, the loop over the rows:


def parallel_wrapper(df):
    for r in df.itertuples():
        c, v, o = foobar(r)
        df.at[r.Index,'count'] = c
        df.at[r.Index,'valid'] = v
        df.at[r.Index,'outpt'] = o
    return df

Now to do this in parallel, we just need to split the original dataframe into parts and use the method (for example) Pool().map of the standard multiprocessing library. It is easy to split a dataframe into parts along the 0 axis using np.array_split(df, chunks)

from multiprocessing import Pool
import numpy as np

with Pool() as p:
    results = p.map(parallel_wrapper, np.array_split(df, 100))

and then:

result_df = pd.concat(results, ignore_index=True) 
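The split can also be written with plain iloc slicing. The sketch below is an alternative under my own assumptions, not what np.array_split does internally; split_frame is a hypothetical helper. It copies each chunk, so a function like parallel_wrapper writes into an independent frame rather than into a slice of the original.

```python
import pandas as pd


def split_frame(df, n_chunks):
    """Split df into at most n_chunks contiguous row blocks of near-equal size."""
    size, extra = divmod(len(df), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `extra` chunks take one additional row each.
        stop = start + size + (1 if i < extra else 0)
        if stop > start:  # skip empty chunks when n_chunks > len(df)
            chunks.append(df.iloc[start:stop].copy())
        start = stop
    return chunks


df = pd.DataFrame({"para0": range(10)})
parts = split_frame(df, 3)
print([len(part) for part in parts])
```

Fewer, larger chunks reduce the pickling overhead. More, smaller chunks balance the load better when the run time per row varies a lot.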

P.S.

Finally, as the cherry on the cake, I would like to show the parallelbar library, which lets you visualize the progress of Pool methods such as map, imap and imap_unordered. For example, solution 3 would look like this:

# pip install parallelbar
from parallelbar import progress_map

progress_map(parallel_wrapper, np.array_split(df, 100), core_progress=True)

# for 4 cores
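The question also asks for a way to skip rows that were already processed if the machine goes down mid-run. None of the solutions above covers that, so here is a minimal sketch under several assumptions: the rows are passed as plain dicts (which pickle fine with the standard library), foobar is rewritten to take such a dict, df holds only the parameter columns, and each result is appended to a CSV file as soon as it arrives. The file name and the helpers pending_rows and run are hypothetical.

```python
import os
from multiprocessing import Pool

import pandas as pd

CHECKPOINT = "foobar_results.csv"  # hypothetical file name


def foobar(row):
    """Stand-in for the expensive function; row is a plain dict here."""
    outpt = row["para2"] if row["para1"] else "low"
    return {
        "Index": row["Index"],
        "count": int(row["para0"]),
        "valid": (row["Index"] + 13) % 3 == 0,
        "outpt": outpt,
    }


def pending_rows(df, checkpoint=CHECKPOINT):
    """Return the rows of df, as dicts, that are not yet in the checkpoint file."""
    done = set()
    if os.path.exists(checkpoint):
        done = set(pd.read_csv(checkpoint)["Index"])
    records = df.rename_axis("Index").reset_index().to_dict(orient="records")
    return [r for r in records if r["Index"] not in done]


def run(df, checkpoint=CHECKPOINT, processes=8):
    """Process the pending rows in parallel, appending each result to disk."""
    with Pool(processes) as pool:
        # imap_unordered yields every result as soon as a worker finishes it.
        for res in pool.imap_unordered(foobar, pending_rows(df, checkpoint)):
            first_write = not os.path.exists(checkpoint)
            pd.DataFrame([res]).to_csv(
                checkpoint, mode="a", header=first_write, index=False
            )
    # Everything computed so far, from this run and from earlier ones.
    results = pd.read_csv(checkpoint).set_index("Index").sort_index()
    return df.join(results)
```

Calling result_df = run(df) under an if __name__ == '__main__': guard processes only the rows missing from the checkpoint file, so restarting after a crash loses at most the rows that were in flight. Only the parent process writes to the file, which avoids concurrent writes from the workers.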
