
I am running processes in parallel, but I need to create a database for each CPU process to write to. I only want as many databases as there are CPUs assigned on each server, so that the 100 jobs are written to 3 databases that can be merged afterwards.

Is there a worker ID or core ID that I can use to identify each worker?

def workerProcess(job):
    # workerid is the worker/core identifier I'm asking about
    db_path = 'c:\\temp\\db\\' + workerid
    if not workerDBexist(db_path):
        # first time this 'worker/core' is used, so make the DB first
        makeDB(db_path)
    # process the job into this database

import pp
ppservers = ()
ncpus = 3
job_server = pp.Server(ncpus, ppservers=ppservers)

for work in work_items:   # the 100 work items
    job_server.submit(workerProcess, (work,))
  • Is there a reason you're using pp instead of the stdlib modules multiprocessing or concurrent.futures? Because this would be a lot simpler with the stdlib, and you aren't making any obvious use of any of the extra features in pp. Commented Dec 18, 2013 at 22:55
  • I'm just familiar with pp having used it before, I'll look into the other options Commented Dec 18, 2013 at 23:04
  • actually, I did want the option of clustering Commented Dec 18, 2013 at 23:08
  • Well, multiprocessing has its own distributed processing solution, Manager, so you still have that option. It is different from the way pp does it, but if you're not getting pp to do what you want even in the simplest cases, I'm not sure how much your familiarity with pp is worth in the long run. Commented Dec 19, 2013 at 1:00
  • Meanwhile, pp isn't a dead project, but it's become a lot less active since multiprocessing entered the stdlib a few years ago, so I'm not sure how much I'd count on it into the future. For example, Py 3.x support has been "coming soon" for a few years at this point, and I don't know of any actual progress. Commented Dec 19, 2013 at 1:03

1 Answer


As far as I know, pp doesn't have any such feature in its API.

If you used the stdlib modules instead, that would make your life a lot easier—e.g., multiprocessing.Pool takes an initializer argument, which you could use to initialize a database for each process, which would then be available as a variable that each task could use.
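For example, a rough sketch of the Pool-with-initializer approach might look like this (sqlite3, the results table, and the assumption that c:\temp\db already exists are just placeholders to make it concrete):

import multiprocessing
import os
import sqlite3   # assumption: any per-process database would work here

worker_db = None   # per-process global, set once by the initializer

def init_worker(db_dir):
    # Runs once in each pool process: open/create that process's own database.
    global worker_db
    path = os.path.join(db_dir, 'worker{}.db'.format(os.getpid()))
    worker_db = sqlite3.connect(path)
    worker_db.execute('CREATE TABLE IF NOT EXISTS results (job)')

def workerProcess(job):
    # Every task that lands in this process reuses the same connection.
    worker_db.execute('INSERT INTO results VALUES (?)', (job,))
    worker_db.commit()

if __name__ == '__main__':
    pool = multiprocessing.Pool(3, initializer=init_worker, initargs=(r'c:\temp\db',))
    pool.map(workerProcess, range(100))   # stand-in for the 100 work items
    pool.close()
    pool.join()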

However, there is a relatively easy workaround.

Each process has a unique (at least while it's running) process ID.* In Python, you can access the process ID of the current process with os.getpid(). So, in each task, you can do something like this:

dbname = 'database{}'.format(os.getpid())

Then use dbname to open/create the database. I don't know whether by "database" you mean a dbm file, a sqlite3 file, a database on a MySQL server, or what. You may need to, e.g., create a tempfile.TemporaryDirectory in the parent, pass it to all of the children, and have them os.path.join it to the dbname (so after all the children are done, you can grab everything in os.listdir(the_temp_dir)).
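Roughly, that parent/child handoff could look like this (merge_database is a hypothetical stand-in for however you combine the results, and it assumes you pass the directory to each task as an extra argument):

import os
import tempfile

# In the parent: a scratch directory that all of the workers will write into.
temp_dir = tempfile.mkdtemp()   # or tempfile.TemporaryDirectory() on 3.2+

def workerProcess(job, db_dir):
    # In each task: a per-process database inside the shared directory.
    dbname = 'database{}'.format(os.getpid())
    db_path = os.path.join(db_dir, dbname)
    # ... open/create the database at db_path and process the job ...

# In the parent, after all of the tasks have finished:
# for name in os.listdir(temp_dir):
#     merge_database(os.path.join(temp_dir, name))   # hypothetical merge step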


The problem with this is that if pp.Server restarts one of the processes, you'll end up with 4 databases instead of 3. Probably not a huge deal, but your code should deal with that possibility. (IIRC, pp.Server usually doesn't restart the processes unless you pass restart=True, but it may do so if, e.g., one of them crashes.)

But what if (as seems to be the case) you're actually running each task in a brand-new process, rather than using a pool of 3 processes? Well, then you're going to end up with as many databases as there are processes, which probably isn't what you want. Your real problem here is that you're not using a pool of 3 processes, which is what you ought to fix. But are there other ways you could get what you want? Maybe.

For example, let's say you created three locks, one for each database, maybe as lockfiles. Then, each task could do this pseudocode:

for i, lockfile in enumerate(lockfiles):
    try:
        with lockfile:
            do stuff with databases[i]
            break
    except AlreadyLockedError:
        pass
else:
    assert False, "oops, couldn't get any of the locks"

If you can actually lock the databases themselves (with an flock, or with some API for the relevant database, etc.) things are even easier: just try to connect to them in turn until one of them succeeds.
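If you do go the lockfile route, one concrete, cross-platform way to fake the AlreadyLockedError pseudocode above is to use exclusive file creation as the lock. This is only a sketch, and the paths are placeholders:

import os

lockfiles = [r'c:\temp\db\worker{}.lock'.format(i) for i in range(3)]

def acquire(path):
    # O_CREAT | O_EXCL fails if the file already exists, i.e. if another task holds the lock.
    try:
        return os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
    except OSError:
        return None

def run_with_some_database(job):
    for i, lockfile in enumerate(lockfiles):
        fd = acquire(lockfile)
        if fd is None:
            continue
        try:
            pass  # do stuff with databases[i]
        finally:
            os.close(fd)
            # if the process dies hard before this runs, you get the stale-lockfile
            # problem described in the footnote about pidfiles below
            os.remove(lockfile)   # release the lock for the next task
        break
    else:
        raise RuntimeError("oops, couldn't get any of the locks")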

As long as your code isn't actually segfaulting or the like,** and you're never running more than 3 tasks at a time, there's no way all 3 lockfiles can be locked at once, so you're guaranteed to get one.


* This isn't quite true, but it's true enough for your purposes. For example, on Windows, each process has a unique HANDLE, and if you ask for its pid one will be generated if it didn't already have one. And on some *nixes, each thread has a unique thread ID, and the process's pid is the thread ID of the first thread. And so on. But as far as your code can tell, each of your processes has a unique pid, which is what matters.

** Even if your code is crashing, you can deal with that, it's just more complicated. For example, use pidfiles instead of empty lockfiles. Get a read lock on the pidfile, then try to upgrade to a write lock. If it fails, read the pid from the file, and check whether any such process exists (e.g., on *nix, if os.kill(pid, 0) raises, there is no such process), and if so forcibly break the lock. Either way, now you've got a write lock, so write your pid to the file.
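For the "does any such process exist" check, a tiny *nix-only helper might look something like this (the EPERM case means the pid exists but belongs to another user):

import errno
import os

def pid_is_running(pid):
    # Signal 0 sends nothing; it only checks whether the pid can be signaled at all.
    try:
        os.kill(pid, 0)
    except OSError as e:
        # ESRCH: no such process. EPERM: it exists but we can't signal it.
        return e.errno == errno.EPERM
    return True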


4 Comments

I tried process ID, but each job gets a different pid, whereas I only want as many DBs as there are workers (not jobs). I then tried to look back at a specific core from the pid, but couldn't find a way to do this either. The DB is a geographic one; I can't reliably check whether it's locked, which is why I want to run separate databases.
@Steven: If each job gets a different PID, then each job is running in a different process, rather than them all running on a pool of three long-lived processes. So there is absolutely no way to do what you want, unless you change that. I'll update the answer with some alternatives that might work, but really, if your question assumes pooling, you have to get your tasks pooled.
Thanks for your thoughts, definitely helped my thinking. I think I could get where I want in pp by creating my own lock files in a common spot, then staggering the initial job distribution to ensure the locks are created.
@Steven: You shouldn't need to stagger the job distribution. Just create the lock files before kicking off the tasks, then let them all fight over locking the files. It's guaranteed that exactly one of them will succeed for each lock, and the time spent trying and failing to lock a file should be inconsequential compared to the time spent for your tasks and IPC (and for process startup, if you really are spawning a new process for each task).
