I have 5,000,000 rows in my dataframe. In my code, I am using iterrows(), which is taking too much time. To get the required output, I have to iterate through all the rows. So I wanted to know whether I can parallelize the code in pandas.
- Vectorize before you parallelize! You can vectorize in pandas by avoiding iterrows(). If you are using iterrows() at all, you probably haven't spent enough time learning the pandas basics. Do that first. After that, ask a specific question showing your code along with some sample data. – JohnE, Mar 17, 2016 at 15:51
- @JohnE I actually needed to use iterrows() as well, when processing tens of millions of rows that had to be aggregated by key for counting. The dataframe was too big to process at once, so it had to be split into chunks, which poses a problem for key-based aggregation, since the same key might appear in separate chunks. My approach was to use an external state store (e.g. Redis) for the aggregation, writing each row's key to the store for counting, which meant using iterrows(). Are there other ways to aggregate huge dataframes with vectorization? – chaooder, Aug 31, 2019 at 16:02
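If the goal is only to count keys, one vectorized alternative to the external-state-store approach described in the comment above is to count each chunk with value_counts() and merge the partial counts. This is a minimal sketch; the CSV path and the column named key are made up for the example:
import pandas as pd

CSV_PATH = "my_large_file.csv"  # hypothetical file with a 'key' column

# Count keys chunk by chunk; Series.add(..., fill_value=0) merges counts
# for keys that appear in more than one chunk, so no external state
# store is needed.
counts = pd.Series(dtype="int64")
for chunk in pd.read_csv(CSV_PATH, chunksize=1_000_000):
    counts = counts.add(chunk["key"].value_counts(), fill_value=0)

counts = counts.astype("int64")
print(counts.sort_values(ascending=False).head())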
3 Answers
Here's a webpage I found that might help: http://gouthamanbalaraman.com/blog/distributed-processing-pandas.html
And here's the multiprocessing code from that page:
import pandas as pd
import multiprocessing as mp

LARGE_FILE = "D:\\my_large_file.txt"
CHUNKSIZE = 100000  # process 100,000 rows at a time

def process_frame(df):
    # process data frame
    return len(df)

if __name__ == '__main__':
    reader = pd.read_table(LARGE_FILE, chunksize=CHUNKSIZE)
    pool = mp.Pool(4)  # use 4 processes

    funclist = []
    for df in reader:
        # process each data frame
        f = pool.apply_async(process_frame, [df])
        funclist.append(f)

    result = 0
    for f in funclist:
        result += f.get(timeout=10)  # timeout in 10 seconds

    pool.close()
    pool.join()

    print("There are %d rows of data" % result)
In 2022, you DO NOT need to implement multiprocessing yourself: pandarallel provides a one-line solution for parallel processing in pandas. Just follow these two steps.
First, install it:
pip install pandarallel [--upgrade] [--user]
Second, replace your df.apply(func) with df.parallel_apply(func), and you'll see it work as you expect!
from pandarallel import pandarallel
pandarallel.initialize(progress_bar=True)  # or e.g. initialize(nb_workers=os.cpu_count() - 1)

#### old code:
# df.apply(func)

#### new code:
df.parallel_apply(func)
#### all of your CPU cores are now used to run your pandas operation
More operations:
| Without parallelization | With parallelization |
|---|---|
| df.apply(func) | df.parallel_apply(func) |
| df.applymap(func) | df.parallel_applymap(func) |
| df.groupby(args).apply(func) | df.groupby(args).parallel_apply(func) |
| df.groupby(args1).col_name.rolling(args2).apply(func) | df.groupby(args1).col_name.rolling(args2).parallel_apply(func) |
| df.groupby(args1).col_name.expanding(args2).apply(func) | df.groupby(args1).col_name.expanding(args2).parallel_apply(func) |
| series.map(func) | series.parallel_map(func) |
| series.apply(func) | series.parallel_apply(func) |
| series.rolling(args).apply(func) | series.rolling(args).parallel_apply(func) |
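As a small illustration of the groupby row in the table above, here is a minimal sketch; the dataframe and the group_total function are made up for the example:
import pandas as pd
from pandarallel import pandarallel

pandarallel.initialize(progress_bar=False)

df = pd.DataFrame({"group": ["a", "b", "a", "b"], "value": [1, 2, 3, 4]})

def group_total(g):
    # runs in a worker process, once per group
    return g["value"].sum()

# same result as df.groupby("group").apply(group_total), computed in parallel
result = df.groupby("group").parallel_apply(group_total)
print(result)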
Enjoy your dinner :D
This code shows how you might break a large dataframe up into smaller dataframes, each with N_ROWS rows (except possibly the last), and pass each one to a process pool (of whatever size you want, although there is no point in using more processes than you have processors). Each worker process returns its modified dataframe to the main process, which reassembles the final result by concatenating the workers' return values:
import pandas as pd
import multiprocessing as mp

def process_frame(df):
    # process data frame
    # create new index starting at 0:
    df.reset_index(inplace=True, drop=True)
    # increment everybody's age:
    for i in range(len(df.index)):
        df.at[i, 'Age'] += 1
    return df

def divide_and_conquer(df):
    N_ROWS = 2  # number of rows in each dataframe
    with mp.Pool(3) as pool:  # use 3 processes
        # break up dataframe into smaller dataframes of N_ROWS rows each
        cnt = len(df.index)
        n, remainder = divmod(cnt, N_ROWS)
        results = []
        start_index = 0
        for i in range(n):
            results.append(pool.apply_async(process_frame, args=(df.loc[start_index:start_index+N_ROWS-1, :],)))
            start_index += N_ROWS
        if remainder:
            results.append(pool.apply_async(process_frame, args=(df.loc[start_index:start_index+remainder-1, :],)))
        new_dfs = [result.get() for result in results]
        # reassemble final dataframe:
        df = pd.concat(new_dfs, ignore_index=True)
    return df

if __name__ == '__main__':
    df = pd.DataFrame({
        "Name": ['Tom', 'Dick', 'Harry', 'Jane', 'June', 'Sally', 'Mary'],
        "Age": [10, 20, 30, 40, 40, 60, 70],
        "Sex": ['M', 'M', 'M', 'F', 'F', 'F', 'F']
    })
    print(df)
    df = divide_and_conquer(df)
    print(df)
Prints:
Name Age Sex
0 Tom 10 M
1 Dick 20 M
2 Harry 30 M
3 Jane 40 F
4 June 40 F
5 Sally 60 F
6 Mary 70 F
Name Age Sex
0 Tom 11 M
1 Dick 21 M
2 Harry 31 M
3 Jane 41 F
4 June 41 F
5 Sally 61 F
6 Mary 71 F
1 Comment
For example, you could modify process_frame to make a copy of the passed dataframe with df2 = df.copy(deep=True) and then subtract 1 instead of adding 1 using df2. The function would then return a tuple (df, df2), and the main process would build up two aggregate dataframe instances.
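Here is a minimal sketch of that variant, reusing the process_frame name from the answer above; the driver code would stay the same apart from unpacking the returned pairs (shown in the trailing comments):
def process_frame(df):
    # create new index starting at 0, as in the answer above:
    df.reset_index(inplace=True, drop=True)
    df2 = df.copy(deep=True)
    for i in range(len(df.index)):
        df.at[i, 'Age'] += 1    # incremented ages in the first dataframe
        df2.at[i, 'Age'] -= 1   # decremented ages in the copy
    return df, df2

# In divide_and_conquer, each result is now a (df, df2) pair, e.g.:
#     pairs = [result.get() for result in results]
#     df_plus = pd.concat([p[0] for p in pairs], ignore_index=True)
#     df_minus = pd.concat([p[1] for p in pairs], ignore_index=True)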