4

I have a process that requires each row of a dataframe processed and then a new value appended to each row. It's a large dataframe and taking hours to process one dataframe at a time.

If I have a iterrow loop that sends each row to a function, can I parallize my processing for a speedup? The results of the row are not related

basically my code something like this

for index, row in df.iterrows():
   row['data'] = function[row]

Is there a easy way to do this to speed up processing?

3
  • Probably take a look at Dask. Commented Oct 14, 2020 at 22:37
  • 1
    In my experience, if you are looping through a DataFrame, you're probably not doing it the Pandas way. Commented Oct 14, 2020 at 22:50
  • As @Jarad said you probably don't want to iterate. See if vectorized alternatives exist (not apply, depending on what function does there may be better options) -- see this answer by me and also this one on when (not) to use apply. Commented Oct 15, 2020 at 0:04

2 Answers 2

1

While iterating over rows isnt good practice and there can be alternate logics with grouby/transform aggregations etc, but if in worst case you really need to do so, follow the answer. Also, you might not need to reimplement everything here and you can use libraries like Dask, which is built on top of pandas.

But just to give Idea, you can use multiprocessing (Pool.map) in combination with chunking. read csv in chunk(or make chucks as mentioned in the end of answer) and map it to the pools, in processing each chunk add new rows (or add them to list and make new chunk) and return it from the function.

In the end combine the dataframes when all pools are executed.

import pandas as pd
import numpy as np
import multiprocessing


def process_chunk(df_chunk):
        
        for index, row in df_chunk.reset_index(drop = True).iterrows():
                    #your logic for updating this chunk or making new chunk here
                         
                    print(row)
                    
                    print("index is " + str(index))
        #if you can added to same df_chunk, return it, else if you appended
        #rows to have list_of_rows, make a new df with them and return
        #pd.Dataframe(list_of_rows)  

        return df_chunk   


if __name__ == '__main__':
            #use all available cores , otherwise specify the number you want as an argument,
            #for example if you have 12 cores,  leave 1 or 2 for other things
            pool = multiprocessing.Pool(processes=10) 
            
            results = pool.map(process_chunk, [c for c in pd.read_csv("your_csv.csv", chunksize=7150)])
            pool.close()
            pool.join()
            
            #make new df by concatenating
            
            concatdf = pd.concat(results, axis=0, ignore_index=True)
            

Note: Instead of reading csv you can pass chucks by the same logic, to calculate chunk-size you might want something like round_of( (length of df) / (number of core available-2)) eg 100000/14 = round(7142.85) = 7150 rows per chunk

 results = pool.map(process_chunk,
        [df[c:c+chunk_size] for c in range(0,len(df),chunk_size])
Sign up to request clarification or add additional context in comments.

Comments

1

Instead of using df.iterrows() why not just use a vectorized method like apply()?

df.apply(function, axis=1)

.apply() is a Pandas way to perform iterations on columns/rows. It takes advantage of vectorized techniques and speeds up execution of simple and complex operations by many times.

Check this Reference article to see how it differs.

Other options are looking at Dask, Vaex or just good old fashion Multiprocessing.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.