Python usage notes/Multiprocessing notes

From Helpful



  • lets you fire off Python functions in distinct processes (fork()ed where supported)
  • nice for parallelizing things that do nontrivial CPU work and don't have to communicate very much
  • available since Python 2.6

When you have a fixed number of distinct-purpose workers, you will appreciate that the API is similar to the threading module, and that it comes with helpers for
  • message passing
  • object sharing
  • locking (e.g. to control printout on stdout)
  • process pools

...if your wish is "throw X non-communicating jobs at Y processes/CPUs", look at the pool stuff.

It also has some requirements, like picklability, and closing workers nicely. Read the "Programming guidelines" section of its documentation to keep things smooth (and portable).

Basic use

One way to use it is an almost direct analogue to the threading module.

from multiprocessing import Process

def f(args):
    print args

p = Process(target=f, args=('bob',))
p.start()
# do other stuff.
# Eventually:
p.join()

Communication options

This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)
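As a minimal sketch of the message-passing side (the worker function and the message here are made up): a Queue can be handed to a child Process, and both sides can put() and get() on it safely.

```python
from multiprocessing import Process, Queue

def worker(q):
    # child process: put a message on the shared queue
    q.put('hello from worker')

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())  # blocks until the worker put()s something
    p.join()
```

Pipe() works similarly but gives you exactly two connected endpoints, which is a bit cheaper when only two processes are talking.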

See also:

Pool

This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)

The basic use is to throw an iterable at a pool object:

import multiprocessing

def myfunc(x):
    return x*x

mypool = multiprocessing.Pool(5) # pool size
print mypool.map(myfunc, [1,2,3])
# map() is one choice, you probably want to read up on the alternatives


  • pools are reusable, so when you are done with them you should close() them
close() is the clean way: it lets outstanding work finish, then removes the processes
if your code wants to wait until things are done, do a close(), then a join()
terminate() is the hard-handed way: it kills the current workers
it is also what the garbage collector does to pool objects - often not exactly what you want
  • a worker process will be reused for tasks
you can limit the amount of tasks it handles before it is replaced with a new worker
  • the chunksize argument controls how many jobs go to individual workers at a time (see also the map/imap difference)
  • on the choice of map:
if you can determine all jobs before starting, a single map is probably easiest
apply hands over a single job, map hands over multiple
map_async does not block the calling process - but only gives the results in a single chunk once it's done
imap variants give you results as they finish
for more details, see below
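A minimal sketch of that lifecycle (triple is a made-up job function): map some work with a chunksize, then close() and join() the pool.

```python
import multiprocessing

def triple(x):
    return 3 * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    try:
        # chunksize=2 hands jobs to workers two at a time
        print(pool.map(triple, range(10), chunksize=2))
    finally:
        pool.close()  # clean: lets outstanding work finish
        pool.join()   # wait for the workers to actually exit
```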

map() and its variants

If you have independent jobs, want to keep CPUs busy, and deal with results as they finish, you're probably looking for imap_unordered().

If you don't care about as-you-go, and/or handling results in order is simpler, read on.

The options here are:

  • pool.map(func, iterable[, chunksize])
the map() call blocks until all results are done
order of returned values is the same as the original
  • pool.map_async(func, iterable[, chunksize[, callback]])
map_async() does not block; instead it returns an AsyncResult object to be polled
that object will only return any data when everything is completely done
(same if you use a callback - it is only called when everything is completely done)

  • pool.imap(func, iterable[, chunksize])
yields results as they come in - but only in order
so when later jobs come back somewhat out of order, finished items won't yield until all earlier ones are done
takes items from the source iterable to the worker chunksize at a time (default 1)
analogous to itertools.imap[1]
  • pool.imap_unordered(func, iterable[, chunksize])
yields results as they come in
no ordering guarantees
...note that if you use chunksize >1, it returns chunks as they finish

  • pool.apply(func[, args[, kwds]])
really just a single call, func(*args, **kwargs) - in one of the pool's workers
seems to just be a helper for the rest(verify)
  • pool.apply_async(func[, args[, kwds[, callback]]])
a single apply, but it does not block waiting for the job to finish
instead returns an AsyncResult object you can poll
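For example, a short sketch of apply_async and the AsyncResult it hands back (add is a made-up function):

```python
import multiprocessing

def add(a, b):
    return a + b

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    res = pool.apply_async(add, (2, 3))  # returns immediately
    # res is an AsyncResult: poll with res.ready(), or block with res.get()
    print(res.get(timeout=10))
    pool.close()
    pool.join()
```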

Counting finished jobs

The easiest way is to use imap_unordered (or imap), as they yield results while they finish.


import random, time, multiprocessing

def func(arg):
    time.sleep( random.uniform(0,2) )
    return '[bla %s]'%arg

p     = multiprocessing.Pool(4)
count = 0
start = time.time()
for res in p.imap_unordered(func, range(20), chunksize=2):
    print "(after %3.1fsec)  returnval:%s  count:%s"%(time.time()-start, res, count)
    count += 1

Adding jobs while it works

This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)

That is, use the same pool object for jobs you did not initially know about.

...and avoid the zombie problem.

There are a few options:

generator to _imap()

When you don't mind the call blocking, it is probably most convenient to hand a generator to one of the imap variants.

The pool keeps doing stuff as long as the generator yields things. Once the generator stops yielding (that is, not when it blocks, but when it is exhausted) and the imap function has finished the jobs for everything it yielded, the call returns.


import random, time, multiprocessing

def func(arg):
    #print '%s says %s'%(multiprocessing.current_process(), arg)
    return arg

def generate_jobs():
    while True:
        rval  = random.uniform(0, 0.1)
        time.sleep( rval )
        yield rval
        if rval<0.01: # randomly stop after a while
            return    # (raising StopIteration here is an error since Python 3.7)

p     = multiprocessing.Pool(10)
count = 0
start = time.time()
for i in range(10):
    print "chunk %d"%i
    for res in p.imap_unordered(func, generate_jobs()):
        print "[%5.2f] res:%s  count:%s"%(time.time()-start, res, count)
        count += 1


  • consider using imap's chunksize for lower overhead


If you don't want blocking, you can hand chunks into map_async.

It's a bit more bookkeeping for you, because this means you have multiple AsyncResult objects to deal with.

import random, time, multiprocessing
def func(arg):
    print '%s says %s'%(multiprocessing.current_process(), arg)
    time.sleep( random.uniform(0, 0.5) )
    return arg
p     = multiprocessing.Pool(5)
result1 = p.map_async( func, range(100,199) )
result2 = p.map_async( func, range(200,299) )
result3 = p.map_async( func, range(300,399) )
print "Waiting for results"
print result1.get()
print result2.get()
print result3.get()


  • seems to batch them in order
e.g. here parts from the second range won't get queued until the first is at ~195
  • doesn't do them in order
  • does return them in order
  • you can get the results from a result object once it is ready, so in the above you could insert something like:
while True:
    alldone = True
    alldone &= result1.ready()
    alldone &= result2.ready()
    alldone &= result3.ready()
    print "Parts done: ",result1.ready(),result2.ready(),result3.ready()
    if alldone:
        break
    time.sleep(0.5)

Be careful of


This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)

An exception in a Process will

  • make its exitcode 1
  • not print a stack trace itself(verify)
  • be handed to the parent process, meaning the source of the exception becomes the map/apply call, which is uninformative for debugging

You probably want to put a try-except that wraps everything in your function, and then e.g.

  • print the error while still in that process (e.g. traceback.print_exc() for a stacktrace), and/or still raise it.
  • send a more detailed error description to the parent process, e.g. via a general messaging Queue or Pipe
  • you may wish to send along its original context if you want a useful stack trace - read
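A sketch of that wrap-everything pattern (safe_worker is a made-up name): the worker returns either its result or the formatted traceback, so the parent can tell which jobs failed and why.

```python
import traceback
import multiprocessing

def safe_worker(arg):
    # wrap everything, so the parent gets the real traceback as a string
    try:
        return ('ok', 1.0 / arg)
    except Exception:
        return ('error', traceback.format_exc())

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    for status, value in pool.map(safe_worker, [4, 0, 2]):
        print('%s: %s' % (status, value))
    pool.close()
    pool.join()
```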

With pools things are a little different. The Process it appeared in will stop, be replaced with another, and this individual job will not be restarted.

The state of this is a little finicky, read

The handiest thing is often the same wrapping try-except. You can ignore, send to the master process, etc.

Pools and KeyboardInterrupt are another detail.

It only arrives in the parent if the parent happened to be busy with its own management work(verify). More typically (for longish-running jobs, anyway) it arrives at a worker, where it just kills that worker, which will be replaced as mentioned above.

You would want a KeyboardInterrupt in a child to trigger a pool.terminate() in the parent.

One approach is to make sure the KeyboardInterrupt makes it to the parent, because then a single try-except handler catches all cases. It seems that just returning from the child accomplishes this. This more or less relies on using an _async variant.
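A sketch of that approach (work is a made-up function): map_async plus a get() with a timeout keeps the parent interruptible, so its except clause can terminate the pool.

```python
import multiprocessing

def work(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    try:
        # _async + get(timeout) keeps the parent responsive to Ctrl-C
        result = pool.map_async(work, range(8))
        print(result.get(timeout=60))
        pool.close()
    except KeyboardInterrupt:
        pool.terminate()  # hard-stop all workers
    finally:
        pool.join()
```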

Abrupt kills messing with communication

Process.terminate, os.kill, and such can break the communication channels a bit, so you may wish to avoid them when they are not necessary.

It's there in the documentation, just worth mentioning.

Pool zombies

If you have a script that creates pools and then forgets about them, it leaves zombies behind (they won't show as Z state(verify), but it's the same concept). It turns out thousands of zombies will crash the process or even the system - e.g. if it runs out of file handles for shm stuff, hits the per-user process limit set via ulimit, etc.

You won't notice this for single-run scripts, as they get implicit cleanup when they exit.

You also don't notice this when you actively reuse the same pool object.

It can become a problem e.g. when, on a loop, you create a new pool for each set of new jobs and then don't clean it up.


  • do a pool.close() once you have no more jobs for the pool
  • then eventually do a pool.join() to get the cleanup


close()
    Prevents any more tasks from being submitted to the pool.
    Once all the tasks have been completed the worker processes will exit.

terminate()
    Stops the worker processes immediately without completing outstanding work.
    When the pool object is garbage collected terminate() will be called immediately.

join()
    Wait for the worker processes to exit.
    One must call close() or terminate() before using join().

Another way is to abuse the fact that multiprocessing.active_children() has the side effect of join()ing children that have finished (so would join()ing them individually, but that's more work). So you could do:

while True:
    act = multiprocessing.active_children()
    if len(act)==0:
        break
    print "Waiting for %d workers to finish"%len(act)
    time.sleep(1)


Processes are reused.

If that eventually leads to memory leak problems, or other leaks, you may wish to look at maxtasksperchild (argument to the Pool constructor)
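For example (report_pid is a made-up helper): with maxtasksperchild set, each worker is retired and replaced after that many tasks, which bounds how much a leaky task function can accumulate in any one process.

```python
import multiprocessing, os

def report_pid(_):
    return os.getpid()

if __name__ == '__main__':
    # each worker is replaced after 2 tasks
    pool = multiprocessing.Pool(2, maxtasksperchild=2)
    pids = pool.map(report_pid, range(8), chunksize=1)
    # typically more than 2 distinct pids: workers were replaced along the way
    print(len(set(pids)))
    pool.close()
    pool.join()
```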


Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

The worker function must be a global function.

A local function isn't pickleable.
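A quick way to see the difference, without multiprocessing at all (global_func is a made-up name): module-level functions pickle by qualified name, while lambdas and nested functions cannot be looked up that way.

```python
import pickle

def global_func(x):
    return x + 1

# a module-level function pickles fine (stored by qualified name):
clone = pickle.loads(pickle.dumps(global_func))
print(clone(41))

# a lambda (or a function defined inside another function) does not:
try:
    pickle.dumps(lambda x: x + 1)
except Exception:
    print('lambda is not pickleable')
```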

Error sending result: '[<generator object procfunc at 0x7fa71f847eb0>]'. Reason: 'TypeError("can't pickle generator objects",)'

You're trying to yield, which means returning a generator, which means pickling it - and you can't pickle a generator[2].

If just returning won't do, you probably want multiprocessing.Queue. These are thread- and process-safe. Just pass them in and use them.
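A sketch of that (producer and its sentinel convention are made up): the child put()s each item on the Queue instead of yielding, and the parent reads until it sees a sentinel.

```python
from multiprocessing import Process, Queue

def producer(q, n):
    # instead of yielding, push each result onto a shared queue
    for i in range(n):
        q.put(i * i)
    q.put(None)  # sentinel: signals there is nothing more

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q, 5))
    p.start()
    while True:
        item = q.get()
        if item is None:
            break
        print(item)
    p.join()
```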

OSError: [Errno 11] Resource temporarily unavailable

Probably means you're hitting something set by ulimit, e.g. process limit for user (ulimit -u)

Can relate to the pool zombie problem

Basically any exception

If the stack trace mentions

raise self._value

Rather than

return self._value

...that's usually the result object passing through an exception(verify), meaning you have a semi-useful exception.

But you don't get the relevant stack trace of where it came from, because the worker effectively swallows it.

To do debugging and see the real stack trace, either have your job function do the exception handling (and printing), or run that function outside of a pool.

See also