Which Python concurrency library should you use in 2022?
A motivating example
Let’s imagine there are three tasks that we need to perform. Task 1 has no dependencies; Task 2 has no dependencies; but Task 3 must run after Task 2:
def task1():
    # Can run any time
    ...

def task2():
    # Can run any time
    ...

def task3():
    # Must run after task2
    ...
In synchronous code, we may simply run all tasks one after the other:
task1()
task2()
task3()
But this isn’t the optimal way to run these tasks. Task 1 and Task 2 have no dependencies, and can logically run at the same time. Is there a better way?
What we desire is the ability to encode the minimal dependencies between these tasks, and run them as fast as possible, regardless of how long they individually take. With the sleep durations used in the examples below (3s, 1s, and 4s), the sequential version takes 8 seconds, while an optimal schedule takes only max(3, 1 + 4) = 5 seconds.
Concurrency in Python
Python provides a variety of ways in its standard library to design concurrent programs, and additional third-party libraries provide alternative concurrency models. We’re going to explore how to write our simple three-task example in a variety of these models.
Threading
The Python threading library provides access to OS threads through an interface that will feel familiar to systems or OS programmers. When talking about threading in Python, it is important to remember that Python’s threading library is its own independent abstraction around OS threads, and does not provide direct access to underlying OS threads. All Python programs execute at least one thread — the main thread.
The main thread is special. All Python signal handlers are executed on the main thread, including the default SIGINT handler that raises KeyboardInterrupt, even if the signal was received on a different thread. Because of this, the main thread has the unique ability to be interrupted between any two opcodes in the generated Python bytecode.
The Python standard library itself is not exception safe in a variety of places because of this quirk. It is also extremely difficult to write exception-safe user-space code that runs on the main thread, for the same reason. If deadlocks and other bad program states are undesirable, it is recommended to install a custom SIGINT handler to facilitate cooperative multitasking between threads.
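For example, a minimal sketch of one such handler (the use of a threading.Event flag here is an illustrative choice, not a prescription):

import signal
import threading

# A flag that worker threads check (or wait on) at convenient checkpoints.
shutdown_requested = threading.Event()

def handle_sigint(signum, frame):
    # Runs on the main thread. Instead of raising KeyboardInterrupt at an
    # arbitrary bytecode boundary, just record that shutdown was requested.
    shutdown_requested.set()

signal.signal(signal.SIGINT, handle_sigint)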
Task scheduling with threading
Spawning new threads can be done directly by creating a threading.Thread object with the appropriate target function to run. Then call thread.start(), followed by thread.join() when we want to synchronize with the completion of the thread. We may choose to schedule our three tasks as follows:
import threading
import time

task1_finished = False
task2_finished = False
task3_finished = False

def task1():
    # Can run any time
    global task1_finished
    print("doing task1...")
    time.sleep(3)
    print("task1 done!")
    task1_finished = True

def task2():
    # Can run any time
    global task2_finished
    print("doing task2...")
    time.sleep(1)
    task2_finished = True
    print("task2 done!")

def task3():
    # Must run after task2
    global task2_finished, task3_finished
    assert task2_finished
    print("doing task3...")
    time.sleep(4)
    task3_finished = True
    print("task3 done!")

def subtask1():
    task1()

def subtask2():
    task2()
    task3()

def run_all_tasks():
    t1 = threading.Thread(target=subtask1)
    t2 = threading.Thread(target=subtask2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

def main():
    run_all_tasks()
    assert task1_finished
    assert task2_finished
    assert task3_finished

main()
While some of these functions may seem extraneous now, separating the code into main and run_all_tasks functions will make the examples later on easier to compare with this one.
Task scheduling with concurrent.futures.ThreadPoolExecutor
Remembering to join with all of our threads, and in the right order when it matters, can be a source of many bugs. For this reason, it is often useful to use additional abstractions to organize our tasks for us. The following example makes use of a ThreadPoolExecutor context manager, which automatically joins with any pending tasks when the context manager exits. Our code can simply focus on scheduling the tasks with respect to their logical dependencies:
from concurrent.futures import ThreadPoolExecutor

def run_all_tasks():
    with ThreadPoolExecutor() as executor:
        executor.submit(subtask1)
        executor.submit(subtask2)
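submit also returns concurrent.futures.Future objects, so if we wanted our subtasks’ results (or any exceptions they raised), a variant might look like this sketch:

def run_all_tasks():
    with ThreadPoolExecutor() as executor:
        f1 = executor.submit(subtask1)
        f2 = executor.submit(subtask2)
        # result() blocks until the subtask completes, and re-raises any
        # exception from the worker thread here.
        f1.result()
        f2.result()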
Under the hood (CPython)
Each Python thread, at the time of writing, is backed by an OS thread executing the Python virtual machine. The Python VM implements cooperative multitasking among all threads, centered around acquiring and releasing the Global Interpreter Lock (GIL). Importantly, only one Python thread will run at a time, as only one Python thread can hold the GIL at a time. Because the GIL is implemented as platform-provided lock(s), Python threads are ultimately scheduled by the underlying OS, not by the VM itself. For more details, see David Beazley’s 2010 presentation on the GIL.
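To see the consequence for CPU-bound code, consider this illustrative micro-benchmark (a sketch; absolute timings will vary by machine, but on CPython the threaded version is typically no faster than the sequential one):

import threading
import time

def spin(n: int) -> None:
    # Pure-Python CPU work; it holds the GIL except when the interpreter
    # preempts it to let another thread run.
    while n:
        n -= 1

N = 10_000_000

start = time.perf_counter()
spin(N)
spin(N)
print(f"sequential: {time.perf_counter() - start:.2f}s")

start = time.perf_counter()
t1 = threading.Thread(target=spin, args=(N,))
t2 = threading.Thread(target=spin, args=(N,))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"threaded: {time.perf_counter() - start:.2f}s")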
Multiprocessing
The Python multiprocessing library has a similar interface to the threading library, but spawns threads of execution backed by processes. For CPU-bound workloads, multiprocessing can effectively circumvent the GIL, enabling true parallelism of Python threads. Of note, care must be taken when spawning subprocesses to set up signal handling as desired by the spawning process, which can get ugly when dealing with different platforms. Because of these complexities, no example of multiprocessing our three tasks will be provided.
Event loops
Event loops are a fundamentally different concurrency model from either of Python’s threading or multiprocessing libraries. Whereas threading and multiprocessing provide thin abstractions over OS-provided threads and processes, an event loop library will typically provide a single event loop in which all threads of execution run concurrently, on a single OS thread. The main Python thread will be used only briefly to initialize the event loop and install any custom signal handlers.
Notably, event loop libraries are almost always also I/O libraries. Networking, filesystem I/O, and synchronization (locks) are all blocking operations typically provided by an OS kernel, so additional non-blocking APIs need to be written around these OS APIs to interface with the event loop. Blocking operations will typically be serviced by a thread pool managed by the event loop.
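asyncio, covered below, exposes this thread-pool escape hatch directly. A sketch using asyncio.to_thread (Python 3.9+), where blocking_read stands in for any blocking call:

import asyncio
import time

def blocking_read() -> str:
    # Stands in for any blocking OS-level operation (file I/O, a C
    # library call, etc.).
    time.sleep(1)
    return "contents"

async def main() -> None:
    # The blocking function runs on a thread pool managed by the event
    # loop, so the loop stays free to run other tasks concurrently.
    contents = await asyncio.to_thread(blocking_read)
    print(contents)

asyncio.run(main())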
Callbacks with Twisted
In 2002, an event loop framework for Python by the name of Twisted was released. It pioneered the use of callbacks to handle asynchronous operations inside of an event loop. The fundamental abstraction is a Twisted Deferred, an object that represents the future result of some computation. To use Twisted to schedule our tasks, quite a few changes to our code are necessary.
The exact details of how Deferred or callback chains work in Twisted will not be discussed here. For further reading, consult Twisted’s documentation.
First, the tasks themselves need to be written to return Deferred objects. This fundamentally changes our tasks from synchronous to asynchronous: rather than doing its work directly when run, each task will immediately return an object representing the future result of its work, and schedule that work to be done “somewhere else” (in the event loop):
def task1() -> defer.Deferred:
    print("doing task1...")
    d = sleep(3)
    def task1_done(_) -> None:
        global task1_finished
        task1_finished = True
        print("task1 done!")
    d.addCallback(task1_done)
    return d
Next, we need to declare our dependencies between tasks in terms of these new Deferred objects, using callback chains:
def subtask1() -> defer.Deferred:
    return task1()

def subtask2() -> defer.Deferred:
    d = task2()
    d.addCallback(lambda _: task3())
    return d

def run_all_tasks() -> defer.DeferredList:
    return defer.DeferredList([subtask1(), subtask2()])
Finally, we can call run_all_tasks inside of the event loop, and check that the tasks finished successfully. The complete code is below:
from typing import Any, List, Tuple
from twisted.internet import reactor, defer
from twisted.internet.task import react

task1_finished = False
task2_finished = False
task3_finished = False

def sleep(s: float) -> defer.Deferred:
    d = defer.Deferred()
    reactor.callLater(s, d.callback, None)
    return d

def task1() -> defer.Deferred:
    print("doing task1...")
    d = sleep(3)
    def task1_done(_) -> None:
        global task1_finished
        task1_finished = True
        print("task1 done!")
    d.addCallback(task1_done)
    return d

def task2() -> defer.Deferred:
    print("doing task2...")
    d = sleep(1)
    def task2_done(_) -> None:
        global task2_finished
        task2_finished = True
        print("task2 done!")
    d.addCallback(task2_done)
    return d

def task3() -> defer.Deferred:
    global task2_finished
    assert task2_finished
    print("doing task3...")
    d = sleep(4)
    def task3_done(_) -> None:
        global task3_finished
        task3_finished = True
        print("task3 done!")
    d.addCallback(task3_done)
    return d

def subtask1() -> defer.Deferred:
    return task1()

def subtask2() -> defer.Deferred:
    d = task2()
    d.addCallback(lambda _: task3())
    return d

def run_all_tasks() -> defer.DeferredList:
    return defer.DeferredList([subtask1(), subtask2()])

def main(reactor) -> defer.Deferred:
    d = run_all_tasks()
    def check_tasks_finished(result: List[Tuple[bool, Any]]) -> None:
        # The individual Deferred objects report their status
        for success, _ in result:
            assert success
        # Or we can also check the global state
        assert task1_finished
        assert task2_finished
        assert task3_finished
    d.addCallback(check_tasks_finished)
    return d

react(main)
Before reading further, ask yourself the following: where does this code spawn new tasks? In other words, where is the work of the tasks actually created and scheduled?
There is only one line in this code that’s creating work to be done, and that’s the reactor.callLater function call on line 11. All other code is chaining callbacks among the various Deferred objects, whose callback chains are kicked off by the work scheduled from line 11.
If you think this code is verbose or hard to reason about, you’re not alone! Callback-based code has the following drawbacks:
- Reading code top-to-bottom doesn’t follow the actual execution of the program
Callback-based control flow breaks causality. That is, it must be specified what to do with the result of some task before the task is actually run! If we added error handling to the above example, that logic would have to be specified “before” the task actually ran.
- The reactor is a global singleton
It’s hard to know from looking at a function whether it will spawn some asynchronous background task or not. Any line in the program can schedule work in the global event loop. This can make it very hard to track down an orphaned task in your program, which may be consuming unnecessary resources or preventing clean shutdown. (Threads have this same weakness!)
Is there a better way?
Coroutines with asyncio
Fully ten years after Twisted’s initial release, the asyncio library was released for Python 3.3, and became part of the standard library in Python 3.4. It provides primitives extremely similar to Twisted’s (e.g. a Future instead of a Deferred), though integrated more closely with Python coroutines, enabling an alternative to callback-based asynchronous programming via the async and await keywords. (Twisted also supports coroutines, and could be used in a similar way to what is shown below.)
As with Twisted, not a lot of explanation will be given for asyncio’s details. The standard library documentation is a good starting point to learn more.
Again, we need to change our tasks to play nicely with asyncio. Notice that we’re going to return values directly from our tasks, something that we couldn’t do with threads:
async def task1() -> bool:
    print("doing task1...")
    await asyncio.sleep(3)
    print("task1 done!")
    return True
And declare dependencies between the tasks when scheduled to run:
async def subtask1() -> bool:
    return await task1()

async def subtask2() -> bool:
    return (await task2()) and (await task3())

async def run_all_tasks() -> List[bool]:
    return await asyncio.gather(subtask1(), subtask2())
Putting it all together:
import asyncio
from typing import List

task2_finished = False

async def task1() -> bool:
    print("doing task1...")
    await asyncio.sleep(3)
    print("task1 done!")
    return True

async def task2() -> bool:
    print("doing task2...")
    await asyncio.sleep(1)
    global task2_finished
    task2_finished = True
    print("task2 done!")
    return True

async def task3() -> bool:
    global task2_finished
    assert task2_finished
    print("doing task3...")
    await asyncio.sleep(4)
    print("task3 done!")
    return True

async def subtask1() -> bool:
    return await task1()

async def subtask2() -> bool:
    return (await task2()) and (await task3())

async def run_all_tasks() -> List[bool]:
    return await asyncio.gather(subtask1(), subtask2())

async def main() -> None:
    results = await run_all_tasks()
    assert all(results)

asyncio.run(main())
If we’re using Python 3.11 or greater, we could also use asyncio’s new TaskGroup, which will look similar to the ThreadPoolExecutor above:
async def run_all_tasks() -> List[bool]:
    tasks: List["asyncio.Task[bool]"] = []
    async with asyncio.TaskGroup() as tg:
        tasks.append(tg.create_task(subtask1()))
        tasks.append(tg.create_task(subtask2()))
    return [task.result() for task in tasks]
If you’re thinking to yourself that this looks extremely similar to the threading example above, but with async def functions instead of normal def functions, then you’re onto something. It’s also almost half the number of lines of the callback version. Async/await programming looks extremely similar to traditional synchronous programming, except that every time an await keyword is encountered, the program yields control to the event loop, so that multiple functions can run concurrently!
There are other advantages to using async/await and asyncio, as well, but first…
What color is your function?
One potential drawback of any event loop-based concurrency model in Python is that it naturally divides your code into two colors of functions: synchronous functions and asynchronous functions. And mixing and matching between the two is painful. Asynchronous libraries and programs (such as Twisted and asyncio) tend to force all other libraries and code in the program to be asynchronous themselves. In the examples above, note that we were practically required to run all of our code inside of a top-level react or asyncio.run call. Synchronous libraries (or threading-based concurrency libraries) can feel easier to use from this perspective, because concurrency is opt-in: you can run synchronous functions concurrently if you want, but the interfaces don't force you to.
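A small sketch of that friction, with hypothetical functions:

import asyncio

async def fetch_data() -> str:
    # The async-colored version of some operation.
    await asyncio.sleep(0.1)
    return "data"

def sync_caller() -> str:
    # A synchronous function can't await. Its only recourse is to start an
    # event loop, which raises RuntimeError if this thread is already
    # running one (e.g. if we're called, transitively, from async code).
    return asyncio.run(fetch_data())

print(sync_caller())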
Cancellation and timeouts
One of the necessary primitives that all concurrency libraries need to provide is the ability to cancel a thread of execution, for example to correctly handle a timeout in code that itself has spawned multiple additional threads of execution.
Much has been written about cancellation APIs. For a good overview of designing modern cancellation APIs from first principles, see Nathaniel Smith’s Timeouts and cancellation for humans. For an overview of Herb Sutter’s thoughts in 2008 (around the time that C# was first implementing cancellation tokens), see Interrupt Politely. I’ll be referencing these strategies below, so keep this table handy:
[Table: Herb’s cancellation strategies; those referenced below are 1. Kill, 3. Ask, and 4. Flag.]
For our purposes, let’s consider how each of the examples above handles cancellation via Control-C. The cancellation could come from anywhere (a signal handler, some deeply nested task inside of the program, or a user clicking an “X” on a dialog box in the user interface, to give some examples), but Control-C is easiest to interact with from a terminal.
Cancellation using vanilla threads
Run the example using the threading library above a few times and press Control-C once during program execution. What do you notice? You should notice that while a stack trace is printed for a KeyboardInterrupt, the program doesn’t actually stop, and all three tasks complete. (A similar phenomenon is observable for the ThreadPoolExecutor version as well.) What's going on?
$ python threading_example.py
doing task1...
doing task2...
^CTraceback (most recent call last):
  File "threading_example.py", line 54, in <module>
    main()
  File "threading_example.py", line 49, in main
    run_all_tasks()
  File "threading_example.py", line 45, in run_all_tasks
    t1.join()
  File "/usr/local/lib/python3.7/threading.py", line 1044, in join
    self._wait_for_tstate_lock()
  File "/usr/local/lib/python3.7/threading.py", line 1060, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
task2 done!
doing task3...
task1 done!
task3 done!
Threads, in Python as in other languages and operating systems, have no innate way to communicate with one another. When the KeyboardInterrupt happens on the main thread, the background threads running the three tasks happily continue. The reason the program doesn’t immediately exit is that the Python interpreter itself joins all background (non-daemon) threads on shutdown, even though the KeyboardInterrupt killed the main Python thread.
So, while Python threads out-of-the-box have no way to cancel, they do ensure that every task gets to run to completion and clean up! This is more like a non-cancellation-strategy, and doesn’t fit on Herb’s table.
Cancellation using vanilla Twisted
As before, run the Twisted example a few times, and Control-C once during execution. What happens? (You may need to pip install twisted to run the Twisted example. Twisted requires Python >= 3.7.)
$ python twisted_example.py
doing task1...
doing task2...
task2 done!
doing task3...
task1 done!
^C$ python twisted_example.py
doing task1...
doing task2...
^C$
From the looks of it, Twisted is immediately shutting down its event loop when it receives Control-C. No amount of error handling in your task callbacks will save you from this destructive behavior; your tasks are left to be garbage collected by the interpreter. What gives?
Fundamentally, callbacks (Deferred) are a distinct concept from the reactor in which work actually happens. They don’t know about each other. There’s no concept of a “task” in Twisted, so when the event loop is told to shut down, it rightfully believes there’s nothing to wait for, and shuts down.
Twisted, out-of-the-box, behaves the opposite of threads: if the program is asked to stop, all tasks will be cancelled immediately, with no opportunity to clean up! In Herb’s table, this is the 1. Kill strategy.
It’s worth noting that Deferred does have a cancellation API, as well as a timeout API, for handling “in-band” cancellation, that is, cancellation that originates from within the event loop. Handling “out-of-band” cancellation in Twisted, such as an OS signal, would require additional user space code.
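A sketch of those in-band APIs, reusing the callLater-based sleep pattern from the example above, but with a canceller attached:

from twisted.internet import reactor, defer
from twisted.internet.task import react

def cancellable_sleep(s: float) -> defer.Deferred:
    # The canceller runs when d.cancel() is called, aborting the timer.
    def canceller(d: defer.Deferred) -> None:
        delayed_call.cancel()
    d = defer.Deferred(canceller)
    delayed_call = reactor.callLater(s, d.callback, None)
    return d

def main(reactor) -> defer.Deferred:
    d = cancellable_sleep(10)
    d.addTimeout(2, reactor)  # errbacks with defer.TimeoutError after 2s
    d.addErrback(lambda failure: print(f"failed: {failure.value!r}"))
    return d

react(main)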
Cancellation using vanilla asyncio (and specifically, asyncio.run)
Now run the asyncio example, injecting Control-C at various points.
…
Huh? It’s behaving the same as Twisted?
One thing to notice in our example is that our task functions don’t define any error handling. Async/await programming enables us to write error handling as if the function were synchronous, using traditional try-except-finally statements that are easy to reason about causally. Let’s update our tasks:
import sys

async def task1() -> bool:
    try:
        print("doing task1...")
        await asyncio.sleep(3)
    except:
        print(f"task1 exception: {sys.exc_info()}")
        return False
    else:
        return True
    finally:
        print("task1 done!")
These tasks will never raise exceptions on their own, but rather return False and log the exception encountered. (What if our tasks could organically raise exceptions, as is the case in most real code? Read about asyncio.gather(), ExceptionGroup, Python 3.11’s new except* syntax, and the history of Trio’s MultiError, if you’re interested!)
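As a taste, here’s a sketch (Python 3.11+) of how a TaskGroup surfaces an organic failure: the failing task cancels its sibling, and the error arrives as an ExceptionGroup handled with except*:

import asyncio

async def boom() -> None:
    await asyncio.sleep(0.1)
    raise RuntimeError("boom")

async def main() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(boom())
            tg.create_task(asyncio.sleep(10))  # cancelled when boom() raises
    except* RuntimeError as eg:
        print(f"caught: {eg.exceptions!r}")

asyncio.run(main())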
Now we should observe proper cancellation of our tasks:
$ python asyncio_example_cancel.py
doing task1...
doing task2...
^Ctask1 exception: (<class 'concurrent.futures._base.CancelledError'>, CancelledError(), <traceback object at 0x7fa062c2b9b0>)
task1 done!
task2 exception: (<class 'concurrent.futures._base.CancelledError'>, CancelledError(), <traceback object at 0x7fa062c2b960>)
task2 done!
Traceback (most recent call last):
  File "asyncio_example_cancel.py", line 60, in <module>
    asyncio.run(main())
  File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 574, in run_until_complete
    self.run_forever()
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 541, in run_forever
    self._run_once()
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 1750, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/local/lib/python3.7/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt
Play around with it: any tasks that have been scheduled will be cancelled, and any tasks not yet scheduled will not run at all. asyncio.run behaves intuitively, cancelling all pending tasks by raising CancelledError inside of them, letting them run to completion, and then re-raising KeyboardInterrupt inside of the asyncio library, eventually causing program termination. If our tasks wanted, they could implement cleanup code inside of except or finally blocks, and they’re now robust to in-band and out-of-band cancellation. There’s even a (confusing) way in asyncio to ignore the cancellation and continue anyway, making this strategy most like 3. Ask in Herb’s table.
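That (confusing) escape hatch is simply catching CancelledError and declining to re-raise it. A sketch, generally discouraged in real code:

import asyncio

async def stubborn_task() -> None:
    try:
        await asyncio.sleep(3)
    except asyncio.CancelledError:
        # Swallowing CancelledError suppresses the cancellation request...
        print("ignoring cancellation!")
    # ...and the coroutine simply keeps running afterwards.
    await asyncio.sleep(1)
    print("finished anyway!")

async def main() -> None:
    task = asyncio.create_task(stubborn_task())
    await asyncio.sleep(0.1)
    task.cancel()
    await task  # completes normally despite the cancel() above

asyncio.run(main())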
Can we do better?
Doing better requires an understanding of what is problematic with these strategies. Three specific problems from the above paradigms are of note:
- Threads, by default, have no inter-thread communication, rendering cancellation useless
- asyncio is at times confusing to use, or downright incorrect, a historical artifact of supporting both callback-based and coroutine-based concurrency
- Threading libraries and asyncio both have global task spawning interfaces. That is, any code, in any function, can spawn a long-lived background task, without communicating that to its caller.
There are some alternatives, not mutually exclusive, modeled in various languages and libraries today.
Cancellation Tokens
C# (.NET 4 and above) and C++20 both support the concept of cancellation tokens: a primitive used to implement cooperative multitasking among multiple threads of execution, or Herb’s 4. Flag strategy. C# has taken this quite far, with rich integration into its standard library to enable cooperative multitasking in traditional, synchronous I/O APIs.
C++20 introduced std::jthread, a “joinable thread”, with interfaces for passing cancellation tokens between std::jthread objects and their callers, enabling cooperative multitasking among C++ threads. While std::condition_variable_any is the only C++ standard library interface that integrates with these cancellation tokens today, it is theoretically possible to implement one's own cooperative multithreading library on top of these primitives. (The C++ standard library, as Herb notes in the std::jthread proposal, will likely never widely support cancellation tokens, for exception safety reasons.)
The same could be done in Python. Using threading.Event or a similar primitive, it is theoretically possible to write a set of I/O primitives that support cancellation tokens, building a cooperative multithreading library that would look a lot like an event loop library in terms of creating checkpoints for cancellation, but would preserve the color of all functions so that everything “looks” synchronous.
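A minimal sketch of what that might look like (all of the names here are hypothetical):

import threading

class TaskCancelled(Exception):
    pass

def cancellable_sleep(s: float, cancel: threading.Event) -> None:
    # Event.wait returns True if the flag was set before the timeout,
    # making this both a sleep and a cancellation checkpoint.
    if cancel.wait(timeout=s):
        raise TaskCancelled

def task(cancel: threading.Event) -> None:
    try:
        print("doing task...")
        cancellable_sleep(3, cancel)
        print("task done!")
    except TaskCancelled:
        print("task cancelled!")

cancel = threading.Event()
t = threading.Thread(target=task, args=(cancel,))
t.start()
cancel.set()  # request cancellation; observed at the next checkpoint
t.join()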
Aside: Is 4. Flag really a better cancellation strategy than 3. Ask?
Herb certainly thinks so, saying in favor of 4. Flag that:
people keep forgetting to check for those [cancellation] exceptions; no matter how much they are trained, they cannot remember that every wait/sleep/join might throw an InterruptedException/thread_interrupted. The whole .NET Frameworks, written by people who invented their Thread.Interrupt and were disciplined about framework quality and safety, is interruption-unsafe because they could not consistently remember…
But Nathaniel Smith argues human laziness is a reason to prefer 3. Ask:
a correct and robust program has to support cancellation in every function that ever does I/O, anywhere in your stack. If you ever get lazy and leave it out, or just forget to pass it through to any particular subroutine call, then you have a latent bug. Humans suck at this kind of boilerplate… Maybe you depend on some library that someone else wrote — how much do you trust your third-party vendors to get this right?
Structured concurrency
Some of the latest 2022 innovations in asyncio, such as asyncio.timeout() and asyncio.TaskGroup, are heavily inspired by the work done in the Python community on libraries such as Curio and Trio since around 2015. These alternative event loop libraries have been exploring the idea of structured concurrency, which boils down to removing that global task spawning primitive from programmers' toolboxes so that everyone can reason more effectively about concurrent program flow. Using Trio, it’s simply not possible to do something like loop.create_task as you can in asyncio. All tasks must spawn from the top-level task, and any concurrent subtasks must be joined before program flow can continue:
import sys
import trio

async def some_other_unrelated_function() -> None:
    # The *only* way to spawn new threads of execution is through a nursery.
    # These tasks implicitly inherit their parent task context and
    # cancellation scope, rendering it impossible to "lose track" of threads
    # of execution or "escape" from timeouts and cancellations by spawning
    # new threads.
    async with trio.open_nursery() as nursery:
        nursery.start_soon(trio.sleep, 1)
        nursery.start_soon(trio.sleep, 2)

async def task(s: float) -> None:
    try:
        print("doing task...")
        with trio.fail_after(2*s):
            await some_other_unrelated_function()
            await trio.sleep(s)
    except:
        # Will print:
        # task exception: (<class 'trio.TooSlowError'>, TooSlowError(), ...)
        print(f"task exception: {sys.exc_info()}")
    finally:
        print("task finished")

async def main():
    await task(1)

trio.run(main)
While this example looks conceptually similar to one using asyncio.TaskGroup or ThreadPoolExecutor, Trio has taken the liberty of removing all task-spawning footguns from its public interface. If you use Trio, you can rest assured that all tasks are accounted for.
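Trio’s cancel scopes compose nicely with this model, too. A sketch using trio.move_on_after, which silently cancels its block instead of raising:

import trio

async def main() -> None:
    with trio.move_on_after(1) as scope:
        await trio.sleep(10)  # cancelled at the 1 second mark
    if scope.cancelled_caught:
        print("timed out, moving on!")

trio.run(main)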
Which concurrency paradigm to choose?
Herb’s table does not include Python, but it is worth considering how the various concurrency libraries covered here fit into it. Before presenting this table, it’s important to note that each of these libraries has numerous real-world examples of achieving success; the recommendations column is not meant to say that writing good software with a particular library is impossible, merely that it is harder to reason about, or prone to certain classes of concurrency bugs.
[Table: how the Python concurrency libraries covered here map onto Herb’s cancellation strategies, with a recommendations column.]
In closing, I’ll quote Nathaniel Smith from 2016 discussing whether to use these new concurrency tools, with some thoughts in brackets:
If you want to start using the async/await-native approach today, then curio is currently the only game in town. [In 2022, Trio has practically superseded Curio, asyncio is integrating features from Trio/Curio into the standard library, and AnyIO enables you to “write once, run anywhere” on top of asyncio or Trio. It has never been cheaper to experiment.]
But even if you agree that this is where we want to end up eventually, there are still very good reasons why you might decide not to switch yet. Twisted and tornado are extremely mature, asyncio is in the standard library, and curio [or trio] is neither of those things. All have seen years of intensive development by lots of very smart people and are in production use at companies you’ve heard of; curio is currently experimental alpha-status software by basically one guy [trio is only slightly better]. There’s a much larger ecosystem of supporting libraries around twisted/tornado/asyncio than curio [or trio]. And while the callback-based paradigm has its faults, those faults and their magnitude is well-understood with known workarounds, while the “curio [structured concurrency] paradigm” is still under heavy development, and curio-the-software doesn’t yet make any promises about API stability.
On the other hand, if you find the async/await-native programming model compelling, and want to help flesh out a new paradigm, aren’t reliant on the existing ecosystems (or are excited to help build a new one), and are comfortable with the risks, then you should totally go for it. Help us stride forward into a glorious Future-free future! Even if curio [or Trio, or asyncio] doesn’t end up being the async/await-native API to end all APIs, we’ll still learn something from the attempt.