4

I'm using the Pool function of the multiprocessing module in order to run the same code in parallel on different data.

It turns out that on some data my code raises an exception, but the precise line in which this happens is not given:

Traceback (most recent call last):
  File "my_wrapper_script.py", line 366, in <module>
    main()
  File "my_wrapper_script.py", line 343, in main
    results = pool.map(process_function, folders)
  File "/usr/lib64/python2.6/multiprocessing/pool.py", line 148, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib64/python2.6/multiprocessing/pool.py", line 422, in get
    raise self._value
KeyError: 'some_key'

I am aware of multiprocessing.log_to_stderr() , but it seems that it is useful when concurrency issues arise, which is not my case.

Any ideas?

2 Answers 2

7

If you're using a new enough version of Python, you'll actually see the real exception get printed prior to that one. For example, here's a sample that fails:

import multiprocessing

def inner():
    raise Exception("FAIL")

def f():
    print("HI")
    inner()

p = multiprocessing.Pool()
p.apply(f)
p.close()
p.join()

Here's the exception when running this with python 3.4:

multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "test.py", line 9, in f
    inner()
  File "test.py", line 4, in inner
    raise Exception("FAIL")
Exception: FAIL
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "test.py", line 13, in <module>
    p.apply(f)
  File "/usr/local/lib/python3.4/multiprocessing/pool.py", line 253, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/local/lib/python3.4/multiprocessing/pool.py", line 599, in get
    raise self._value
Exception: FAIL

If using a newer version isn't an option, the easiest thing to do is to wrap your worker function in a try/except block that will print the exception prior to re-raising it:

import multiprocessing
import traceback

def inner():
    raise Exception("FAIL")

def f():
    try:
        print("HI")
        inner()
    except Exception:
        print("Exception in worker:")
        traceback.print_exc()
        raise

p = multiprocessing.Pool()
p.apply(f)
p.close()
p.join()

Output:

HI
Exception in worker:
Traceback (most recent call last):
  File "test.py", line 11, in f
    inner()
  File "test.py", line 5, in inner
    raise Exception("FAIL")
Exception: FAIL
Traceback (most recent call last):
  File "test.py", line 18, in <module>
    p.apply(f)
  File "/usr/local/lib/python2.7/multiprocessing/pool.py", line 244, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/local/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
Exception: FAIL
Sign up to request clarification or add additional context in comments.

Comments

1

You need to implement your own try/except block in the worker. Depending on how you want to organize your code, you could log to stderr as you mention above, log to some other place like a file, return some sort of error code or even tag the exception with the current traceback and re-raise. Here's an example of the last technique:

import traceback
import multiprocessing as mp

class MyError(Exception):
    pass

def worker():
    try:
        # your real code here
        raise MyError("boom")
    except Exception, e:
        e.traceback = traceback.format_exc()
        raise

def main():
    pool = mp.Pool()
    try:
        print "run worker"
        result = pool.apply_async(worker)
        result.get()
    # handle exceptions you expect
    except MyError, e:
        print e.traceback
    # re-raise the rest
    except Exception, e:
        print e.traceback
        raise


if __name__=="__main__":
    main()

It returns

run worker
Traceback (most recent call last):
  File "doit.py", line 10, in worker
    raise MyError("boom")
MyError: boom

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.