
I have this code which generates autoregressive terms within each unique combination of variables 'grouping A' and 'grouping B'.

for i in range(1, 5):
    df.loc[:, 'var_' + str(i)] = df.sort_values(by='date') \
                                     .groupby(['grouping A', 'grouping B']) \
                                     ['target'].sum().shift(i).ffill().bfill().values

Is it possible to sort values, group, shift, and then assign to a new variable without computing in Dask?

  • I'm guessing that df is small and fits in memory, and that your main goal with Dask is to speed things up by parallelizing over the for loop. Is this guess correct? Commented Mar 15, 2017 at 19:26
  • It is indeed small (150 million rows) and fits in memory - but I'm trying to build the script to utilize much, much larger data frames. Commented Mar 15, 2017 at 19:28

1 Answer


Dask.delayed

So if you just want to parallelize the for loop, you might do the following with dask.delayed:

import dask

ddf = dask.delayed(df)
results = []

for i in range(1, 5):
    result = ddf.sort_values(by='date') \
                .groupby(['grouping A', 'grouping B']) \
                ['target'].sum().shift(i).ffill().bfill().values
    results.append(result)

results = dask.compute(*results)

for i, result in enumerate(results, start=1):
    df[...] = result  # mutate dataframe as you like

That is, we wrap the dataframe in dask.delayed. Any method call on it becomes lazy. We collect up all of these lazy method calls and then compute them together with dask.compute. We don't want to mutate the dataframe during this period (that would be weird), so we do it afterwards.
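As a minimal, self-contained illustration of that pattern (the dataframe and column names here are made up, not from the question):

import dask
import pandas as pd

pdf = pd.DataFrame({'g': ['a', 'b', 'a', 'b'], 'x': [1, 2, 3, 4]})

lazy = dask.delayed(pdf)                        # wrap; nothing is computed yet
tasks = [lazy.groupby('g')['x'].sum().shift(i)  # each chained call stays lazy
         for i in range(1, 3)]
outputs = dask.compute(*tasks)                  # run all tasks in one pass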

Large dataframe

If you want to do this with a large dataframe then you would probably want to use dask.dataframe instead. This will be less straightforward, but will hopefully work decently well. You should really look out for the sort_values operation. Distributed sorting is a very hard problem and very expensive. You want to minimize this if possible.

import dask.dataframe as dd

# Load a distributed dataframe with `dd.read_csv`, `dd.read_parquet`, etc.
ddf = dd.read_parquet('...')

ddf = ddf.set_index('date').persist()

results = []
for i in range(1, 5):
    result = ddf.groupby(['grouping A', 'grouping B']) \
                ['target'].sum().shift(i).ffill().bfill()
    results.append(result)

ddf2 = dd.concat([ddf] + results, axis=1)

Here we use set_index rather than sort_values, and we make sure to do it exactly once (it's likely to take 10-100x longer than any other operation here). We then use the normal groupby syntax and things should be fine (although I have to admit I haven't verified that ffill and bfill are definitely implemented; I assume so though). As before, we don't want to mutate our data during computation (that would be weird), so we do a concat afterwards.
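Note that building ddf2 doesn't actually run anything yet; a sketch of two common ways to trigger the computation (the output path is hypothetical):

# Pull everything into one pandas dataframe (only if the result fits in memory)
pdf = ddf2.compute()

# Or stay out-of-core and write the partitions to disk
ddf2.to_parquet('output/')  # hypothetical path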

Maybe simpler

Probably you'll get a greatly reduced dataframe after the groupby-sum. Use dask.dataframe for that step, then ditch Dask and head back to the comfort of pandas.

# Load a distributed dataframe with `dd.read_csv`, `dd.read_parquet`, etc.
ddf = dd.read_parquet('...')
pdf = ddf.groupby(['grouping A', 'grouping B']).target.sum().compute()
# ... do whatever you want with a much smaller pandas dataframe ...
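For instance, a sketch of that hand-off, assuming the lags are meant to be taken over `date` within each group (so `date` is kept as a grouping key in the Dask step; column names follow the question):

# Reduce with Dask: one row per (grouping A, grouping B, date) combination
pdf = (ddf.groupby(['grouping A', 'grouping B', 'date'])['target']
          .sum().compute()
          .reset_index()
          .sort_values('date'))

# Back in pandas: lagged autoregressive terms within each group
for i in range(1, 5):
    pdf['var_' + str(i)] = (pdf.groupby(['grouping A', 'grouping B'])['target']
                               .shift(i).ffill().bfill())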