
I have 5,000,000 rows in my dataframe. In my code I am using iterrows(), which is taking too much time. To get the required output I have to iterate through all the rows, so I wanted to know whether I can parallelize the code in pandas.

2 Comments

  • Vectorize before you parallelize! You can vectorize in pandas by avoiding iterrows(). If you are using iterrows() at all, you probably haven't spent enough time learning the pandas basics. Do that first. After that, ask a specific question showing your code along with some sample data. Commented Mar 17, 2016 at 15:51
  • @JohnE I actually needed to use iterrows() as well: similarly, I had to process tens of millions of rows in a file and needed to aggregate keys for counting. To process the huge dataframe it had to be split into chunks, which poses a problem for aggregating by key (since a key might appear in separate chunks). Hence the approach was to use an external state store (e.g. Redis) for aggregation, performing a row-wise write of each row's key into the state store for counting, which meant using iterrows(). Unless there are other ways to aggregate huge dataframes by vectorization? Commented Aug 31, 2019 at 16:02
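For the key-counting case described in the comment above, a purely vectorized alternative is possible: count keys per chunk with value_counts() and combine the partial counts, so no per-row loop or external store is needed. Here is a minimal sketch of that idea; the file path and key column are hypothetical, not from the question:

import pandas as pd

LARGE_FILE = "my_large_file.csv"   # hypothetical path
KEY_COLUMN = "key"                 # hypothetical column holding the keys to count
CHUNKSIZE = 100_000

# running total of counts per key, combined across all chunks
total_counts = pd.Series(dtype="int64")

for chunk in pd.read_csv(LARGE_FILE, chunksize=CHUNKSIZE):
    # value_counts() is vectorized, so there is no per-row Python loop
    chunk_counts = chunk[KEY_COLUMN].value_counts()
    # add() aligns on the key index; keys missing from one side are filled with 0
    total_counts = total_counts.add(chunk_counts, fill_value=0)

print(total_counts.sort_values(ascending=False).head())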

3 Answers


Here's a webpage I found that might help: http://gouthamanbalaraman.com/blog/distributed-processing-pandas.html

And here's the multiprocessing code found on that page:

import pandas as pd
import multiprocessing as mp

LARGE_FILE = "D:\\my_large_file.txt"
CHUNKSIZE = 100000 # processing 100,000 rows at a time

def process_frame(df):
    # process data frame
    return len(df)

if __name__ == '__main__':
    reader = pd.read_table(LARGE_FILE, chunksize=CHUNKSIZE)
    pool = mp.Pool(4) # use 4 processes

    funclist = []
    for df in reader:
        # process each data frame
        f = pool.apply_async(process_frame,[df])
        funclist.append(f)

    result = 0
    for f in funclist:
        result += f.get(timeout=10) # timeout in 10 seconds

    print "There are %d rows of data"%(result)

1 Comment

Will the data order be retained? Some f's will finish faster, but the get() calls will return in order, won't they?
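For what it's worth, the workers may finish in any order, but because the code above appends the AsyncResult handles to funclist in submission order and calls get() in that same order, the results are consumed in submission order. A minimal sketch (with a made-up slow_square function) illustrating that completion order does not matter:

import multiprocessing as mp
import time

def slow_square(x):
    # later tasks sleep less, so they finish first
    time.sleep(1.0 / (x + 1))
    return x * x

if __name__ == '__main__':
    with mp.Pool(4) as pool:
        handles = [pool.apply_async(slow_square, (i,)) for i in range(8)]
        # get() is called in submission order, so the output order is stable
        print([h.get() for h in handles])  # [0, 1, 4, 9, 16, 25, 36, 49]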

In 2022, you DO NOT need to implement multiprocessing yourself. pandarallel provides a one-line solution for parallel processing in pandas. Just follow these two steps:

First, install it

pip install pandarallel [--upgrade] [--user]

Second, replace your df.apply(func) with df.parallel_apply(func), and you'll see it work as you expect!

from pandarallel import pandarallel

pandarallel.initialize(progress_bar=True) # or pandarallel.initialize(nb_workers=os.cpu_count() - 1) to set the number of workers

#### old code:
# df.apply(func)
#### new code:
df.parallel_apply(func) 
#### you can then watch the CPU cores being used to run your pandas operation~

More ops~

Without parallelization → With parallelization

df.apply(func) → df.parallel_apply(func)
df.applymap(func) → df.parallel_applymap(func)
df.groupby(args).apply(func) → df.groupby(args).parallel_apply(func)
df.groupby(args1).col_name.rolling(args2).apply(func) → df.groupby(args1).col_name.rolling(args2).parallel_apply(func)
df.groupby(args1).col_name.expanding(args2).apply(func) → df.groupby(args1).col_name.expanding(args2).parallel_apply(func)
series.map(func) → series.parallel_map(func)
series.apply(func) → series.parallel_apply(func)
series.rolling(args).apply(func) → series.rolling(args).parallel_apply(func)
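As a rough, self-contained sketch of the basic pattern above (the DataFrame and slow_func below are made up for demonstration and are not part of the original answer):

import math
import pandas as pd
from pandarallel import pandarallel

pandarallel.initialize(progress_bar=False)  # one worker per CPU core by default

df = pd.DataFrame({"x": range(1_000_000)})

def slow_func(row):
    # some per-row work that would be slow with a plain apply()
    return math.sqrt(row["x"]) + math.log1p(row["x"])

# old: df.apply(slow_func, axis=1)
df["y"] = df.parallel_apply(slow_func, axis=1)
print(df.head())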

Enjoy your dinner :D

1 Comment

Does it work in a Jupyter notebook?

This code shows how you might break up a large dataframe into smaller dataframes, each with N_ROWS rows (except possibly the last), and then pass each one to a process pool. The pool can be whatever size you want, but there is no point in making it larger than the number of processors you have. Each worker process returns its modified dataframe to the main process, which reassembles the final result by concatenating the workers' return values:

import pandas as pd
import multiprocessing as mp


def process_frame(df):
    # process data frame
    # create new index starting at 0:
    df.reset_index(inplace=True, drop=True)
    # increment everybody's age:
    for i in range(len(df.index)):
        df.at[i, 'Age'] += 1
    return df


def divide_and_conquer(df):
    N_ROWS = 2 # number of rows in each dataframe
    with mp.Pool(3) as pool: # use 3 processes
        # break up the dataframe into smaller dataframes of N_ROWS rows each
        cnt = len(df.index)
        n, remainder = divmod(cnt, N_ROWS)
        results = []
        start_index = 0
        for i in range(n):
            results.append(pool.apply_async(process_frame, args=(df.loc[start_index:start_index+N_ROWS-1, :],)))
            start_index += N_ROWS
        if remainder:
            results.append(pool.apply_async(process_frame, args=(df.loc[start_index:start_index+remainder-1, :],)))
        new_dfs = [result.get() for result in results]
        # reassemble final dataframe:
        df = pd.concat(new_dfs, ignore_index=True)
        return df



if __name__ == '__main__':
    df = pd.DataFrame({
        "Name": ['Tom', 'Dick', 'Harry', 'Jane', 'June', 'Sally', 'Mary'],
        "Age": [10, 20, 30, 40, 40, 60, 70],
        "Sex": ['M', 'M', 'M', 'F', 'F', 'F', 'F']
    })
    print(df)
    df = divide_and_conquer(df)
    print(df)

Prints:

    Name  Age Sex
0    Tom   10   M
1   Dick   20   M
2  Harry   30   M
3   Jane   40   F
4   June   40   F
5  Sally   60   F
6   Mary   70   F
    Name  Age Sex
0    Tom   11   M
1   Dick   21   M
2  Harry   31   M
3   Jane   41   F
4   June   41   F
5  Sally   61   F
6   Mary   71   F
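For a 5,000,000-row dataframe you would of course use a much larger N_ROWS and size the pool from the machine. As a hedged variant of the splitting step, assuming numpy is available, np.array_split can replace the manual index arithmetic:

import multiprocessing as mp
import numpy as np
import pandas as pd

def divide_and_conquer_split(df, process_func):
    n_workers = mp.cpu_count()                  # no benefit beyond the core count
    parts = np.array_split(df, n_workers * 4)   # a few chunks per worker for load balancing
    with mp.Pool(n_workers) as pool:
        new_dfs = pool.map(process_func, parts) # map preserves the input order
    return pd.concat(new_dfs, ignore_index=True)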

2 Comments

Is it possible to modify the code so that the process_frame function returns two dataframes, let's say one with increased age and another with decreased age? So the return would look something like return df1, df2.
@errenmike1806 Why not? You could modify process_frame to make a copy of the passed dataframe with df2 = df.copy(deep=True) and then subtract 1 instead of adding 1 using df2. The function would then return the tuple (df, df2), and the main process would build up two aggregate dataframe instances.
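A minimal sketch of that modification (the function name and variables here are illustrative):

def process_frame_two(df):
    # return one copy with ages incremented and one with ages decremented
    df = df.reset_index(drop=True)
    df_inc = df.copy(deep=True)
    df_dec = df.copy(deep=True)
    df_inc['Age'] += 1
    df_dec['Age'] -= 1
    return df_inc, df_dec

# In divide_and_conquer, each result.get() now yields a (df_inc, df_dec) tuple,
# so the main process concatenates the two aggregates separately:
#     pairs = [result.get() for result in results]
#     df_older = pd.concat([p[0] for p in pairs], ignore_index=True)
#     df_younger = pd.concat([p[1] for p in pairs], ignore_index=True)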
