
I am trying to improve the performance of my code and can't figure out how to use the multiprocessing module in it.

I am using Linux (CentOS 7.2) and Python 2.7.

The code that I need to run in a parallel environment:

import os
import sys

def start_fetching(directory):
    with open("test.txt", "a") as myfile:
        try:
            # Walk the tree and record every file's full path, one per line
            for dirpath, dirnames, filenames in os.walk(directory):
                for current_file in filenames:
                    current_file = dirpath + "/" + current_file
                    myfile.write(current_file + "\n")
            return 0
        except:
            return sys.exc_info()[0]

if __name__ == "__main__":
    cwd = "/home/"
    final_status = start_fetching(cwd)
    exit(final_status)

I need to save the metadata of all the files in a database; here, as a stand-in, I am only storing the file name in a text file.

  • You could just create a new Thread each time you do something inside the second for loop. Same as usual. docs.python.org/2/library/threading.html#thread-objects Commented Apr 26, 2017 at 5:14
  • this might help! Commented Apr 26, 2017 at 5:14
  • 2
    Trying to append to a file from multiple threads simultaneously is not generally a good idea. Commented Apr 26, 2017 at 5:16
  • @AndreyShipilov Thanks, I'll try that and see if it helps. Commented Apr 26, 2017 at 5:24
  • 1
    You can try to find it out, since it depends on your system's configuration. But based on your description you're writing to a database. Here the database is almost always the bottleneck (unless it's in-memory). You can try parallelize the writing but that doesn't mean the database can scale up and handle the contention. Running it on a "HPC cluster" doesn't necessarily mean parallelism should be always faster. Commented Apr 26, 2017 at 5:44
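For illustration, a minimal sketch combining the two suggestions above: one threading.Thread per item, with a threading.Lock serializing the appends (the file name and items here are placeholders, not from the question):

import threading

lock = threading.Lock()

def append_line(line):
    # Serialize appends so concurrent writers don't interleave output
    with lock:
        with open("shared_output.txt", "a") as f:
            f.write(line + "\n")

threads = [threading.Thread(target=append_line, args=("line %d" % i,))
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()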

2 Answers


I guess you want to parallelize tasks that are big; what you have given just writes the filename into a file. I have created a separate output file for every process, and later you can combine all these files as well. There are other ways to achieve this.

If the main problem is the parallelization itself, the code below could be a solution.

Python supports multi-threading and multi-processing. Because of the GIL, multi-threading is not truly parallel processing; it only gives concurrent execution when threads block on I/O. If you want your code to run in parallel, use multiprocessing [https://docs.python.org/2/library/multiprocessing.html]. Your code may look something like below.

import os
import sys
from multiprocessing import Process

def task(filename):
    # Each process writes to its own output file to avoid contention
    with open(filename + "test.txt", "a") as myfile:
        myfile.write(filename)

def start_fetching(directory):
    try:
        processes = []
        for dirpath, dirnames, filenames in os.walk(directory):
            for current_file in filenames:
                current_file = dirpath + "/" + current_file
                # Create a separate process per file, because multi-threading won't help in parallelizing
                p = Process(target=task, args=(current_file,))
                p.start()
                processes.append(p)

        # Let all the child processes finish and do some post processing if needed.
        for process in processes:
            process.join()

        return 0
    except:
        return sys.exc_info()[0] 

if __name__ == "__main__":
    cwd = "/home/"
    final_status = start_fetching(cwd)
    exit(final_status)
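As the comments below point out, spawning one process per file is expensive. A sketch of a bounded alternative using multiprocessing.Pool (the pool size of 4 is an arbitrary choice, not from the original answer):

import os
from multiprocessing import Pool

def task(filename):
    # Same per-file work as above: each worker writes to its own file
    with open(filename + "test.txt", "a") as myfile:
        myfile.write(filename)

def start_fetching(directory):
    # Collect all paths first, then hand them to a fixed-size pool
    paths = []
    for dirpath, dirnames, filenames in os.walk(directory):
        for current_file in filenames:
            paths.append(dirpath + "/" + current_file)
    pool = Pool(4)  # assumed worker count; tune to your core count
    pool.map(task, paths)
    pool.close()
    pool.join()
    return 0

if __name__ == "__main__":
    exit(start_fetching("/home/"))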

4 Comments

Can you please elaborate?
you start a process and then instantly wait for it to finish; nothing is running in parallel here.
that looks much better
Thank you @ArunKumar, Please check the answer I just posted.

Thank you all for helping me reduce this script's processing time to almost half. (I am adding this as an answer as I can't add this much content in a comment)

I found two ways to achieve what I wished for:

  1. Using this link mentioned by @KeerthanaPrabhakaran, which is concerned with multi-threading.

    import contextlib
    import itertools
    import multiprocessing
    import os
    import subprocess

    def worker(filename):
        subprocess_out = subprocess.Popen(["stat", "-c",
                                   "INSERT INTO file VALUES (NULL, \"%n\", '%F', %s, %u, %g, datetime(%X, 'unixepoch', 'localtime'), datetime(%Y, 'unixepoch', 'localtime'), datetime(%Z, 'unixepoch', 'localtime'));", filename], stdout=subprocess.PIPE)
        return subprocess_out.communicate()[0]

    def start_fetching(directory, threads):
        filename = fetch_filename() + ".txt"  # fetch_filename() is a helper defined elsewhere in my script
        with contextlib.closing(multiprocessing.Pool(threads)) as pool:   # pool of worker processes
            with open(filename, "a") as myfile:
                walk = os.walk(directory)
                fn_gen = itertools.chain.from_iterable((os.path.join(root, file) for file in files) for root, dirs, files in walk)

                results_of_work = pool.map(worker, fn_gen)  # this does the parallel processing
                print "Concatenating the result into the text file"
                for result in results_of_work:
                    myfile.write(str(result))
        return filename
    

    This is traversing 15203 files in 0m15.154s.

  2. The second one, which @ArunKumar mentioned, was related to multiprocessing:

    import multiprocessing
    import os
    import subprocess
    import sys

    def task(filename, process_no, return_dict):
        subprocess_out = subprocess.Popen(["stat", "-c",
                                   "INSERT INTO file VALUES (NULL, \"%n\", '%F', %s, %u, %g, datetime(%X, 'unixepoch', 'localtime'), datetime(%Y, 'unixepoch', 'localtime'), datetime(%Z, 'unixepoch', 'localtime'));",
                                   filename], stdout=subprocess.PIPE)
        return_dict[process_no] = subprocess_out.communicate()[0]


    def start_fetching_1(directory):
        try:
            processes = []
            i = 0
            manager = multiprocessing.Manager()
            return_dict = manager.dict()

            for dirpath, dirnames, filenames in os.walk(directory):
                for current_file in filenames:
                    current_file = dirpath + "/" + current_file
                    # Create a separate process per file, because multi-threading won't help in parallelizing
                    p = multiprocessing.Process(target=task, args=(current_file, i, return_dict))
                    i += 1
                    p.start()
                    processes.append(p)

            # Let all the child processes finish and do some post processing if needed.
            for process in processes:
                process.join()

            with open("test.txt", "a") as myfile:
                myfile.write("".join(return_dict.values()))  # values() is a list, so join it into one string

            return 0
        except:
            return sys.exc_info()[0]
    

    This is traversing 15203 files in 1m12.197s.

I don't understand why multiprocessing is taking that much time (my initial code was taking only 0m27.884s), while utilizing almost 100% CPU.

The above code is exactly what I am running (I am storing this info in a file and then using the test.txt file to create database entries).

I am trying to optimize the above code further, but can't think of a better way; as @CongMa mentioned, it might have finally come down to the I/O bottleneck.
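One idea I may still try (untested, and not part of the timings above): calling os.stat in the worker instead of spawning a stat subprocess per file, so there is no process launch per file. A sketch, with a simplified INSERT string (the file-type field from stat's %F is omitted here, and the schema is illustrative):

import os

def worker(filename):
    # os.stat returns the same numbers the stat(1) subprocess was formatting
    st = os.stat(filename)
    return ("INSERT INTO file VALUES (NULL, \"%s\", %d, %d, %d, %d, %d, %d);"
            % (filename, st.st_size, st.st_uid, st.st_gid,
               int(st.st_atime), int(st.st_mtime), int(st.st_ctime)))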

3 Comments

To summarize, for traversing 15203 files: the sequential implementation took 0m27.884s, multi-threading took 0m15.154s and multi-process took 1m12.197s. It is logical that multi-threading performed better than sequential, because the task has a lot of I/O. And the multi-process version didn't perform well because of the number of processes created (15203 files), since we created 1 process for every file and assigned it a separate task. Creation of a process is really costly, and 15K processes is really huge; the CPU will be choked scheduling among these processes. It is not the same with creation of threads.
That's what I was thinking. And I am planning to run this script on a much much larger number of files (x10^4 times). Do you think it will be stable at that level?
Multi-process should not be the approach for such a big scale of parallelism. It will only benefit if the number of processes is <= the number of actual physical cores. Not sure multi-threading will have benefits either, because of resource contention. Anyway, it's good to experiment with the multi-thread and sequential versions (see the sketch below).
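One clarification worth adding here: multiprocessing.Pool in approach 1 actually creates worker processes, not threads. A true thread pool with the same map API is available as multiprocessing.pool.ThreadPool (undocumented in Python 2.7 but present); a sketch, with a trivial placeholder worker:

from multiprocessing.pool import ThreadPool

def worker(filename):
    # placeholder; substitute the stat-based worker from approach 1
    return filename

pool = ThreadPool(8)  # assumed thread count; tune as needed
results = pool.map(worker, ["/tmp/a.txt", "/tmp/b.txt"])
pool.close()
pool.join()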
