16

Is it somehow possible, using Scala's parallel collections to parallelize an Iterator without evaluating it completely beforehand?

Here I am talking about parallelizing the functional transformations on an Iterator, namely map and flatMap. I think this requires evaluating some elements of the Iterator in advance, and then computing more once some are consumed via next.

All I could find would require the iterator to be converted to an Iterable or, at best, a Stream. The Stream then gets completely evaluated when I call .par on it.

I also welcome implementation proposals if this is not readily available. Implementations should support parallel map and flatMap.

2
  • The answer is probably no, but can you say a little more about what you want from this? In particular, when should the computation start running: after you create the iterator, or once you call something that forces evaluation? Commented Jun 18, 2013 at 20:35
  • @RexKerr Seems like a design choice; but having it start on first request makes the first request somehow special. I'm currently trying to implement something like this, and I chose to start running right away and store the next n results. Once one is consumed, I compute a replacement. Commented Jun 18, 2013 at 20:38
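The look-ahead design described in the last comment can be sketched roughly like this (the name `parMap` and the `ahead` parameter are mine, not from any library):

```scala
import scala.collection.mutable
import scala.concurrent._
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global

// Illustrative sketch: start `ahead` computations immediately; each next()
// awaits the oldest future and submits a replacement from the source.
def parMap[A, B](source: Iterator[A], ahead: Int)(f: A => B): Iterator[B] =
  new Iterator[B] {
    private val inFlight = mutable.Queue.empty[Future[B]]
    private def refill(): Unit =
      while (inFlight.size < ahead && source.hasNext)
        inFlight.enqueue(Future(f(source.next())))
    refill() // computation starts right away, as described above
    def hasNext: Boolean = inFlight.nonEmpty
    def next(): B = {
      val b = Await.result(inFlight.dequeue(), Duration.Inf)
      refill()
      b
    }
  }
```

Order is preserved, at the cost of head-of-line blocking: a slow element delays everything queued behind it.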

4 Answers

6

I realize that this is an old question, but does the ParIterator implementation in the iterata library do what you were looking for?

scala> import com.timgroup.iterata.ParIterator.Implicits._
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId))
scala> it.map(_._2).toSet.size
res2: Int = 8 // addition was distributed over 8 threads

6 Comments

It addresses the problem. It could be a bit more efficient, though, since you get a lot of blocking if there is large variation in the runtimes of the operations within one chunk.
How might it be made more efficient @ziggystar?
ParIterator splits the Iterator into chunks. So if you have small chunks (e.g. size 2) and one element takes 1s and the other takes 10s, then you have bad parallelization. A different implementation could feed the workers new elements from the iterator once a worker becomes free.
@ziggystar ParIterator in iterata defers that consideration to the standard library parallel collections. So within a single chunk, is that how the Scala parallel collections behave?
I'm not sure you got my point. Even if you do the best thing possible within a chunk, chunking creates barriers across which there is no parallelization. This means you cannot get the maximum CPU utilization possible. Another disadvantage is a higher memory requirement. To parallelize the chunks Scala requires them to be forced, which results in the whole chunk being in memory at the same time (assuming the iterator creates the objects). In theory you only have to have the elements in memory that are currently processed. Large chunks -> good par/bad mem and small chunks -> bad par/good mem.
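The chunk-free alternative described in these comments (workers pulling single elements as they become free) could look roughly like this; it gives up result ordering, and the bounded queue caps memory. All names here are illustrative:

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue}
import java.util.concurrent.atomic.AtomicInteger

// Sketch: a fixed pool pulls elements one at a time, so there is no chunk
// barrier; a bounded queue keeps only a few results in memory at once.
def unorderedParMap[A, B](source: Iterator[A], workers: Int)(f: A => B): Iterator[B] = {
  val results = new LinkedBlockingQueue[Option[B]](workers * 2) // None = end marker
  val active  = new AtomicInteger(workers)
  val lock    = new Object
  def pull(): Option[A] =
    lock.synchronized(if (source.hasNext) Some(source.next()) else None)
  val pool = Executors.newFixedThreadPool(workers)
  (1 to workers).foreach { _ =>
    pool.execute { () =>
      Iterator.continually(pull()).takeWhile(_.isDefined)
        .foreach(a => results.put(Some(f(a.get))))
      if (active.decrementAndGet() == 0) { results.put(None); pool.shutdown() }
    }
  }
  new Iterator[B] {
    private var cur: Option[B] = results.take()
    def hasNext: Boolean = cur.isDefined
    def next(): B = { val b = cur.get; cur = results.take(); b }
  }
}
```

A worker that finishes a 1s element immediately pulls the next one, even while another worker is still busy with a 10s element, which is exactly the barrier-free behavior discussed above.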
4

Your best bet with the standard library is probably not using parallel collections but concurrent.Future.traverse:

import concurrent._
import ExecutionContext.Implicits.global
Future.traverse(Iterator(1,2,3))(i => Future{ i*i })

though I think this will execute the whole thing starting as soon as it can.
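To make the eager behavior concrete (using the same squares example as above, and assuming the global execution context):

```scala
import scala.concurrent._
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global

// All three futures are scheduled up front; traverse does not wait for
// one element to be consumed before starting the next.
val fut: Future[Iterator[Int]] = Future.traverse(Iterator(1, 2, 3))(i => Future(i * i))
val squares: List[Int] = Await.result(fut, Duration.Inf).toList // List(1, 4, 9)
```

Result order matches the input order even though the futures may complete in any order.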

Comments

2

From the ML, Traversing iterator elements in parallel:

https://groups.google.com/d/msg/scala-user/q2NVdE6MAGE/KnutOq3iT3IJ

I moved off Future.traverse for a similar reason. For my use case, keeping N jobs working, I wound up with code to throttle feeding the execution context from the job queue.

My first attempt involved blocking the feeder thread, but that risked also blocking tasks which wanted to spawn tasks on the execution context. What do you know, blocking is evil.

4 Comments

Can you comment on why you use (NUM_CPUs + 1)^2 as size for the blocking queue?
Also I found out the hard way that 1. I'm no good at concurrent programming 2. flatMap is more difficult.
@ziggystar By "you" you mean "Juha" on the ML. I don't think it's a magic number: big enough so the consumer doesn't get ahead of the original iterator (which might do i/o, maybe) plus the mapping function (CPU-bound, he says, but long or short running?). I see that the future feeding the queue will block without calling blocking; maybe the +1 is left over from the "desired parallelism". My solution had the end of the pipeline check for more work, i.e., the last thing a job would do is check if enough jobs are in process, and if not, feed the beast. I agree it's hard, simplicity is key.
This seems to work OK, and the API is much easier than Future.traverse's. I combine it with iterator.grouped so that it chunks elements together, which I assume reduces overhead.
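The "check for more work at the end of the pipeline" idea described in the comment above might be sketched like this (all names are mine; each completed job pulls the next element itself, so at most `parallelism` jobs are ever in flight and no pool thread blocks):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent._
import ExecutionContext.Implicits.global

// Sketch: start `parallelism` chains; each chain runs a job and, on
// completion, feeds itself the next element. When every chain has
// exhausted the source, the returned future completes.
def runThrottled[A](source: Iterator[A], parallelism: Int)(job: A => Unit): Future[Unit] = {
  val lock = new Object
  def pull(): Option[A] =
    lock.synchronized(if (source.hasNext) Some(source.next()) else None)
  val done         = Promise[Unit]()
  val activeChains = new AtomicInteger(parallelism)
  def feed(): Unit = pull() match {
    case Some(a) => Future(job(a)).onComplete(_ => feed())
    case None    => if (activeChains.decrementAndGet() == 0) done.trySuccess(())
  }
  (1 to parallelism).foreach(_ => feed())
  done.future
}
```

Because the next element is requested only after a job finishes, the feeder never gets ahead of the workers, which avoids the blocking problem described above.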
0

It's a bit hard to follow exactly what you're after, but perhaps it's something like this:

val f = (x: Int) => x + 1
val s = (0 to 9).toStream map f splitAt(6) match { 
  case (left, right) => left.par; right 
}

This will evaluate f on the first 6 elements in parallel and then return a stream over the rest.

1 Comment

This doesn't seem to run in parallel - don't you need to move the map f to after the par ?
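A corrected version of the sketch, per that comment: split first, then map, so that `par` actually applies to the mapping. This assumes Scala 2.12, where `Stream` and `.par` are in the standard library (in 2.13+ you would use `LazyList` and the scala-parallel-collections module):

```scala
val f = (x: Int) => x + 1

// Split first, then map: the first 6 elements go through the parallel
// collection, the rest stay a lazily-mapped stream.
val (left, right) = (0 to 9).toStream.splitAt(6)
val firstSix = left.par.map(f).toList // mapped in parallel: List(1, 2, 3, 4, 5, 6)
val rest     = right.map(f)           // still a lazy Stream over the remainder
```

Note that `splitAt(6)` forces the first 6 elements of the stream, so only the tail remains lazy.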
