Python usage notes/Multiprocessing notes



tl;dr:

  • lets you fire off (fork()ed where supported) Python functions in distinct processes
  • nice for parallelizing things that do nontrivial CPU work and don't have to communicate very much
  • Py≥2.6


When you have a fixed number of distinct-purpose workers, you will appreciate that the API is similar to the threading module, and that it comes with helpers for

  • message passing
  • object sharing
  • locking (e.g. to control printout on stdout)
  • process pools

...if your wish is "throw X non-communicating jobs at Y processes/CPUs", look at the pool stuff.


It also has some requirements, like picklability and closing workers nicely. Read its "Programming guidelines" notes to keep things smooth (and portable).
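For example, two of those guidelines: keep worker functions at module level (so they can be pickled), and protect the entry point with an if __name__ == '__main__' guard so the module can be safely imported by child processes on platforms that spawn rather than fork. A minimal sketch:

import multiprocessing

def work(x):                  # module-level, so it can be pickled and sent to a worker
    return x * x

if __name__ == '__main__':    # guard: needed where processes are spawned rather than forked
    p = multiprocessing.Process(target=work, args=(3,))
    p.start()
    p.join()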


Basic use

One way to use it is as an almost direct analogue of the threading module.

from multiprocessing import Process 
def f(args):
    print args
 
p = Process(target=f, args=('bob',))
p.start()
# do other stuff. 
 
# Eventually:
p.join()


Communication options

This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)
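The main helpers here are Queue (multi-producer, multi-consumer, passes pickled objects) and Pipe (a two-ended connection). A minimal sketch using a Queue:

import multiprocessing

def worker(q):
    q.put('hello from %s' % multiprocessing.current_process().name)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    print( q.get() )   # blocks until the worker has put something
    p.join()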

See also:



Pools

This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)

The basic use is to throw an iterable at a pool object:

import multiprocessing
 
def myfunc(x):
    return x*x
 
mypool = multiprocessing.Pool(5) # pool size
print mypool.map(myfunc, [1,2,3]) 
# map() is one choice, you probably want to read up on the alternatives
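Side note: on Python 3.3 and later, a pool can also be used as a context manager - though note that its exit calls terminate() rather than close(), so anything like the blocking map() above should finish inside the block:

import multiprocessing

def myfunc(x):
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(5) as mypool:   # Python 3.3+; __exit__ calls terminate()
        print( mypool.map(myfunc, [1, 2, 3]) )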


Notes:

  • pools are reusable, so when you are done with them you should close() them
    close() is the clean way: it lets outstanding work finish, then removes the processes
    if your code wants to wait until things are done, do a close(), then a join()
    terminate() is the hard-handed way: it kills current workers
    terminate() is also what the garbage collector does to pool objects - often not exactly what you want
  • A worker process will be reused for tasks
    You can limit the number of tasks before it is replaced with a new worker (maxtasksperchild, see below)
  • the chunksize argument controls how many jobs go to individual workers at a time (see also the map/imap difference)
  • on the choice of map:
    if you can determine all jobs before starting, a single map is probably easiest
    apply hands in a single job, map hands in multiple
    map_async does not block the calling process - but only gives the results in a single chunk once it's done
    imap variants give you results as they finish
    for more details, see below


map() and its variants

If you have independent jobs, want to keep CPUs busy, and deal with results as they finish, you're probably looking for imap_unordered().

If you don't care about as-you-go, and/or handling results in order is simpler, read on.


The options here are:

  • pool.map(func, iterable[, chunksize])
map() call blocks until all results are done
order of returned values same as original
  • pool.map_async(func, iterable[, chunksize[, callback]])
map_async() does not block; instead it returns an AsyncResult object for you to poll
that object will only return any data when everything is completely done
(same if you use callback - only called when completely done)


  • pool.imap(func, iterable[, chunksize])
yields results as they come in - but only in order
so when later jobs come back somewhat out of order, finished items won't yield until all earlier ones are done
hands items from the source iterable to the workers chunksize at a time (default 1)
analogous to itertools.imap[1]
  • pool.imap_unordered(func, iterable[, chunksize])
yields results as they come in
no ordering guarantees
...note that if you use chunksize >1, it returns chunks as they finish


  • pool.apply(func[, args[, kwds]])
synchronous
really just a single call, func(*args, **kwds) - in one of the pool's workers
seems to just be a helper for the rest (verify)
  • pool.apply_async(func[, args[, kwds[, callback]]])
single apply, but doesn't block until it is finished
instead returns an AsyncResult object you can poll
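For illustration, a small sketch of apply_async with a callback (the callback runs in the parent process, called with each result as it arrives):

import multiprocessing

def work(x):
    return x * x

def on_result(value):          # runs in the parent, once per finished job
    print( 'got %s' % value )

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    for i in range(10):
        pool.apply_async(work, (i,), callback=on_result)
    pool.close()               # no more jobs
    pool.join()                # wait for workers (and so for all callbacks)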



Counting finished jobs

The easiest way is to use imap_unordered (or imap), as they yield results as they finish.


Example:

import random, time, multiprocessing
 
def func(arg):
    time.sleep( random.uniform(0,2) )
    return 'bla'
 
p     = multiprocessing.Pool(4)
count = 0
start = time.time()
for res in p.imap_unordered(func, range(20), chunksize=2):
   print "[%5.2f] res:%s  count:%s"%(time.time()-start, res, count)
   count += 1


Adding jobs while it works

This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)

That is, use the same pool object for jobs you did not initially know about.

...and avoid the zombie problem.


generator to imap()

When you don't mind the call blocking, then it is probably convenient to:

  • hand in a generator
  • ...to an imap variant

The pool keeps doing stuff as long as the generator yields things. When the generator stops yielding (that is, not when it merely blocks, but when it actually ends / raises StopIteration), and the imap call has finished the jobs for everything that was yielded, that imap call returns.


Example:

import random, time, multiprocessing
 
def func(arg):
    #print '%s says %s'%(multiprocessing.current_process(), arg)
    return arg
 
def generate_jobs():
    while True:
        rval  = random.uniform(0, 0.1)
        time.sleep( rval )
        yield rval
        if rval<0.01: # randomly stop after a while
            return    # just return to end the generator (an explicit raise StopIteration breaks on newer Pythons)
 
p     = multiprocessing.Pool(10)
count = 0
start = time.time()
for i in range(10):
    print "chunk %d"%i
    for res in p.imap_unordered(func, generate_jobs()):
        print "[%5.2f] res:%s  count:%s"%(time.time()-start, res, count)
        count += 1
p.close()
p.join()

Notes:

  • consider using imap's chunksize for lower overhead

map_async

If you don't want blocking, then you can hand chunks into map_async.

It's a bit more bookkeeping for you, because this means you have multiple AsyncResult objects to deal with.

import random, time, multiprocessing
 
def func(arg):
    print '%s says %s'%(multiprocessing.current_process(), arg)
    time.sleep( random.uniform(0, 0.5) )
    return arg
 
p     = multiprocessing.Pool(5)
 
result1 = p.map_async( func, range(100,199) )
result2 = p.map_async( func, range(200,299) )
result3 = p.map_async( func, range(300,399) )
 
print "Waiting for results"
 
p.close()
p.join()
print result1.get()
print result2.get()
print result3.get()

Notes:

  • seems to batch them in order
e.g. here parts from the second range won't get queued until the first is at ~195
  • doesn't do them in order
  • does return them in order
  • you can .get() the results from a result object once it is ready(), so in the above you could insert something like:
while True:
   alldone = True
   alldone &= result1.ready()
   alldone &= result2.ready()
   alldone &= result3.ready()
   print "Parts done: ",result1.ready(),result2.ready(),result3.ready()
   if alldone:
       break
   time.sleep(0.3)

Be careful of

Exceptions

This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)

An exception in a Process will

  • set its exitcode to 1
  • not print a stack trace itself (verify)
  • be handed to the parent, meaning the apparent source of the exception becomes the map/apply, which is uninformative for debugging


You probably want to put a try-except that wraps everything in your function (a sketch follows after this list), and then e.g.

  • print the error while still in that process (e.g. traceback.print_exc() for a stack trace), and/or still raise it
  • send a more detailed error description to the parent process, e.g. via a general messaging Queue or Pipe
  • if you want a useful stack trace, you may wish to send along the exception's original context - read http://nedbatchelder.com/blog/200711/rethrowing_exceptions_in_python.html
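A minimal sketch of that wrapping - print the real traceback while still inside the worker, then re-raise so the parent still notices:

import traceback, multiprocessing

def work(x):
    try:
        return 1.0 / x                 # stand-in for the real work
    except Exception:
        traceback.print_exc()          # the real stack trace, printed from inside the worker
        raise                          # re-raise so the parent's map()/get() still sees a failure

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    try:
        print( pool.map(work, [1, 2, 0]) )   # the 0 makes one job raise ZeroDivisionError
    except ZeroDivisionError:
        print( 'a job failed' )
    pool.close()
    pool.join()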


With pools, things are a little different: an ordinary exception is caught in the worker and handed back, re-raised where you collect the result (the worker itself keeps running), while a worker that dies outright gets replaced with another - and that individual job will not be restarted.

The state of this is a little finicky, read http://stackoverflow.com/questions/24894682/python-multiprocessing-crash-in-subprocess

The handiest thing is often the same sort of wrapping try-except. You can ignore the error, send it to the master process, etc.


Pools and KeyboardInterrupt is another detail.

It only arrives in the parent if the parent happened to be busy doing a bit of its own management(verify). More typically it arrives at a worker (when jobs are longish-running, anyway), which just kills that worker - and it will be replaced as mentioned above.

You would want a KeyboardInterrupt in a child to trigger a pool.terminate() in the parent.

One approach is to make sure the KeyboardInterrupt makes it to the parent, because then one try-except handler gets all cases. It seems that just returning from the child (rather than letting the exception propagate there) is enough. This more or less relies on using an _async variant.
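A sketch of that idea (behaviour varies a bit between Python versions, so treat this as a starting point rather than gospel): workers swallow the KeyboardInterrupt, the parent uses map_async with a timeout on get() so it stays interruptible, and its handler terminate()s the pool:

import time, multiprocessing

def work(x):
    try:
        time.sleep(1)                   # stand-in for a longish-running job
        return x
    except KeyboardInterrupt:
        return None                     # swallow it here; let the parent decide what to do

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    try:
        result = pool.map_async(work, range(20))
        print( result.get(9999) )       # a long timeout keeps this get() interruptible
        pool.close()
    except KeyboardInterrupt:
        pool.terminate()                # hard-stop all workers
    pool.join()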


https://azylstra.net/blog/post/exception-handling-with-python-processes

Abrupt kills messing with communication

Process.terminate, os.kill, and such can break the communication channels a bit, so you may wish to avoid them when they are not necessary.

It's in the documentation; just worth mentioning here.


Pool zombies

If you have a script that creates pools and then forgets about them, it leaves zombies behind (they won't show as Z state(verify), but it's the same concept). It turns out thousands of zombies will crash the system -- or the process, e.g. if it runs out of file handles for shm stuff.


You won't notice this for single-run scripts, as they get implicit cleanup when they exit.

You also don't notice this when you actively reuse the same pool object.

It can become a problem e.g. when, on a loop, you create a new pool for each set of new jobs.


tl;dr:

  • do a pool.close() once you have no more jobs for the pool
  • then eventually do a pool.join() to get the cleanup
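For example, if you really do create a fresh pool per batch of jobs (rather than reusing one pool), the cleanup would look something like:

import multiprocessing

def work(x):
    return x + 1

if __name__ == '__main__':
    for batch in ( [1, 2, 3], [4, 5, 6] ):
        pool = multiprocessing.Pool(4)
        print( pool.map(work, batch) )
        pool.close()    # no more jobs for this pool
        pool.join()     # wait for and clean up its workers, so they don't accumulate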

Relevant:

Pool.close()
  Prevents any more tasks from being submitted to the pool.
  Once all the tasks have been completed the worker processes will exit.

Pool.terminate()
  Stops the worker processes immediately without completing outstanding work.
  When the pool object is garbage collected terminate() will be called immediately.
Pool.join()
  Wait for the worker processes to exit
  One must call close() or terminate() before using join().



Another way is to abuse the fact that multiprocessing.active_children() has the side effect of join()ing its children to check them (so would individual Process.is_alive() calls, but that's work). So you could do:
while True:
    act = multiprocessing.active_children()
    if len(act)==0:
        break
    print "Waiting for %d workers to finish"%len(act)
    time.sleep(2)

Leaks

Processes are reused.

If that eventually leads to memory leaks, or other resource leaks, you may wish to look at maxtasksperchild (an argument to the Pool constructor).
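For example (maxtasksperchild appeared around Python 2.7 / 3.2):

import multiprocessing

def work(x):
    return x * 2

if __name__ == '__main__':
    # each worker process is replaced after 100 tasks, releasing whatever it leaked
    pool = multiprocessing.Pool(4, maxtasksperchild=100)
    print( pool.map(work, range(1000)) )
    pool.close()
    pool.join()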

Errors

Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

The worker function must be a global function.

A local function isn't pickleable.
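For illustration, the kind of code that triggers it - a function defined inside another function (or a lambda) handed to a pool:

import multiprocessing

def main():
    def work(x):        # nested function: can't be pickled, so it can't be sent to a worker
        return x * x
    pool = multiprocessing.Pool(2)
    print( pool.map(work, [1, 2, 3]) )   # raises the pickling error (wording differs per Python version)

if __name__ == '__main__':
    main()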

Error sending result: '[<generator object procfunc at 0x7fa71f847eb0>]'. Reason: 'TypeError("can't pickle generator objects",)'

You're trying to yield, which means returning a generator, which means pickling it, and you can't pickle a generator[2].


If just returning won't do, you probably want a multiprocessing.Queue. These are thread- and process-safe; just pass them in and use them.
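A sketch of that, shown with a plain Process (with a Pool you would typically pass a multiprocessing.Manager().Queue() instead, since a plain Queue can't be handed to pool workers as an argument):

import multiprocessing

def work(n, q):
    for i in range(n):
        q.put(i * i)       # hand back results one at a time instead of yielding them
    q.put(None)            # sentinel value: tells the reader we're done

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=work, args=(5, q))
    p.start()
    while True:
        item = q.get()
        if item is None:
            break
        print( item )
    p.join()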

Exception via pool.py in get

If the stack trace mentions

raise self._value

Rather than

return self._value

...that's usually the result object passing through an exception(verify), meaning you get a semi-useful exception -- but no relevant stack trace of where it came from.

To see the real stack trace, either have your job function do more exception handling, or run it outside of a pool.

See also