
So dask.dataframe.map_partitions() takes a func argument and the meta kwarg. How exactly does it decide its return type? As an example:

Lots of CSVs in ...\some_folder.

import numpy as np
import pandas as pd
import dask.dataframe as dd

ddf = dd.read_csv(r"...\some_folder\*", usecols=['ColA', 'ColB'],
                  blocksize=None,
                  dtype={'ColA': np.float32, 'ColB': np.float32})
example_func = lambda x: x.iloc[-1] / len(x)
metaResult = pd.Series({'ColA': .1234, 'ColB': .1234})
result = ddf.map_partitions(example_func, meta=metaResult).compute()

I'm pretty new to "distributed" computing, but I would intuitively expect this to return a collection (a list or dict, most likely) of Series objects, yet the result is a Series object that could be considered a concatenation of the results of example_func on each partition. This in and of itself would also suffice, if this series had a MultiIndex to indicate the partition label.

From what I can tell from this question, the docs, and the source code itself, this is because ddf.divisions returns (None, None, ..., None) as a result of reading CSVs. Is there a dask-native way to do this, or do I need to manually go in and break up the returned Series (a concatenation of the Series that were returned by example_func on each partition) myself?

Also, feel free to correct my assumptions/practices here, as I'm new to dask.

1 Answer


So dask.dataframe.map_partitions() takes a func argument and the meta kwarg. How exactly does it decide its return type?

map_partitions tries to concatenate the results returned by func into either a dask DataFrame or a dask Series object in an 'intelligent' way. The decision is based on the return value of func:

  • If func returns a scalar, map_partitions returns a dask Series object with one element per partition.
  • If func returns a pd.Series object, map_partitions returns a dask Series object, in which all the pd.Series objects returned by func are concatenated.
  • If func returns a pd.DataFrame, map_partitions returns a dask DataFrame object, in which those pd.DataFrame objects are concatenated along the first axis.

If you are interested in the result of a specific partition, you could use get_partition(). If the partition label is in general important information for you, I would consider assigning a separate column to your ddf directly after reading in the data from CSV, containing all the information you need. Afterwards, you could construct func so that it returns a pd.DataFrame with the result of your calculation in one column and the information you need to identify the result in another.


2 Comments

An indicator column...duh! Accepted.
Ok, but how do you tell it to use a pandas.Series? It won't take the class. What is the symbol? It isn't in the docs.
