
What is the equivalent of pandas' sort_values for a Dask DataFrame? I am trying to scale some pandas code that runs into memory issues by using a Dask DataFrame instead.

Would the equivalent be the following?

ddf.set_index([col1, col2], sorted=True)

3 Answers


Sorting in parallel is hard. You have two options in dask.dataframe:

set_index

As of now, you can call set_index with a single column as the index:

In [1]: import pandas as pd

In [2]: import dask.dataframe as dd

In [3]: df = pd.DataFrame({'x': [3, 2, 1], 'y': ['a', 'b', 'c']})

In [4]: ddf = dd.from_pandas(df, npartitions=2)

In [5]: ddf.set_index('x').compute()
Out[5]: 
   y
x   
1  c
2  b
3  a

Unfortunately, dask.dataframe does not (as of November 2016) support multi-column indexes:

In [6]: ddf.set_index(['x', 'y']).compute()
NotImplementedError: Dask dataframe does not yet support multi-indexes.
You tried to index with this index: ['x', 'y']
Indexes must be single columns only.

nlargest

Given how you phrased your question, I suspect this doesn't apply to you, but many use cases that rely on sorting can get by with the much cheaper nlargest.

In [7]: ddf.x.nlargest(2).compute()
Out[7]: 
0    3
1    2
Name: x, dtype: int64

In [8]: ddf.nlargest(2, 'x').compute()
Out[8]: 
   x  y
0  3  a
1  2  b
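
To make the relationship to sorting concrete, here is a minimal sketch in plain pandas (not Dask) using the same toy frame as above. It only illustrates that nlargest(n, col) gives the same rows as a descending sort followed by head(n); the variable names top and via_sort are introduced here for illustration.

```python
import pandas as pd

df = pd.DataFrame({"x": [3, 2, 1], "y": ["a", "b", "c"]})

# Top-2 rows by 'x' without sorting the whole frame.
top = df.nlargest(2, "x")

# The same rows obtained through a full descending sort.
via_sort = df.sort_values("x", ascending=False).head(2)

print(top.equals(via_sort))
```

When only the first few rows of the sorted result are needed, nlargest avoids ordering everything else.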

3 Comments

Thanks Matthew. If I knew the number of rows of the dataframe in advance, then nlargest would work, right?
nlargest returns a dask.dataframe of a single partition, so it's generally not a good way to sort things.
So, which approach is a good one for sorting across all the partitions and getting a single final result?

My preferred method is to first set_index using a single column in Dask and then distribute Pandas' sort_values across the partitions using map_partitions:

# Prepare data
import dask
import dask.dataframe as dd
data = dask.datasets.timeseries()

# Sort by 'name' and 'id'
data = data.set_index('name')
data = data.map_partitions(lambda df: df.sort_values(['name', 'id']))

One possible gotcha is that a single index value must not be spread across multiple partitions. From what I have seen in practice, though, Dask does not seem to allow that to happen. It would be good to have a more well-founded opinion on that.

edit: I have asked about this in Dask dataframe: Can a single index be in multiple partitions?
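
The reasoning behind this approach can be sketched with plain pandas standing in for Dask. This is an illustration under stated assumptions, not Dask's implementation: the list named partitions is a hypothetical stand-in for Dask partitions, and the list comprehension plays the role of map_partitions. As long as every 'name' value lives in exactly one partition and the partitions are ordered by 'name', sorting each partition independently yields a globally sorted result.

```python
import pandas as pd

df = pd.DataFrame({
    "name": ["bob", "alice", "bob", "carol", "alice", "carol"],
    "id": [3, 2, 1, 5, 1, 4],
})

# Stand-in for set_index('name'): 'name' becomes the index, and the rows
# are split so that each 'name' value ends up in exactly one partition.
indexed = df.set_index("name")
partitions = [
    indexed[indexed.index == "alice"],
    indexed[indexed.index != "alice"],
]

# Stand-in for map_partitions: sort every partition on its own.
sorted_parts = [p.sort_values(["name", "id"]) for p in partitions]
result = pd.concat(sorted_parts)

# Reference: a single global sort with pandas.
expected = df.sort_values(["name", "id"]).set_index("name")
print(result.equals(expected))
```

If a 'name' value were split across two partitions, its rows would be sorted separately in each one and the concatenated result could be out of order, which is the gotcha described above.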



You can add a new composite column and set the index to it:

newcol = ddf.col1 + "|" + ddf.col2
ddf = ddf.assign(ind=newcol)
ddf = ddf.set_index('ind', sorted=True)

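If the dataframe is already sorted by (col1, col2), and the concatenated strings preserve that order, then it is already sorted by newcol too, so you can use sorted=True. Note that sorted=True only tells Dask to trust the existing order; it does not sort the data.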
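
Whether the composite key really preserves the (col1, col2) order depends on the data, so it may be worth checking before passing sorted=True. The following is a small sketch in plain pandas; the helper composite_key_is_sorted and both sample frames are hypothetical, made up for illustration. It assumes both columns hold strings.

```python
import pandas as pd

def composite_key_is_sorted(df, sep="|"):
    """Return True if col1 + sep + col2 is non-decreasing in the current row order."""
    key = df["col1"] + sep + df["col2"]
    return bool(key.is_monotonic_increasing)

# Both frames are already sorted by (col1, col2).
same_width = pd.DataFrame({"col1": ["a", "a", "b"], "col2": ["x", "y", "x"]})
mixed_width = pd.DataFrame({"col1": ["a", "a", "ab"], "col2": ["x", "y", "x"]})

print(composite_key_is_sorted(same_width))   # True
print(composite_key_is_sorted(mixed_width))  # False
```

The second frame fails because "ab|x" compares lower than "a|y": the character "b" sorts before "|". The shortcut is therefore safe only when the separator cannot disturb the ordering, for example when all col1 values have the same length.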

2 Comments

I don't believe this works: sorted=True means that you're promising Dask the index is already sorted, not that you're requesting Dask sort it. See github.com/dask/dask/issues/2388
I have fixed the explanation, it was not clear. Thank you.
