
I am trying to edit a number of files and write out the results using Python 2.7's multiprocessing module. It works like a charm for a small number of files (<20), but when I try more files (20+) the output goes berserk. I am using Python 2.7.5 on CentOS 6.5 with a 4-core processor.

import sys, os
import multiprocessing

import glob
list_files = glob.glob("Protein/*.txt")

def Some_func(some_file):
    with open(some_file) as some:
        with open(file_output, "a") as output:  # file_output: path to the single shared output file
            for lines in some:
                #Do Something
                #edited_lines = func(lines)
                output.write(edited_lines)


pool = multiprocessing.Pool(10) # Desired number of worker processes = 10
pool.map(Some_func, list_files)
pool.close()
pool.join()

The lines written to the output file overlap each other. For example, with these two input files:

File 1
Lines 1 .. File 1
Lines 2 .. File 1
Lines 3 .. File 1
Lines 4 .. File 1
Lines 5 .. File 1
Lines 6 .. File 1
Lines 7 .. File 1
Lines 8 .. File 1
Lines 9 .. File 1

File 2
Lines 1 .. File 2
Lines 2 .. File 2
Lines 3 .. File 2
Lines 4 .. File 2
Lines 5 .. File 2
Lines 6 .. File 2
Lines 7 .. File 2
Lines 8 .. File 2
Lines 9 .. File 2



Output:

Lines 1 .. File 1
Lines 2 .. File 1
Lines 3 .. File 1 Lines 1 .. File 2
Lines 4 .. File 1
Lines 5 .. File 1Lines 2 .. File 2
Lines 3 .. File 2
Lines 4 .. File 2
Lines 6 .. File 1

1 Answer


The problem is that you're writing to the same file from many processes in parallel, and those writes aren't synchronized. That means it's possible for different processes to write at the same time, which produces the interleaved output you're seeing.

You can solve this either by having a single dedicated writer, with each worker sending its edited lines to that writer over a queue, or by synchronizing the writes done by each process using a multiprocessing.Lock.

Using a single writer:

import glob
import multiprocessing
from functools import partial
from threading import Thread

list_files = glob.glob("Protein/*.txt")

def Some_func(out_q, some_file):
    with open(some_file) as some:
        for lines in some:
            #Do Something
            #edited_lines = func(lines)

            out_q.put(edited_lines)

def write_lines(q):
    with open(file_output, "w") as output:
        for line in iter(q.get, None): # This will end when None is received
            output.write(line)

pool = multiprocessing.Pool(10) # Desired number of worker processes = 10
m = multiprocessing.Manager()
q = m.Queue()
t = Thread(target=write_lines, args=(q,))
t.start()
pool.map(partial(Some_func, q), list_files)
pool.close()
pool.join()
q.put(None)  # Shut down the writer thread
t.join()
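
Note that the writer here is an ordinary threading.Thread in the parent process rather than a separate worker: the pool workers push their edited lines onto the Manager queue, the thread drains that queue and does all of the writing, and the None put on the queue after pool.join() is the sentinel that makes iter(q.get, None) stop.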

Using a multiprocessing.Lock:

import glob
import multiprocessing
from functools import partial

list_files = glob.glob("Protein/*.txt")

def Some_func(lock, some_file):
    with open(some_file) as some:
        with open(file_output, "a") as output:  # open the shared output file in append mode
            for lines in some:
                #Do Something
                #edited_lines = func(lines)
                with lock:
                    output.write(edited_lines)
                    output.flush()  # flush while still holding the lock so buffered data can't interleave


pool = multiprocessing.Pool(10) # Desired number of worker processes = 10
m = multiprocessing.Manager()
lock = m.Lock()
pool.map(partial(Some_func, lock), list_files)
pool.close()
pool.join()

We need to use a Manager to create the shared objects because you're passing them to a Pool, which requires pickling them. Normal multiprocessing.Lock/multiprocessing.Queue objects can only be passed to a multiprocessing.Process constructor, and will cause an exception when passed to a Pool method like map.
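
If you want to see why the Manager matters, here's a small sketch (an illustration, not part of the code above) that just tries to pickle both kinds of lock, which is essentially what Pool.map has to do with every argument it sends to a worker:

import multiprocessing
import pickle

m = multiprocessing.Manager()

# A Manager lock is a picklable proxy, so it survives the round trip
# that Pool.map performs on its arguments.
pickle.dumps(m.Lock())

# A plain multiprocessing.Lock refuses to be pickled; it can only be
# inherited by children created directly with multiprocessing.Process.
try:
    pickle.dumps(multiprocessing.Lock())
except RuntimeError as exc:
    print(exc)  # e.g. "Lock objects should only be shared between processes through inheritance"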
