Python usage notes - concurrency
|Syntaxish: syntax and language · changes and py2/3 · decorators · importing, modules, packages · iterable stuff · concurrency|
Read the above tl;dr.
The standard library itself is mostly thread-safe.
Exceptions are mostly in the places you'ld expect them, like IO. Python won't crash, but there are some things that aren't quite as atomic as you might expect.
Because in practice (and I paraphrase) 11 out of 10 people don't manage to implement a threaded app correctly, you may want to take a look at
- async programming (which is green threads in a single Os thread).
- frameworks like Kamaelia
to save you headaches in the long run.
Or just be conservative about what you share between threads, and how you lock that. Python makes that a little easier than lower-level languages, but there's still plenty room for mistakes.
Or just use it to separate things that won't block each other (...but in this case, also look at event driven systems - they can be more efficient depending on what you're doing).
The thread module provides threads, mutexes, and a few other things.
You can use it to fire off a function in a thread, but since you cannot wait for such threads, and main thread termination might monkeywrench things(verify), you probably want the threading module instead.
The threading module provides some more advanced locking mechanisms, and creates objects that represent threads.
When you just want to create a thread for a specific function, this saves you half a dozen boilerplatish lines:
thr1 = threading.Thread(target=func) # you can also hand along args, kwargs, and thread name thr1.start()
Since this object merely contains the thread, and the object itself stays around after the thread terminated, you can e.g. retrieve data stuck on it by the thread while it was running.
This can be convenient when you've got a bunch of bookkeeping to do.
In other cases it just adds lots of lines with no point.
from threading import Thread class MyThread(Thread): ''' An object that contains, and effectively is, a thread ''' def __init__(self): Thread.__init__(self) def run(self): #the function that indirectly gets run when you start() self.stuff=time.time() thr1 = MyThread() # create the thread object thr1.start() thr1.join() # wait for it to finish print thr1.stuff # retrieve something after the thread is done
Because basically all object state is shared between threads, you might need to worry about scopes and collisions and races, so it's practical to have something that never shares data.
local() creates an object that can be accessed by many threads, but each will only see its own data.
For example: (and making the point that the main interpreter is considered its own thread)
import threading, time i=0 perthread=threading.local() perthread.num=42 def f(): global i i+=1 perthread.num = i time.sleep(1) print perthread.num #create the thread objects thread1=threading.Thread(target=f) thread2=threading.Thread(target=f) #start the threads, wait for them to finish thread1.start();thread2.start() thread1.join();thread2.join() print perthread.num
There are three different states in perthread, one for the main thread, one for the first fired thread, one for the second fired thread.
This will likely output 1, 2 and 42 (or possibly 2, 1 and 42, depending on which thread was scheduled first) because perthread was accessed from the two new threads and the main thread, respectively (...note that its value set from a non-locked global (i), so you could theoretically get racing and 1,1,42 as the result(verify)).
|This article/section is a stub — probably a pile of half-sorted notes and is probably a first version, is not well-checked, so may have incorrect bits. (Feel free to ignore, or tell me)|
In py3 you get help in the form of the ThreadPoolExecutor - see #concurrent.futures_module
In py2, things are a little more manual.
(There is actually an undocumented multiprocessing.pool.ThreadPool (implemented with a bit of a hack and not too well documented), which can be convenient if you like to use the same interfaces as for multiprocessing)
A manual way is to start as many threads as you want, and have each source jobs safely from the same place, probably via a thread-safe, multi-producer, multi-consumer queue like queue.Queue.
Note that if you're not so careful with exceptions in the threads individual ones will stop.
If you want to start a thread for each job, then there's a quick and dirty solution in something like:
jobs=range(50) # these represent jobs to be started. You'ld use something real fired= target_threads = 5 while len(jobs)>0: #while there are jobs to be worked on if threading.active_count() < target_threads+1: # if there are fewer threads working than we want t=threading.Thread(target=some_function, args=( jobs.pop(), )) t.start() fired.append(t) else: #enough threads working time.sleep(0.2) #Make sure all threads are done before we exit. for th in fired: th.join()
By default, use of threading will add a hook (to threading._shutdown(verify)) to python's shutdown code that join()s on every thread.
This is e.g. the reason a Ctrl-C on a script with threads will often seem to make it hang - stop working but not exit - because it will often be waiting on a thread that didn't know to exit (sometimes due to how signals are handled when threads are involved).
Which is a sane default. For example, if that was your IO writing thread, you want to write to code that stops it in a sane state.
However, for threads that need zero cleanup (say, listening to keyboard input), waiting is unnecessary, and cleanup code is just extra work.
Threads started as daemon threads will be ignored by that join()ing code - and the process ending will interrupt that thread at an unpredictable point.
If you set it as a daemon thread, python won't wait on it.
- in py2 you'ld probably either
- thr.daemon = True
- or thr.setDaemon(True)
- ...before thr.start() gets called (because this management is done at thread init time)
- since py3.3 you can hand daemon=True to Thread's constructor.
Timely thread cleanup, getting Ctrl-C to work, and other subtleties
|This article/section is a stub — probably a pile of half-sorted notes and is probably a first version, is not well-checked, so may have incorrect bits. (Feel free to ignore, or tell me)|
Join waits until a thread is done.
This is a locking mechanism only, and does not imply any kind of cleanup in itself.
Often, the easiest way to do such cleanup is for each thread to have a loop in which you test "if shared stop variable is true, then do some cleanup code and return from the thread function"
It's good common practice to wait on all child threads - but often not really necessary.
Cases where it makes sense is where the child thread must do cleanup to not break something, that should not be interrupted by the main thread exiting. Say, if you have an IO thread writing files, you probably want to not break in the middle of a file write.
Note that unless you mark threads as daemon threads, the interpreter will wait on all of them before exiting.
Which, if you thought about none of this, means it'll not exit very easily.
Dealing with Ctrl-C
Ctrl-C will arrive in only one thread.
Practically, you may prefer to do the real reaction to it in the main thread, mainly for ease of management, to have one clear place that tells others to stop.
If the signal module is available, then the signal will always go to the main thread - but you can't count on that on every system. (details?(verify)) If you don't want to rely on that, the more robust variant is to ensure it goes to the main thread yourself: try-catch KeyboardInterrupt in all threads, and call thread.interrupt_main() in response.
(You may want to deal with other signals as well. In part because you may care, in part just to throw them away, because some Python implementations will try to get the OS scheduler to switch faster to get the signal handled sooner - which means that scheduling is less efficient until you actually receive the signal)
The main thread needs to be scheduled at all to catch the signal, and for some code, the OS may not have any reason to do so - e.g. if the main thread is join()ed-without-timeout on child threads, or waiting on a lock. As such, try to do those things on a timeout (in the case of a lock, this may require switching to a type of lock that supports checks / timeouts), or sleep-and-poll on these things instead.
If you want to be able to shut down threads cleanly in terms of whatever they're doing, you'll want some way of letting the main thread tell all others to stop their work soon - quite possibly just a shared variable (in the example below a global) that all threads can respond to soon enough.
Example dealing with a bunch of that:
import threading import thread import time stop_now = False # Your thread funtion might look something like: def threadfunc(): try: while not stop_now: print "child thread doing stuff" time.sleep(1) except KeyboardInterrupt: # for cases where we see it, send it to the main thread instead print "\nCtrl-C signal arrived in thread" thread.interrupt_main() # Now, assuming we've started threads we can join on, e.g. fired_threads =  f = threading.Thread(target=threadfunc) f.start() fired_threads.append( f ) # then your main loop can e.g. do while len(fired_threads)>0: try: #wait for each to finish. The main thread will be doing just this. for th in fired_threads: print "parent thread watching" th.join(0.2) #timeout quickish - this main thread still gets scheduled relatively rarely if not th.is_alive(): #print "Thread '%s' is done"%th.name # you may want to name your threads when you make them. fired_threads.remove(th) except KeyboardInterrupt: print "\nCtrl-C signal arrived in main thread, asking threads to stop" #some mechanism to get worker threads to stop, # such as a global they listen to within reasonable time stop_now = True
|This article/section is a stub — probably a pile of half-sorted notes and is probably a first version, is not well-checked, so may have incorrect bits. (Feel free to ignore, or tell me)|
The following are factories that return new locks.
- once acquire()d once, any thread doing another acquire()s will be block until the lock is released
- any thread may release it
- must be release()d by the thread that acquire()d it.
- a thread may lock it multiple times (without blocking); acts semaphore-like in that multiple aquires should be followed by just as many release()s
On python threading and the GIL
Python has a complicated relationship with threading, due to its GIL
- python models threads
- which are are OS-level threads (pthreads, windows threads, or such)
- so rely on OS scheduling, so are free to be as parallel as they want
- but only one thread can access the python interpreter internals at a time
- this is enforced for you, via the GIL
- that implies that pure-python threads cannot distribute among cores
- meaning you use threads more for convenience, like its scheduling (e.g. around IO) than for parallelism
- C extensions, in particular number crunching (think numpy, scipy, numba, various GPGPU) can happen outside of python, so the GIL need not apply (but extensions have to specifically consider this), e.g.
- numpy implicitly releases the GIL around work done in C
- in numba you tell it explicitly
- things like OpenBLAS and MKL (and some numpy) may do its own separate multithreading
- the GIL still applies whenever data needs to go in or out, or that C extension (otherwise) needs interpreter internals
- and note that for some cases, threads rather than processes can save a lot of RAM and/or data copying
For mostly-python stuff
- if you have lots of small jobs, look at
- async programming
- if you have lots of bulky jobs, take a look at
More on the GIL (largely information from David Beazley's GIL talk)
Python threads are real OS threads (pthreads, windows threads, or such) so are scheduled by the OS - a good thing in general as the OS tends to know system scheduling better than python could, or arguably even should.
On the python side, only one thread can be running in an interpreter at one time. This because only one thread getting access to of that interpreter's internals at a time is the simplest way to keep things correct. This is enforced via the Global Interpreter Lock (GIL).
There are some well known implications of the GIL, which mean multithreading in python should be treated as a convenience first, more than the most performant solution.
Yes, if there is little to no interaction between threads, we can efficiently use a single core.
But a single interpreter cannot work on more than one CPU, so cannot do work on more than one core (other than trying to get the GIL, or non-GIL-locked work).
Importantly, a thread is not an interpreter. As C extensions are external to python, they could do whatever threading they want as long as they are consistent towards the python API.
At the same time, they will often GIL-lock as an easy way for them to be thread-safe, ensuring that calls to C happen from one thread at a time.
C extensions could choose to run without GIL locking, but few do because in most cases it has few real benefits, meaning you get the "quite hard to do right" part for no useful reason.
Python cannot force an OS task switch - typically no software can, as it's just a process.
However, to help play nice in system scheduling, it does occasionally do a GIL unlock, give a hint to the threading library that "hey scheduler, I could give up right now", then a GIL lock and then gets back to more work. (verify)
Python seems to do so after every so-many high-level VM operations which, depending on the code, could mean something between milliseconds and seconds of wall-clock time. Apparently later changed to never be on the slow end of that(verify).
It also does this for blocking IO.
The OS does not know about the GIL, of course, so may schedule python threads that can't do much right now. This didn't matter much before multicore because it'd just go to the next task, but the tendency to aggressively schedule threads means that two threads from the same interpreter may easily be scheduled concurrently on multiple cores. Since the GIL means only one thread gets to run at one time, and while conceptually one will just wait, practically the signalling and timing issues can sometimes compound to cause contention on the GIL (it is partly OS prioritizing that avoids perpetual livelock-like situations) and potentially a bunch of extra signalling so significant time spent on non-work. Apparently more so on CPU bound work.(verify)
Some improved threading code (using fixed time rather than ticks) makes it more likely that threads are cycled through, intrduced around 3.2 (~2010)
- does that mean python is effectively singly-threaded?
- Worst-case? yes. General-case? no.
- The GIL itself is conceptually a basic acquire/release lock that stays acquired while a thread is running
- ...but is actually based on signalling
- when a thread does some blocking IO, python makes that thread release
- periodic thread check work is / used to be done every 100 ticks (or another amount, see sys.getcheckinterval()/sys.setcheckinterval()).
- a tick is usually a few high-level vm instructions, which may be many more low-level instructions meaning this can range from microseconds to, in some cases, seconds.
- a tick is non-uninterruptable.
- an OS-level block (thread join, mutex lock, or such) in the main thread means
blocking signal handling (which breaks things like Ctrl-C)
- handing of incoming signals (like Ctrl-C) can only be done in the main handler
- Actually, the check interval(verify) is lowered when there are pending signals, so that it is more likely the main thread is served soon
- which means more context switches and more overhead until then (did this change? (verify))
Subprocess runs a single process as a child.
- available in ≥py2.4 -- replaced earlier mess of os.popen, os.system, os.spawn, or the commands or popen2 modules, and is more predictable cross-platform than some of those.
- You usually want to use the subprocess.Popen class
- (It also has subprocess.call(), which is slightly shorter when you can wait for it to finish. It just creates a Popen object and wait()s on it)
- False: an array of strings (to be handled more execv style), a little leaner because it avoids that extra process, can be less bothersome with shell escaping, and can be more secure against injection attacks
- True: hand in single string to be parsed by the shell it is run in. Can be more predictable in terms of environment, or just lazier in general
- if you want stdin, stdout, and/or stderr to go to you rather than the terminal, you need to specify you want that
- then you can read() and write(), and/or use the gathered results
- > /dev/null can be imitated by having DEVNULL = open(os.devnull, 'w') and handing that in (though if you use shell=True you might as well do it there)
- if you can wait until it's done, use communicate()
- handles stdin/stdout/stderr in the same line
- most other options (waiting/polling, reading in chunks rather than a single blob) are also valid, but more work
- if you need to interact with it, even just read output, then blocking calls are an issue. Read up.
- see PEP 324
command in single string or array, and shell=True/False
single string and shell=True
- You are handing this string to the shell to parse and execute.
- Can e.g. include multiple commands, pipes, and other shell features.
- Gives the same escaping trouble as you would typing commands into a normal shell.
- be careful to sanitize strings. Someone can try to exploit the following example with, say, name = '"; cat /etc/shadow; echo "'
- Lets you write:
p = subprocess.Popen('ps ax | grep "%s"'%name, stdout=subprocess.PIPE, shell=True) output,_ = p.communicate()
Array of strings and shell=False
- Safer, but often a little more code.
- Some things will notice they're not running in a shell and act differently.
- (In a few cases you can only get sensible behaviour with the variant above)
- The previous example in this style would be something like:
ps = subprocess.Popen(['ps','ax'], shell=False, stdout=subprocess.PIPE) grep = subprocess.Popen(['grep', name], shell=False, stdin=ps.stdout, stdout=subprocess.PIPE) grep_output,_ = grep.communicate()
The other two combinations don't make sense
- An single string with shell=False is equivalent to placing that string into a single-item list - it won't work unless it's a single command
- A sequence with shell=True seems to use args and ignore the rest.
Popen constructor arguments
- args, which can be either a single string or a sequence of argument strings, see above
- shell=False (execute through the shell, /bin/sh). Note that
- many programs don't need shell=True, but it may be simpler for you when you use shell features like wildcards, pipes, here documents and whatnot.
- shell=True means characters special to the shell are interpreted (may be a pain to escape them)
- stdin=None, stdout=None, stderr=None, each of which can be
- None: no redirection, usually meaning they stay tied to the shell that python was started from
- subprocess.PIPE - meaning you get an object you can read() from (for stdout, stderr) or write() to (for stdin)
- a file object, or file descriptor (integer)
- also, you have the option of merging stderr and stdout (specify stderr=subprocess.STDOUT)
- bufsize=0 (applies to stdin, stdout, and stderr (verify) if subprocess fdopen()s them)
- 0 means unbuffered (default)
- 1 means line buffering
- ≥2 means a buffer of (approximately) that size
- -1 / negative values imply system default (which often means fully buffered)
- None (the default) means 'inherit from the calling process'
- You can specify your own dict, e.g. copy os.environ and add your own
- if not None, is taken as a path to change to. Useful for cases where files must be present in the current directory.
- Does not help in finding the executable
- does not affect the running python program's cwd (verify)
- When shell=True, you can specify a shell other than the default (/bin/sh on unix, the value of COMSPEC on windows)
- When shell=False, you can specify the real executable here and use args for a display name.
- If True, '\n', '\r', and '\r\n' in stdout and stderr arrive in python as '\n'.
- (Depends on code that may, in fairly rare cases, not be compiled into python)
- call a python callable in subprocess, before the external call. Unix only.
- Closes all file handles left open (other than stdin/stdout/stderr (0/1/2))
- startupinfo and creationflags
Popen object members
- if you constructed Popen with stdin=PIPE: file object
- if you didn't (default): None
- (note: if you need only a single interaction with the process, then using communicate() is often simpler)
- stdout,stderr (note: can be ignored if you use communicate())
- file objects if you constructed Popen with stdout=PIPE / stderr=PIPE
- None if you didn't (default)
related to completion
- ...sends input string (if specified), reads stdout and stderr into memory (as requested), returns those two as strings once the process is done
- see below for more detail
- poll() for child process completion. Handy when you want to watch several sub-processes, or do stuff asynchronously.
- returns process return code if it's done,
- returns None if it's not
- wait(timeout=None) for child process completion.
- once finished, returns the process's return code.
- if you specified a timeout and it timed out, raises TimeoutExpired ()
- communicate() is preferred due to the deadlock issue(verify)
- pid: child process's Process ID
- None - before the child has terminated
- integer return code - after the child has terminated
- note that 127 often means shell=True and the shell couldn't find the executable (see man sh, man bash) (if shell=False, it will have raised an OSError)
- On unix, negative values signal termination by signal (-abs(signalnum))
Cleaning up and avoiding zombies
tl;dr: You should somehow end up with wait()
wait() is called internally by call() and communicate(), so this is more relevant for cases with more manual interchange with the process.
Apparently garbage collection of the subprocess object does too, but not under all conditions(verify), so it's cleaner to handle cases explicitly.
terminate() does not imply wait, so you want to follow up terminate with an explicit wait()
If you're managing a process pool yourself, then to avoid blocking you may want to do some other checks, like that pipes have gone EOF (or explicitly terminate).
Where the streams go
By default, the subprocess object's stdout and stderr streams are not touched, typically meaning they end up in the underlying shell.
If you specify subprocess.PIPE
- then you get file handles in the the calling python process corresponding to stderr and stdout on the relevant subprocess object.
- communicate() will read from these until process is complete, and return one string each.
- Convenient for short-running programs.
- Alternatively you can read(), readline(), etc. on them as you wish
- ...but note this (unlike communicate) has some potential deadlock situations.
This deadlock is not unique to python, or unix (Python's read and write wrap libc's pretty much as-is).
It's actually a sort-of-textbook example, for cases where you have more than one limited-size pipe, combined with blocking reads/writes.
Consider one side is doing a blocking write on the other -- which only actually blocks when the corresponding buffer is currently full (waiting for the other end to start emptying it) -- and the other side is doing a blocking read on the empty other (waiting for some data to appear).
You can sometimes get away with not thinking about this at all, because this deadlock tends not to happen when:
- the subprocess tends to spam both streams with roughly equal amounts of short messages,
- when it doesn't write to one of them (will never block writes on that)
- if there is more buffering than the subprocess typically outputs to either
- if the subprocess actually acts in a prompt / request-response style (because that means both sides will tend to use one stream)
Use of wait() apparently has a similar issue.(verify)
In general, your choices:
- For a non-interactive program, use communicate(), which just gives you the full outputs as strings, and is written to be free from this specific deadlock trouble.
- If you want to react as things happen, or know the output may take too much RAM, then you must stream instead
- Consider merging the two streams using stderr=subprocess.STDOUT, stdout=subprocess.PIPE (...though how clean they mix depends on how exactly the underlying process flush. It's fine in most situations, but separating them is cleaner)
- If you must have them separately, options include:
- use threads for each pipe
- you can use select(), though the entirely correct logic is a little long (also: (verify)this is cross-platform)
- use O_NONBLOCK, though that changes your logic and also makes it a little more complex
For example, to collect both independently with threads, building on a base like:
def readerthread(fh, buffer): buffer.append( fh.read() ) out_ary =  out_thread = threading.Thread(target=readerthread, args=(p.stdout, out_ary)) out_thread.start() err_ary =  err_thread = threading.Thread(target=readerthread, args=(p.stderr, err_ary)) err_thread.start() out_thread.join() err_thread.join()
Note that when you use readline() in the above, that python by default adheres to POSIX newlines, i.e. \n, and doesn't consider \r to be newline. Which means that in some cases you want to look at universal_newline=True on the subprocess call (looks for the others, and translates them for you - see PEP 278) for it to work as you expect.
On stream buffering
Keep in mind that (and most of this is not python-specific)
- stdin is buffered
- stdout is typically buffered, or line buffered on shells
- stderr may not be buffered
- buffering at all means there is no true order to what comes in on these two streams (unless you remove all buffering (usually hinders performance) and the program isn't threaded)
- ...this applies within each process. You can often not control how a program buffers or flushes.
- a pipe also represents a buffer
bufsize: see its mention in argument list above. This is basically the buffering applied on python's size if and when it uses fdopen() on the underlying file descriptor. (verify)
It seems that iterating over a file object for its lines, i.e.
for line in fileobject:
adds its own buffering within the iterator(verify), so if you want more realtime feedback, either add your own while loop around readline - or get the same behaviour via:
for line in iter(fob.readline, ''): #note: on py3 that should typically be b''
Usage notes, examples
Handy when you want to block until the subprocess quits.
p = subprocess.Popen("ps ax | grep %s"%name, stdout=subprocess.PIPE) p.wait() output = p.stdout.read() p.stdout.close()
Handy convenience function when you want to block and handle input and output data: The communicate() function sends in data, wait()s for the process to finish, and returns stdout and stderr, as pipes, strings, or whatnot. Example:
p = subprocess.Popen("sendmail -t -v", shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate( email_message.as_string() )
If you want to watch several sub-processes, you'll be interesting in poll() (returns return code, or None if not yet finished(verify)).
- PATH and environment
You can rely on PATH (and other inherited environment(verify)) whether shell=True or False -- unless it is explicitly cleared for some reason (sudo, some embedded interpreters).
A lot of errors will fall under OSError, in which case you probably want to check the exception's .errno
- errno==2 (ENOENT, 'file not found'_, here meaning the executable
- if it's there, it may e.g. be you're handing a list into shell=True
- errno==11 (EAGAIN, 'Resource temporarily unavailable'), probably on the os.fork() call, usually means the maximum amount of child processes is reached (think ulimit) -- and probably because they're not cleaned up
- also note that communicate() does this for you, so may be convenient
- if you need streaming, then it's probably that you're not closing all pipes (also consider stdin, if you open it)
Replacing older styles with subprocess
For details, see http://docs.python.org/release/2.5.2/lib/node533.html
Summary of that:
- Most of the previous styles rely on shell parsing, so the easiest method is to pass in the string as before and set shell=True
- ...except os.spawn*, it's list-based. If you're using this, you probably want to read up on the details anyway.
- ...and popen2.popen in cases where you give it a list (it can take a string and sequence and choose what you now handle with shell)
- redirect as you need to, get the file objects from the Popen object
- hand along bufsize if you need it
- You may want to check out differences in whether the call closes open file handles
- You may want to check the way errors arrive back in python
Historically, there have been a number of system call methods, mostly:
- os members
- commands (a convenience wrapper around os.popen)
- popen2 (2.4, 2.5; deprecated in 2.6)
- popen2.Popen3 class
- popen3.Popen4 class
- lets you fire off python functions in distinct processes
- (cheaper on *nix than windows, because fork()ed)
- nice to parallelize things that do nontrivial CPU-work at a time, and don't have to communicate very much
When you have a fixed number of distinct-purpose workers, you will appreciate that
- the API is similar to the threading module and
- it comes with helpers, for
- message passing
- object sharing
- locking (e.g. to control printout on stdout)
- process pools
If your wish is a set of well defined communicating threads, look at basic use.
If your wish is "throw X non-communicating jobs at Y processes/CPUs", look at the pool stuff.
Also has some requirements, like picklability, and closing workers nicely. Read its the "Programming guidelines" notes to keep things smooth (and portable).
One way to use it is an almost direct analogue to the threading module.
from multiprocessing import Process def f(args): print args p = Process(target=f, args=('bob',)) p.start() # do other stuff. # Eventually: p.join()
An important way in which it differs from threading is in multiprocessing, instead of sharing memory, there is a manager that pickes-and-unpickles any state you pass.
Instances of the following can be handed about, and its communication is handled for you:
- multiprocessing.Queue is made for that purpose, and producer-consumer (so can have more than one) is thread-safe and process-safe.
- multiprocessing.Pipe is a a two-way, two-endpoint bytestream. It's faster, but less flexible, and since they do not communicate pre defined units, their use is not thread-safe or process-safe.
- its .dict() will be communicated - lists and dicts of primitives?
- or objects via namespaces (not automatic?)
If you have bulky data, or want to communicate more frequently and/or more efficiently, read on.
Shared memory is possible, via ctypes (in shared memory synchronized via RLock)
- multiprocessing.sharedctypes - more flexible, but more work
Keep in mind that atomicity does not mean lack of races. Even an integer += may race with another, so when in doubt, add your own locks to imitate transactions.
multiprocessing.Manager lets you dedicate one process to the actual manipulation, and have other processes use it via proxies.
The basic use is to throw an iterable at a pool object:
import multiprocessing def myfunc(x): return x*x mypool = multiprocessing.Pool(5) # pool size print mypool.map(myfunc, [1,2,3]) # map() is one choice, you probably want to read up on the alternatives
- pools are reusable, so when you are done with them you should close() them
- close is the clean way: it lets outstanding work finish, then removes the processes
- if your code wants to wait until things are done, do a close(), then a join()
- terminate() is the hard-handed: it kills current workers
- also what the garbage collector does to pool objects - often not exactly what you want
- A worker process will be reused for tasks
- You can limit the amount of tasks before it is replaced with a new worker
- the chunksize argument controls how many jobs go to individual workers at a time (see also map/imap difference)
- on the choice of map:
- if you can determine all jobs before starting, a single map is probably easiest
- apply hands a single job, map does multiple
- map_async does not block the calling process - but only gives the results in a single chunk once it's done
- imap variants gives you results as they finish
- for more details, see below
map() and its variants
If you have independent jobs, want to keep CPUs busy, and deal with results as they finish, you're probably looking for imap_unordered().
If you don't care about as-you-go, and/or handling results in order is simpler, read on.
The options here are:
- pool.map(func, iterable[, chunksize])
- map() call blocks until all results are done
- order of returned values same as original
- pool.map_async(func, iterable[, chunksize[, callback]])
- map_async() does not block, instead returns AsyncResult it returns to be polled
- that object will only return any data when everything is completely done
- (same if you use callback - only called when completely done)
- pool.imap(func, iterable[, chunksize])
- yields results as they come in - but only in order
- so when later jobs come back somewhat out of order, finished items won't yield until all earlier ones are done
- takes items from the source iterable to the worker chunksize at a time (default 1)
- analogous to itertools.imap
- pool.imap_unordered(func, iterable[, chunksize])
- yields results as they come in
- no ordering guarantees
- ...note that if you use chunksize >1, it returns chunks as they finish
- pool.apply(func[, args[, kwds]])
- really just a single call, func(*args, **kwargs) - in one of the pool's workers
- seems to just be a helper for the rest(verify)
- pool.apply_async(func[, args[, kwds[, callback]]])
- single apply, but doesn't block until it is finished
- instead returns an AsyncResult object you can poll
Counting finished jobs
The easiest way is to use imap_unordered (or imap), as they yield results while they finish.
import random, time, multiprocessing def func(arg): time.sleep( random.uniform(0,2) ) return '[bla %s]'%arg p = multiprocessing.Pool(4) count = 0 start = time.time() for res in p.imap_unordered(func, range(20), chunksize=2): print "(after %3.1fsec) returnval:%s count:%s"%(time.time()-start, res, count) count += 1
Adding jobs while it works
That is, use the same pool object for jobs you did not initially know about.
...and avoid the zombie problem.
There are a few options:
generator to _imap()
When you don't mind the call blocking, then it is probably convenient to:
- hand in a generator
- ...to an imap variant
The pool keeps doing stuff as long as the generator yields things. When it stops yielding (that is, not when it blocks, but when it raises StopIteration), and the imap function finishes jobs for everything it had yielded, it will return.
import random, time, multiprocessing def func(arg): #print '%s says %s'%(multiprocessing.current_process(), arg) return arg def generate_jobs(): while True: rval = random.uniform(0, 0.1) time.sleep( rval ) yield rval if rval<0.01: # randomly stop after a while raise StopIteration p = multiprocessing.Pool(10) count = 0 start = time.time() for i in range(10): print "chunk %d"%i for res in p.imap_unordered(func, generate_jobs()): print "[%5.2f] res:%s count:%s"%(time.time()-start, res, count) count += 1 p.close() p.join()
- consider using imap's chunksize for lover overhead
If you don't want blocking, then you can hand chunks into map_async.
It's a bit more more bookkeeping for you, because this means you have multiple AsyncResult objects to deal with.
import random, time, multiprocessing def func(arg): print '%s says %s'%(multiprocessing.current_process(), arg) time.sleep( random.uniform(0, 0.5) ) return arg p = multiprocessing.Pool(5) result1 = p.map_async( func, range(100,199) ) result2 = p.map_async( func, range(200,299) ) result3 = p.map_async( func, range(300,399) ) print "Waiting for results" p.close() p.join() print result1.get() print result2.get() print result3.get()
- seems to batch them in order
- e.g. here parts from the second range won't get queued until the first is at ~195
- doesn't do them in order
- does return them in order
- you can .get() the results from a result object once it is ready(), so in the above you could insert something like:
while True: alldone = True alldone &= result1.ready() alldone &= result2.ready() alldone &= result3.ready() print "Parts done: ",result1.ready(),result2.ready(),result3.ready() if alldone: break time.sleep(0.3)
Be careful of
An exception in a Process will
- make its exitcode will be 1.
- not print a stack trace itself (verify)
- be handed to the main function, meaning the source of the exception becomes the map/apply, which is uninformative for debugging
You probably want to put a try-except that wraps everything in your function, and then e.g.
- print the error while still in that process (e.g. traceback.print_exc() for a stacktrace), and/or still raise it.
- send a more detailed error description to the parent process, e.g. via a general messaging Queue or Pipe
- you may wish to send along its original context if you want a useful stack trace - read http://nedbatchelder.com/blog/200711/rethrowing_exceptions_in_python.html
With pools things are a little different. The Process it appeared in will stop, be replaced with another, and this individual job will not be restarted.
The state of this is a little finicky, read http://stackoverflow.com/questions/24894682/python-multiprocessing-crash-in-subprocess
The handiest thing is often the same wrapping try-except. You can ignore, send to the master process, etc.
Pools and KeyboardInterrupt is another detail.
It only arrives in the parent if it was busy doing a bit of its management(verify). More typically it arrives at a worker (when jobs are longish-running, anyway), you just kill that worker, which will be replaced as mentioned above.
You would want a KeyboardInterrupt in a child to trigger a pool.terminate() in the parent.
One approach is to make sure the KeyboardInterrupt makes it to the parent, because that makes one try-except handler get all cases. It seems just returning from the child will. This will more or less rely on an _async variant.
Abrupt kills messing with communication
Process.terminate, os.kill, and such can break the communication channels a bit, so you may wish to avoid them when they are not necessary.
It's there in the documents, just worth mentioning.
If you have a script that creates pools, then forgets about them, it leaves zombies behind (won't show as Z state(verify), but same concept). (turns out thousands of zombies can crash a host -- or the process e.g. if it runs out of file handles for shm stuff, hits the max processes for a user set via ulimit, etc.)
You won't notice this for single-run scripts, as they get implicit cleanup when they exit.
You also don't notice this when you actively reuse the same pool object.
It can become a problem e.g. when, on a loop, you create a new pool for each set of new jobs and then don't clean it up.
- do a pool.close() once you have no more jobs for the pool
- then eventually do a pool.join() to get the cleanup
Pool.close() Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit. Pool.terminate() Stops the worker processes immediately without completing outstanding work. When the pool object is garbage collected terminate() will be called immediately.
Pool.join() Wait for the worker processes to exit One must call close() or terminate() before using join().
Another way is to abuse the fact that multiprocessing.active_children() has the side effect of join()ing its children to check them (so would individual Process.is_alive() calls but that's work). So you could do:
while True: act = multiprocessing.active_children() if len(act)==0: print "Waiting for %d workers to finish"%len(act) time.sleep(2)
Processes are reused.
If that eventually leads to memory leak problems, or other leaks, you may wish to look at maxtasksperchild (argument to the Pool constructor)
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
The worker function must be a global function.
Error sending result: '[<generator object procfunc at 0x7fa71f847eb0>]'. Reason: 'TypeError("can't pickle generator objects",)'
You're trying to yield, which means returning a generator, which means picking it, and you can't pickle a generator.
If just returning won't do, you probably want multiprocessing.Queue. These are thread- and process-safe. Just pass them in and use them.
Probably means you're hitting something set by ulimit, e.g. process limit for user (ulimit -u)
Can relate to the pool zombie problem
Basically any exception
If the stack trace mentions
...that's usually the results object passing through an exception(verify), meaning you have a semi-useful exception.
But you don't get the relevant stack trace where it's from, because the worker effectively swallows it.
To do debugging and see the real stack trace, either have your job function do the exception handling (and printing), or run that function it outside of a pool.
A high-level interface for asynchronously executing callables, introduced around py3.2.
Mainly, you get a
which basically present the same API, that of submitting functions with arguments (and are wrapping the thread and process modules).
Async IO, asynchronous coding