Python usage notes/Multiprocessing notes

Various things have their own pages, see Category:Python.


tl;dr:

  • lets you run Python functions in distinct processes (fork()ed where supported)
  • nice for parallelizing things that do nontrivial CPU work per task and don't have to communicate very much
  • in the standard library since Python 2.6


When you have a fixed number of distinct-purpose workers, you will appreciate that the API is similar to the threading module, and that it comes with helpers for:

  • message passing
  • object sharing
  • locking (e.g. to control printout on stdout; see the sketch under Basic use below)
  • process pools

...if your wish is "throw X non-communicating jobs at Y processes/CPUs", look at the pool stuff.


It also has some requirements, like picklability of the things you pass around, and closing workers nicely. Read its "Programming guidelines" notes to keep things smooth (and portable).


Basic use

One way to use it is an almost direct analogue to the threading module.

from multiprocessing import Process

def f(name):
    print name

# note: on platforms without fork() (e.g. Windows), the following belongs under an  if __name__ == '__main__':  guard
p = Process(target=f, args=('bob',))
p.start()
# do other stuff

# Eventually:
p.join()
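
Since the intro mentioned the locking helper: a minimal sketch (function and variable names are illustrative, not from the module) that uses a multiprocessing.Lock to keep prints from different processes from interleaving:

import multiprocessing

def report(lock, n):
    # serialize access to stdout so output lines from different processes don't interleave
    with lock:
        print('process %d reporting' % n)

if __name__ == '__main__':
    lock  = multiprocessing.Lock()
    procs = [multiprocessing.Process(target=report, args=(lock, i)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()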


Communication options

This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)
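
The message-passing helpers the intro mentions are mainly multiprocessing.Queue (many producers/consumers) and multiprocessing.Pipe (a connection with two ends). A minimal sketch of each, with illustrative names:

import multiprocessing

def queue_worker(q):
    q.put('hello from the queue worker')      # Queue: many processes can put()/get()

def pipe_worker(conn):
    conn.send('hello from the pipe worker')   # Pipe: two connection ends, send()/recv()
    conn.close()

if __name__ == '__main__':
    q  = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=queue_worker, args=(q,))
    p1.start()
    print(q.get())
    p1.join()

    parent_conn, child_conn = multiprocessing.Pipe()
    p2 = multiprocessing.Process(target=pipe_worker, args=(child_conn,))
    p2.start()
    print(parent_conn.recv())
    p2.join()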

See also:



Pools

This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)

The basic use is to throw an iterable at a pool object:

import multiprocessing
 
def myfunc(x):
    return x*x
 
mypool = multiprocessing.Pool(5) # pool size
print mypool.map(myfunc, [1,2,3]) # map is one choice, there are others you may want


Notes:

  • pools are reusable - their workers stick around - so when you are done with them you should close() them (see the sketch after this list)
      close() is the clean way: it lets outstanding work finish, then removes the processes
      if your code wants to wait until everything is done, call close(), then join()
      terminate() is the heavy-handed way: it kills the current workers immediately
      terminate() is also what the garbage collector does to pool objects - often not exactly what you want
  • a worker process will be reused for further tasks
      you can limit the number of tasks a worker handles before it is replaced with a fresh one (the maxtasksperchild constructor argument, see below)
  • the chunksize argument controls how many jobs go to an individual worker at a time (see also the map/imap differences)
  • on the choice of map:
      if you can determine all jobs before starting, a single map is probably easiest
      apply hands off a single job, map hands off multiple
      map_async does not block the calling process - but only gives the results in a single chunk once everything is done
      the imap variants give you results as they finish
      for more details, see below
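
A minimal sketch of the close()/join() lifecycle mentioned above (function and variable names are just illustrative):

import multiprocessing

def work(x):
    return x + 1

if __name__ == '__main__':
    pool    = multiprocessing.Pool(4)
    results = pool.map(work, range(10))
    pool.close()   # no more tasks will be submitted
    pool.join()    # wait for the workers to finish and go away
    print(results)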


Map variant details

The options here are:

  • pool.map(func, iterable[, chunksize])
      waits around for all results before returning
      returned values are in the same order as the input
  • pool.map_async(func, iterable[, chunksize[, callback]])
      does not wait around for all results
      you can poll the returned AsyncResult - which will only give data once all tasks are done(verify)
  • pool.imap(func, iterable[, chunksize])
      yields results as they come in
      hands items from the source iterable to workers chunksize at a time (default 1)
      lets you work with a generator and avoid the memory cost of materializing very large lists before chunking them up (a small chunksize also means measurably more CPU overhead, so don't leave it at 1 unless you really want to)
      analogous to itertools.imap[1]
  • pool.imap_unordered(func, iterable[, chunksize])
      yields results as they come in
      no ordering guarantees
      ...note that if you use chunksize >1, it returns chunks as they finish


  • pool.apply(func[, args[, kwds]])
      synchronous
      really just a single call, func(*args, **kwds), in one of the pool's workers
      seems to just be a helper for the rest(verify)
  • pool.apply_async(func[, args[, kwds[, callback]]])
      a single apply, but doesn't block until it is finished
      instead returns an AsyncResult object you can poll (see the sketch below)
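
A minimal sketch of apply_async with a callback (illustrative names; the callback runs in the parent process when a task finishes, and AsyncResult.get() returns the value or re-raises the task's exception):

import multiprocessing

def square(x):
    return x * x

def on_result(value):
    print('callback got %s' % value)

if __name__ == '__main__':
    pool    = multiprocessing.Pool(3)
    results = [pool.apply_async(square, (i,), callback=on_result) for i in range(5)]
    pool.close()
    pool.join()
    print([r.get() for r in results])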



Counting finished jobs

The easiest way is to use imap_unordered (or imap), as they yield results as those finish.


Example:

import random, time, multiprocessing
 
def func(arg):
    time.sleep( random.uniform(0,2) )
    return 'bla'
 
p     = multiprocessing.Pool(4)
count = 0
start = time.time()
for res in p.imap_unordered(func, range(20), chunksize=2):
    count += 1
    print "[%5.2f] res:%s  count:%s"%(time.time()-start, res, count)


Adding jobs while it works

This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)

That is, use the same pool object for jobs you did not initially know about.

...and avoid the zombie problem.


generator to imap()

When you don't mind the call blocking, it is probably most convenient to:

  • hand in a generator
  • ...to an imap variant

The pool keeps doing work as long as the generator yields things. Once the generator stops yielding (returns / raises StopIteration) and the imap call has finished the jobs for everything that was yielded, it returns.


Example:

import random, time, multiprocessing
 
def func(arg):
    #print '%s says %s'%(multiprocessing.current_process(), arg)
    return arg
 
def generate_jobs():
    while True:
        rval = random.uniform(0, 0.1)
        time.sleep( rval )
        yield rval
        if rval < 0.01: # randomly stop after a while
            return      # ends the generator ('raise StopIteration' also worked in older Python, but is disallowed by PEP 479)
 
p     = multiprocessing.Pool(10)
count = 0
start = time.time()
for i in range(10):
    print "chunk %d"%i
    for res in p.imap_unordered(func, generate_jobs()):
        count += 1
        print "[%5.2f] res:%s  count:%s"%(time.time()-start, res, count)
p.close()
p.join()

Notes:

  • consider using imap's chunksize for lower overhead


map_async

If you don't want blocking, then you can hand chunks into map_async.

It's a bit more bookkeeping for you, because this means you have multiple AsyncResult objects to deal with.

import random, time, multiprocessing
 
def func(arg):
    print '%s says %s'%(multiprocessing.current_process(), arg)
    time.sleep( random.uniform(0, 0.5) )
    return arg
 
p     = multiprocessing.Pool(5)
 
result1 = p.map_async( func, range(100,199) )
result2 = p.map_async( func, range(200,299) )
result3 = p.map_async( func, range(300,399) )
 
print "Waiting for results"
 
p.close()
p.join()
print result1.get()
print result2.get()
print result3.get()

Notes:

  • seems to batch them in order
      e.g. here, parts from the second range won't get queued until the first is at ~195
  • doesn't run them in order
  • does return them in order
  • you can .get() the results from a result object once it is ready(), so in the above you could insert something like:
while True:
    alldone = True
    alldone &= result1.ready()
    alldone &= result2.ready()
    alldone &= result3.ready()
    print "Parts done: ", result1.ready(), result2.ready(), result3.ready()
    if alldone:
        break
    time.sleep(0.3)

Be careful of

Exceptions

This article/section is a stub — probably a pile of half-sorted notes, is not well-checked so may have incorrect bits. (Feel free to ignore, fix, or tell me)


https://azylstra.net/blog/post/exception-handling-with-python-processes
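
To be aware of: an exception raised in a worker is re-raised in the parent when you fetch the result (during map(), or when you call AsyncResult.get()). A minimal sketch of handling that (illustrative names):

import multiprocessing

def may_fail(x):
    if x == 3:
        raise ValueError('problem with %s' % x)
    return x * x

if __name__ == '__main__':
    pool    = multiprocessing.Pool(2)
    results = [pool.apply_async(may_fail, (i,)) for i in range(5)]
    pool.close()
    pool.join()
    for r in results:
        try:
            print(r.get())
        except ValueError as e:
            # the worker's exception is re-raised here, in the parent
            print('task failed: %s' % e)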

Abrupt kills messing with communication

Process.terminate, os.kill, and such can break the communication channels a bit, so you may wish to avoid them when they are not necessary.

It's there in the documentation, just worth mentioning.


Pool zombies

If you have a script that creates pools and then forgets about them, it leaves their worker processes behind (they won't show as Z state(verify), but it's the same concept as zombies).


This should rarely be noticeable because:

  • a single-run script will clean up when it quits
  • a pool will reuse its processes


The easiest way to create a problem is to create a new pool for each set of jobs and never clean up (it turns out thousands of leftover processes will crash the system).


The full story is that cleanup happens during a pool.join(), which can only happen after pool.close(), so:

# create pool, do stuff then
pool.close()
pool.join()
Another way is to abuse the fact that multiprocessing.active_children() has the side effect of join()ing finished children while checking on them (so does Process.is_alive(), but that's more work). So you could do:

while True:
    act = multiprocessing.active_children()
    if len(act) == 0:
        break
    print "Waiting for %d workers to finish" % len(act)
    time.sleep(2)



Relevant:

Pool.join()
  Wait for the worker processes to exit
  One must call close() or terminate() before using join().

Pool.close()
  Prevents any more tasks from being submitted to the pool.
  Once all the tasks have been completed the worker processes will exit.

Pool.terminate()
  Stops the worker processes immediately without completing outstanding work.
  When the pool object is garbage collected terminate() will be called immediately.

Memory leaks

Processes are reused.

If that eventually leads to memory leak problems, you may wish to look at maxtasksperchild (argument to the Pool constructor)
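
For example, a minimal sketch (the numbers are arbitrary):

import multiprocessing

def work(x):
    return x * x

if __name__ == '__main__':
    # each worker process is retired and replaced after it has handled 100 tasks,
    # so whatever memory it leaked or cached is given back to the OS
    pool = multiprocessing.Pool(processes=4, maxtasksperchild=100)
    print(pool.map(work, range(1000)))
    pool.close()
    pool.join()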

See also