Python usage notes/Multiprocessing notes

From Helpful
Various things have their own pages, see Category:Python.


The multiprocessing module:

  • lets you fire off (fork()ed where supported) python functions in distinct processes
  • is nice for parallelizing things that do nontrivial CPU work and don't have to communicate very much
  • needs Py≥2.6

When you have a fixed number of distinct-purpose workers, you will appreciate that the API is similar to the threading module, and that it comes with helpers for:

  • message passing
  • object sharing
  • locking (e.g. to control printout on stdout)
  • process pools

...if your wish is "throw X non-communicating jobs at Y processes/CPUs", look at the pool stuff.

It also has some requirements, like picklability, and closing workers nicely. Read its "Programming guidelines" notes to keep things smooth (and portable).

Basic use

One way to use it is an almost direct analogue to the threading module.

from multiprocessing import Process 
def f(args):
    print args
p = Process(target=f, args=('bob',))
p.start()
# do other stuff.
# Eventually:
p.join()

Communication options

This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)
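As a minimal sketch of the message-passing side (a multiprocessing.Queue shared between parent and child; the worker function here is just illustrative):

```python
from multiprocessing import Process, Queue

def worker(q):
    # runs in the child process; the queue handles the IPC
    q.put('hello from worker')

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())   # blocks until the worker put()s something
    p.join()
```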

See also:


Pool

The basic use is to throw an iterable at a pool object:

import multiprocessing
def myfunc(x):
    return x*x
mypool = multiprocessing.Pool(5) # pool size
print, [1,2,3]) # map is one choice, there are others you may want


  • pools keep their worker processes around for reuse, so when you are done with them you should close() them
close() is the clean way: it lets outstanding work finish, then removes the processes
if your code wants to wait until things are done, do a close(), then a join()
terminate() is the hard-handed way: it kills current workers
terminate() is also what the garbage collector does to pool objects - often not exactly what you want
  • A worker process will be reused for tasks
You can limit the number of tasks it handles before it is replaced with a new worker (the maxtasksperchild argument)
  • the chunksize argument controls how many jobs go to individual workers at a time (see also map/imap difference)
  • on the choice of map:
if you can determine all jobs before starting, a single map is probably easiest
apply hands a single job, map does multiple
map_async does not block the calling process - but only gives the results in a single chunk once it's done
imap variants give you results as they finish
for more details, see below

Map variant details

The options here are:

  •, iterable[, chunksize])
waits around for all results before returning
order of returned values same as original
  • pool.map_async(func, iterable[, chunksize[, callback]])
does not wait around for all results
you can poll the returned AsyncResult - which will only give data when all tasks are done(verify)
  • pool.imap(func, iterable[, chunksize])
yields results as they come in
takes items from the source iterable to the worker chunksize at a time (default 1)
lets you work with a generator and avoid the memory cost of building a very large list before chunking it up (if chunksize is small, this also means measurably more CPU overhead, so don't leave it at 1 unless you really want to)
analogous to itertools.imap[1]
  • pool.imap_unordered(func, iterable[, chunksize])
yields results as they come in
no ordering guarantees
...note that if you use chunksize >1, it returns chunks as they finish

  • pool.apply(func[, args[, kwds]])
really just a single call, func(*args, **kwds) - in one of the pool's workers
seems to just be a helper for the rest(verify)
  • pool.apply_async(func[, args[, kwds[, callback]]])
single apply, but doesn't block until it is finished
instead returns an AsyncResult object you can poll

Counting finished jobs

The easiest way is to use imap_unordered (or imap), as they yield results as the jobs finish.


import random, time, multiprocessing
def func(arg):
    time.sleep( random.uniform(0,2) )
    return 'bla'
p     = multiprocessing.Pool(4)
count = 0
start = time.time()
for res in p.imap_unordered(func, range(20), chunksize=2):
   print "[%5.2f] res:%s  count:%s"%(time.time()-start, res, count)
   count += 1

Adding jobs while it works

This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)

That is, use the same pool object for jobs you did not initially know about.

...and avoid the zombie problem.

Handing a generator to imap

When you don't mind the call blocking, then it is probably convenient to:

  • hand in a generator
  • to an imap variant

The pool keeps doing stuff as long as the generator yields things. Once the generator stops yielding, and the imap function finishes the jobs for everything it had yielded, it will return.


import random, time, multiprocessing
def func(arg):
    #print '%s says %s'%(multiprocessing.current_process(), arg)
    return arg
def generate_jobs():
    while True:
        rval  = random.uniform(0, 0.1)
        time.sleep( rval )
        yield rval
        if rval<0.01: # randomly stop after a while
           return
p     = multiprocessing.Pool(10)
count = 0
start = time.time()
for i in range(10):
    print "chunk %d"%i
    for res in p.imap_unordered(func, generate_jobs()):
        print "[%5.2f] res:%s  count:%s"%(time.time()-start, res, count)
        count += 1


  • consider using imap's chunksize for lower overhead

If you don't want blocking, then you can hand chunks into map_async.

It's a bit more bookkeeping for you, because this means you have multiple AsyncResult objects to deal with.

import random, time, multiprocessing
def func(arg):
    print '%s says %s'%(multiprocessing.current_process(), arg)
    time.sleep( random.uniform(0, 0.5) )
    return arg
p     = multiprocessing.Pool(5)
result1 = p.map_async( func, range(100,199) )
result2 = p.map_async( func, range(200,299) )
result3 = p.map_async( func, range(300,399) )
print "Waiting for results"
print result1.get()
print result2.get()
print result3.get()


  • seems to queue the handed-in iterables in order
e.g. here parts from the second range won't get queued until the first is at ~195
  • doesn't complete individual jobs in order
  • does return results in order
  • you can poll a result object with ready() to see whether it is done, so in the above you could insert something like:
while True:
   alldone = True
   alldone &= result1.ready()
   alldone &= result2.ready()
   alldone &= result3.ready()
   print "Parts done: ",result1.ready(),result2.ready(),result3.ready()
   if alldone:
       break
   time.sleep(0.3)

Be careful of


This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)

Abrupt kills messing with communication

Process.terminate, os.kill, and such can break the communication channels a bit, so you may wish to avoid them when they are not necessary.

It's in the documentation; it's just worth mentioning.

Pool zombies

If you have a script that creates pools, then forgets about them, it leaves zombies behind (won't show as Z state(verify), but same concept).

This should rarely be noticeable because:

  • a single-run script will clean up when it quits
  • a pool will reuse its processes

The best way to create a problem is to create a new pool for each set of jobs and then not clean up (turns out thousands of zombies will crash the system).

The full story is that cleanup happens during a pool.join(), which can only happen after pool.close(), so:

# create pool, do stuff, then:
pool.close()
pool.join()

Another way is to abuse the fact that multiprocessing.active_children() has the side effect of join()ing its finished children while checking them (so does join()ing them yourself, but that's work). So you could do:

while True:
    act = multiprocessing.active_children()
    if len(act)==0:
        break
    print "Waiting for %d workers to finish"%len(act)
    time.sleep(1)


  join()
  Wait for the worker processes to exit.
  One must call close() or terminate() before using join().

  close()
  Prevents any more tasks from being submitted to the pool.
  Once all the tasks have been completed the worker processes will exit.

  terminate()
  Stops the worker processes immediately without completing outstanding work.
  When the pool object is garbage collected terminate() will be called immediately.

Memory leaks

Processes are reused.

If that eventually leads to memory leak problems, you may wish to look at maxtasksperchild (argument to the Pool constructor)

See also