Trio: async programming for humans and snake people

P.S. your API is a user interface – Kenneth Reitz

The Trio project’s goal is to produce a production-quality, permissively licensed, async/await-native I/O library for Python. Like all async libraries, its main purpose is to help you write programs that do multiple things at the same time with parallelized I/O. A web spider that wants to fetch lots of pages in parallel, a web server that needs to juggle lots of downloads and websocket connections at the same time, a process supervisor monitoring multiple subprocesses… that sort of thing. Compared to other libraries, Trio attempts to distinguish itself with an obsessive focus on usability and correctness. Concurrency is complicated; we try to make it easy to get things right.

Trio was built from the ground up to take advantage of the latest Python features, and draws inspiration from many sources, in particular Dave Beazley’s Curio. The resulting design is radically simpler than older competitors like asyncio and Twisted, yet just as capable. Trio is the Python I/O library I always wanted; I find it makes building I/O-oriented programs easier, less error-prone, and just plain more fun. Perhaps you’ll find the same.

This project is young and still somewhat experimental: the overall design is solid and the existing features are fully tested and documented, but you may encounter missing functionality or rough edges. We do encourage you to use it, but you should read and subscribe to issue #1 to get a warning and a chance to give feedback about any compatibility-breaking changes.

Tutorial

Welcome to the Trio tutorial! Trio is a modern Python library for writing asynchronous applications – that is, programs that want to do multiple things at the same time with parallelized I/O, like a web spider that fetches lots of pages in parallel, a web server juggling lots of simultaneous downloads… that sort of thing. Here we’ll try to give a gentle introduction to asynchronous programming with Trio.

We assume that you’re familiar with Python in general, but don’t worry – we don’t assume you know anything about asynchronous programming or Python’s new async/await feature.

Also, unlike many async/await tutorials, we assume that your goal is to use Trio to write interesting programs, so we won’t go into the nitty-gritty details of how async/await is implemented inside the Python interpreter. The word “coroutine” is never mentioned. The fact is, you really don’t need to know any of that stuff unless you want to implement a library like Trio, so we leave it out (though we’ll throw in a few links for those who want to dig deeper).

Okay, ready? Let’s get started.

Before you begin

  1. Make sure you’re using Python 3.5 or newer.
  2. python3 -m pip install --upgrade trio (or on Windows, maybe py -3 -m pip install --upgrade trio)
  3. Can you import trio? If so then you’re good to go!

If you get lost or confused…

…then we want to know! We have a friendly chat channel, you can ask questions using the “trio” tag on StackOverflow, or just file a bug (if our documentation is confusing, that’s our fault, and we want to fix it!).

Async functions

Python 3.5 added a major new feature: async functions. Using Trio is all about writing async functions, so let’s start there.

An async function is defined like a normal function, except you write async def instead of def:

# A regular function
def regular_double(x):
    return 2 * x

# An async function
async def async_double(x):
    return 2 * x

“Async” is short for “asynchronous”; we’ll sometimes refer to regular functions like regular_double as “synchronous functions”, to distinguish them from async functions.

From a user’s point of view, there are two differences between an async function and a regular function:

  1. To call an async function, you have to use the await keyword. So instead of writing regular_double(3), you write await async_double(3).

  2. You can’t use the await keyword inside the body of a regular function. If you try it, you’ll get a syntax error:

    def print_double(x):
        print(await async_double(x))   # <-- SyntaxError here
    

    But inside an async function, await is allowed:

    async def print_double(x):
        print(await async_double(x))   # <-- OK!
    

Now, let’s think about the consequences here: if you need await to call an async function, and only async functions can use await… here’s a little table:

If a function like this   wants to call a function like this   is it gonna happen?
sync                      sync                                 ✓
sync                      async                                NOPE
async                     sync                                 ✓
async                     async                                ✓

So in summary: As a user, the entire advantage of async functions over regular functions is that async functions have a superpower: they can call other async functions.

This immediately raises two questions: how, and why? Specifically:

When your Python program starts up, it’s running regular old sync code. So there’s a chicken-and-the-egg problem: once we’re running an async function we can call other async functions, but how do we call that first async function?

And, if the only reason to write an async function is that it can call other async functions, why on earth would we ever use them in the first place? I mean, as superpowers go this seems a bit pointless. Wouldn’t it be simpler to just… not use any async functions at all?

This is where an async library like Trio comes in. It provides two things:

  1. A runner function, which is a special synchronous function that takes and calls an asynchronous function. In Trio, this is trio.run:

    import trio
    
    async def async_double(x):
        return 2 * x
    
    trio.run(async_double, 3)  # returns 6
    

    So that answers the “how” part.

  2. A bunch of useful async functions – in particular, functions for doing I/O. So that answers the “why”: these functions are async, and they’re useful, so if you want to use them, you have to write async code. If you think keeping track of these async and await things is annoying, then too bad – you’ve got no choice in the matter! (Well, OK, you could just not use trio. That’s a legitimate option. But it turns out that the async/await stuff is actually a good thing, for reasons we’ll discuss a little bit later.)

    Here’s an example function that uses trio.sleep(). (trio.sleep() is like time.sleep(), but with more async.)

    import trio
    
    async def double_sleep(x):
        await trio.sleep(2 * x)
    
    trio.run(double_sleep, 3)  # does nothing for 6 seconds then returns
    

So it turns out our async_double function is actually a bad example. I mean, it works, it’s fine, there’s nothing wrong with it, but it’s pointless: it could just as easily be written as a regular function, and it would be more useful that way. double_sleep is a much more typical example: we have to make it async, because it calls another async function. The end result is a kind of async sandwich, with trio on both sides and our code in the middle:

trio.run -> double_sleep -> trio.sleep

This “sandwich” structure is typical for async code; in general, it looks like:

trio.run -> [async function] -> ... -> [async function] -> trio.whatever

It’s exactly the functions on the path between trio.run() and trio.whatever that have to be async. Trio provides the async bread, and then your code makes up the async sandwich’s tasty async filling. Other functions (e.g., helpers you call along the way) should generally be regular, non-async functions.
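For instance, here’s a minimal sketch (the names are made up) of that shape: a regular synchronous helper doing pure computation, with trio’s async bread on either side:

import trio

def add_tax(price):
    # A regular function: it never awaits anything, so it stays synchronous.
    return price * 1.08

async def checkout(price):
    total = add_tax(price)   # plain function call, no await needed
    await trio.sleep(1)      # stand-in for some async I/O
    return total

trio.run(checkout, 100)  # returns 108.0 after about 1 second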

Warning: don’t forget that await!

Now would be a good time to open up a Python prompt and experiment a little with writing simple async functions and running them with trio.run.

At some point in this process, you’ll probably write some code like this, that tries to call an async function but leaves out the await:

import time
import trio

async def broken_double_sleep(x):
    print("*yawn* Going to sleep")
    start_time = time.monotonic()

    # Whoops, we forgot the 'await'!
    trio.sleep(2 * x)

    sleep_time = time.monotonic() - start_time
    print("Woke up after {:.2f} seconds, feeling well rested!".format(sleep_time))

trio.run(broken_double_sleep, 3)

You might think that Python would raise an error here, like it does for other kinds of mistakes we sometimes make when calling a function. Like, if we forgot to pass trio.sleep() its required argument, then we would get a nice TypeError saying so. But unfortunately, if you forget an await, you don’t get that. What you actually get is:

>>> trio.run(broken_double_sleep, 3)
*yawn* Going to sleep
Woke up after 0.00 seconds, feeling well rested!
__main__:4: RuntimeWarning: coroutine 'sleep' was never awaited
>>>

This is clearly broken – 0.00 seconds is not long enough to feel well rested! Yet the code acts like it succeeded – no exception was raised. The only clue that something went wrong is that it prints RuntimeWarning: coroutine 'sleep' was never awaited. Also, the exact place where the warning is printed might vary, because it depends on the whims of the garbage collector. If you’re using PyPy, you might not even get a warning at all until the next GC collection runs:

# On PyPy:
>>>> trio.run(broken_double_sleep, 3)
*yawn* Going to sleep
Woke up after 0.00 seconds, feeling well rested!
>>>> # what the ... ?? not even a warning!

>>>> # but forcing a garbage collection gives us a warning:
>>>> import gc
>>>> gc.collect()
/home/njs/pypy-3.5-nightly/lib-python/3/importlib/_bootstrap.py:191: RuntimeWarning: coroutine 'sleep' was never awaited
if _module_locks.get(name) is wr:    # XXX PyPy fix?
0
>>>>

(If you can’t see the warning above, try scrolling right.)

Forgetting an await like this is an incredibly common mistake. You will mess this up. Everyone does. And Python will not help you as much as you’d hope 😞. The key thing to remember is: if you see the magic words RuntimeWarning: coroutine '...' was never awaited, then this always means that you made the mistake of leaving out an await somewhere, and you should ignore all the other error messages you see and go fix that first, because there’s a good chance the other stuff is just collateral damage. I’m not even sure what all that other junk in the PyPy output is. Fortunately I don’t need to know, I just need to fix my function!

(“I thought you said you weren’t going to mention coroutines!” Yes, well, I didn’t mention coroutines, Python did. Take it up with Guido! But seriously, this is unfortunately a place where the internal implementation details do leak out a bit.)

Why does this happen? In Trio, every time we use await it’s to call an async function, and every time we call an async function we use await. But Python’s trying to keep its options open for other libraries that are ahem a little less organized about things. So while for our purposes we can think of await trio.sleep(...) as a single piece of syntax, Python thinks of it as two things: first a function call that returns this weird “coroutine” object:

>>> trio.sleep(3)
<coroutine object sleep at 0x7f5ac77be6d0>

and then that object gets passed to await, which actually runs the function. So if you forget await, then two bad things happen: your function doesn’t actually get called, and you get a “coroutine” object where you might have been expecting something else, like a number:

>>> async_double(3) + 1
TypeError: unsupported operand type(s) for +: 'coroutine' and 'int'

If you didn’t already mess this up naturally, then give it a try on purpose: try writing some code with a missing await, or an extra await, and see what you get. This way you’ll be prepared for when it happens to you for real.

And remember: watch out for RuntimeWarning: coroutine '...' was never awaited; it means you need to find and fix your missing await.

Okay, let’s see something cool already

So now we’ve started using trio, but so far all we’ve learned to do is write functions that print things and sleep for various lengths of time. Interesting enough, but we could just as easily have done that with time.sleep(). async/await is useless!

Well, not really. Trio has one more trick up its sleeve, that makes async functions more powerful than regular functions: it can run multiple async functions at the same time. Here’s an example:

 1  # tasks-intro.py
 2
 3  import trio
 4
 5  async def child1():
 6      print("  child1: started! sleeping now...")
 7      await trio.sleep(1)
 8      print("  child1: exiting!")
 9
10  async def child2():
11      print("  child2: started! sleeping now...")
12      await trio.sleep(1)
13      print("  child2: exiting!")
14
15  async def parent():
16      print("parent: started!")
17      async with trio.open_nursery() as nursery:
18          print("parent: spawning child1...")
19          nursery.start_soon(child1)
20
21          print("parent: spawning child2...")
22          nursery.start_soon(child2)
23
24          print("parent: waiting for children to finish...")
25          # -- we exit the nursery block here --
26      print("parent: all done!")
27
28  trio.run(parent)

There’s a lot going on in here, so we’ll take it one step at a time. In the first part, we define two async functions child1 and child2. These should look familiar from the last section:

 5  async def child1():
 6      print("  child1: started! sleeping now...")
 7      await trio.sleep(1)
 8      print("  child1: exiting!")
 9
10  async def child2():
11      print("  child2: started! sleeping now...")
12      await trio.sleep(1)
13      print("  child2: exiting!")

Next, we define parent as an async function that’s going to call child1 and child2 concurrently:

15  async def parent():
16      print("parent: started!")
17      async with trio.open_nursery() as nursery:
18          print("parent: spawning child1...")
19          nursery.start_soon(child1)
20
21          print("parent: spawning child2...")
22          nursery.start_soon(child2)
23
24          print("parent: waiting for children to finish...")
25          # -- we exit the nursery block here --
26      print("parent: all done!")

It does this by using a mysterious async with statement to create a “nursery”, and then “spawns” child1 and child2 into the nursery.

Let’s start with this async with thing. It’s actually pretty simple. In regular Python, a statement like with someobj: ... instructs the interpreter to call someobj.__enter__() at the beginning of the block, and to call someobj.__exit__() at the end of the block. We call someobj a “context manager”. An async with does exactly the same thing, except that where a regular with statement calls regular methods, an async with statement calls async methods: at the start of the block it does await someobj.__aenter__() and at the end of the block it does await someobj.__aexit__(). In this case we call someobj an “async context manager”. So in short: with blocks are a shorthand for calling some functions, and since with async/await Python now has two kinds of functions, it also needs two kinds of with blocks. That’s all there is to it! If you understand async functions, then you understand async with.
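To make that concrete, here’s a minimal sketch (the class is made up for illustration) of an async context manager written out by hand:

import trio

class AsyncTimer:
    # An async context manager: same protocol as a regular context
    # manager, except __aenter__/__aexit__ are async functions.
    async def __aenter__(self):
        self.start = trio.current_time()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        duration = trio.current_time() - self.start
        print("block took {:.2f} seconds".format(duration))

async def main():
    async with AsyncTimer():
        await trio.sleep(1)

trio.run(main)  # prints something like: block took 1.00 seconds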

Note

This example doesn’t use them, but while we’re here we might as well mention the one other piece of new syntax that async/await added: async for. It’s basically the same idea as async with versus with: An async for loop is just like a for loop, except that where a for loop does iterator.__next__() to fetch the next item, an async for does await async_iterator.__anext__(). Now you understand all of async/await. Basically just remember that it involves making sandwiches and sticking the word “async” in front of everything, and you’ll do fine.
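If you’re curious what that looks like in practice, here’s a minimal sketch (made-up names) of an async iterator you could use with async for:

import trio

class Countdown:
    # An async iterator: __anext__ is an async function.
    def __init__(self, n):
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n == 0:
            raise StopAsyncIteration
        self.n -= 1
        return self.n + 1

async def main():
    async for i in Countdown(3):
        print(i)   # prints 3, then 2, then 1

trio.run(main)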

Now that we understand async with, let’s look at parent again:

15  async def parent():
16      print("parent: started!")
17      async with trio.open_nursery() as nursery:
18          print("parent: spawning child1...")
19          nursery.start_soon(child1)
20
21          print("parent: spawning child2...")
22          nursery.start_soon(child2)
23
24          print("parent: waiting for children to finish...")
25          # -- we exit the nursery block here --
26      print("parent: all done!")

There are only 4 lines of code that really do anything here. On line 17, we use trio.open_nursery() to get a “nursery” object, and then inside the async with block we call nursery.start_soon twice, on lines 19 and 22. There are actually two ways to call an async function: the first one is the one we already saw, using await async_fn(); the new one is nursery.start_soon(async_fn): it asks trio to start running this async function, but then returns immediately without waiting for the function to finish. So after our two calls to nursery.start_soon, child1 and child2 are now running in the background. And then at line 25, the commented line, we hit the end of the async with block, and the nursery’s __aexit__ function runs. What this does is force parent to stop here and wait for all the children in the nursery to exit. This is why you have to use async with to get a nursery: it gives us a way to make sure that the child calls can’t run away and get lost. One reason this is important is that if there’s a bug or other problem in one of the children, and it raises an exception, then it lets us propagate that exception into the parent; in many other frameworks, exceptions like this are just discarded. Trio never discards exceptions.

Ok! Let’s try running it and see what we get:

parent: started!
parent: spawning child1...
parent: spawning child2...
parent: waiting for children to finish...
  child2: started! sleeping now...
  child1: started! sleeping now...
    [... 1 second passes ...]
  child1: exiting!
  child2: exiting!
parent: all done!

(Your output might have the order of the “started” and/or “exiting” lines swapped compared to mine.)

Notice that child1 and child2 both start together and then both exit together, and that the whole program only takes 1 second to run, even though we made two calls to trio.sleep(1), which should take two seconds in total. So it looks like child1 and child2 really are running at the same time!

Now, if you’re familiar with programming using threads, this might look familiar – and that’s intentional. But it’s important to realize that there are no threads here. All of this is happening in a single thread. To remind ourselves of this, we use slightly different terminology: instead of spawning two “threads”, we say that we spawned two “tasks”. There are two differences between tasks and threads: (1) many tasks can take turns running on a single thread, and (2) with threads, the Python interpreter/operating system can switch which thread is running whenever they feel like it; with tasks, we can only switch at certain designated places we call “checkpoints”. In the next section, we’ll dig into what this means.

Task switching illustrated

The big idea behind async/await-based libraries like trio is to run lots of tasks simultaneously on a single thread by switching between them at appropriate places – so for example, if we’re implementing a web server, then one task could be sending an HTTP response at the same time as another task is waiting for new connections. If all you want to do is use trio, then you don’t need to understand all the nitty-gritty detail of how this switching works – but it’s very useful to have at least a general intuition about what trio is doing “under the hood” when your code is executing. To help build that intuition, let’s look more closely at how trio ran our example from the last section.

Fortunately, trio provides a rich set of tools for inspecting and debugging your programs. Here we want to watch trio.run() at work, which we can do by writing a class we’ll call Tracer, which implements trio’s Instrument interface. Its job is to log various events as they happen:

class Tracer(trio.abc.Instrument):
    def before_run(self):
        print("!!! run started")

    def _print_with_task(self, msg, task):
        # repr(task) is perhaps more useful than task.name in general,
        # but in context of a tutorial the extra noise is unhelpful.
        print("{}: {}".format(msg, task.name))

    def task_spawned(self, task):
        self._print_with_task("### new task spawned", task)

    def task_scheduled(self, task):
        self._print_with_task("### task scheduled", task)

    def before_task_step(self, task):
        self._print_with_task(">>> about to run one step of task", task)

    def after_task_step(self, task):
        self._print_with_task("<<< task step finished", task)

    def task_exited(self, task):
        self._print_with_task("### task exited", task)

    def before_io_wait(self, timeout):
        if timeout:
            print("### waiting for I/O for up to {} seconds".format(timeout))
        else:
            print("### doing a quick check for I/O")
        self._sleep_time = trio.current_time()

    def after_io_wait(self, timeout):
        duration = trio.current_time() - self._sleep_time
        print("### finished I/O check (took {} seconds)".format(duration))

    def after_run(self):
        print("!!! run finished")

Then we re-run our example program from the previous section, but this time we pass trio.run() a Tracer object:

trio.run(parent, instruments=[Tracer()])

This generates a lot of output, so we’ll go through it one step at a time.

First, there’s a bit of chatter while trio gets ready to run our code. Most of this is irrelevant to us for now, but in the middle you can see that trio has created a task for the __main__.parent function, and “scheduled” it (i.e., made a note that it should be run soon):

$ python3 tutorial/tasks-with-trace.py
!!! run started
### new task spawned: <init>
### task scheduled: <init>
### doing a quick check for I/O
### finished I/O check (took 1.1122087016701698e-05 seconds)
>>> about to run one step of task: <init>
### new task spawned: <call soon task>
### task scheduled: <call soon task>
### new task spawned: __main__.parent
### task scheduled: __main__.parent
<<< task step finished: <init>
### doing a quick check for I/O
### finished I/O check (took 6.4980704337358475e-06 seconds)

Once the initial housekeeping is done, trio starts running the parent function, and you can see parent creating the two child tasks. Then it hits the end of the async with block, and pauses:

>>> about to run one step of task: __main__.parent
parent: started!
parent: spawning child1...
### new task spawned: __main__.child1
### task scheduled: __main__.child1
parent: spawning child2...
### new task spawned: __main__.child2
### task scheduled: __main__.child2
parent: waiting for children to finish...
<<< task step finished: __main__.parent

Control then goes back to trio.run(), which logs a bit more internal chatter:

>>> about to run one step of task: <call soon task>
<<< task step finished: <call soon task>
### doing a quick check for I/O
### finished I/O check (took 5.476875230669975e-06 seconds)

And then gives the two child tasks a chance to run:

>>> about to run one step of task: __main__.child2
  child2: started! sleeping now...
<<< task step finished: __main__.child2

>>> about to run one step of task: __main__.child1
  child1: started! sleeping now...
<<< task step finished: __main__.child1

Each task runs until it hits the call to trio.sleep(), and then suddenly we’re back in trio.run() deciding what to run next. How does this happen? The secret is that trio.run() and trio.sleep() work together to make it happen: trio.sleep() has access to some special magic that lets it pause its entire callstack, so it sends a note to trio.run() requesting to be woken again after 1 second, and then suspends the task. And once the task is suspended, Python gives control back to trio.run(), which decides what to do next. (If this sounds similar to the way that generators can suspend execution by doing a yield, then that’s not a coincidence: inside the Python interpreter, there’s a lot of overlap between the implementation of generators and async functions.)
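If you’d like a rough feel for that analogy, here’s a tiny sketch (not trio’s actual machinery) of a generator suspending itself and handing a note to a driver loop that decides when to resume it:

def sleepy_task():
    print("going to sleep")
    yield "wake me in 1 second"   # suspend, handing a note to the driver
    print("woke up!")

task = sleepy_task()
note = next(task)            # run the task until it suspends itself
print("driver got:", note)   # ...the driver would wait here, then:
try:
    next(task)               # resume the task where it left off
except StopIteration:
    pass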

Note

You might wonder whether you can mix-and-match primitives from different async libraries. For example, could we use trio.run() together with asyncio.sleep()? The answer is no, we can’t, and the paragraph above explains why: the two sides of our async sandwich have a private language they use to talk to each other, and different libraries use different languages. So if you try to call asyncio.sleep() from inside a trio.run(), then trio will get very confused indeed and probably blow up in some dramatic way.

Only async functions have access to the special magic for suspending a task, so only async functions can cause the program to switch to a different task. What this means is that if a call doesn’t have an await on it, then you know that it can’t be a place where your task will be suspended. This makes tasks much easier to reason about than threads, because there are far fewer ways that tasks can be interleaved with each other and stomp on each others’ state. (For example, in trio a statement like a += 1 is always atomic – even if a is some arbitrarily complicated custom object!) Trio also makes some further guarantees beyond that, but that’s the big one.
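For example, here’s a sketch (made-up names) of a pattern that would need a lock with threads, but is safe in trio, because there’s no checkpoint between the read and the write of counter:

import trio

counter = 0

async def incrementer():
    global counter
    for _ in range(100000):
        counter += 1   # no await here, so no task switch can happen mid-update

async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(incrementer)
        nursery.start_soon(incrementer)
    print(counter)   # always 200000

trio.run(main)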

And now you also know why parent had to use an async with to open the nursery: if we had used a regular with block, then it wouldn’t have been able to pause at the end and wait for the children to finish; we need our cleanup function to be async, which is exactly what async with gives us.

Now, back to our execution trace. To recap: at this point parent is waiting on child1 and child2, and both children are sleeping. So trio.run() checks its notes, and sees that there’s nothing to be done until those sleeps finish – unless possibly some external I/O event comes in. If that happened, then it might give us something to do. Of course we aren’t doing any I/O here so it won’t happen, but in other situations it could. So next it calls an operating system primitive to put the whole process to sleep:

### waiting for I/O for up to 0.9999009938910604 seconds

And in fact no I/O does arrive, so one second later we wake up again, and trio checks its notes again. At this point it checks the current time, compares it to the notes that trio.sleep() sent saying when the two child tasks should be woken up again, and realizes that they’ve slept for long enough, so it schedules them to run soon:

### finished I/O check (took 1.0006483688484877 seconds)
### task scheduled: __main__.child1
### task scheduled: __main__.child2

And then the children get to run, and this time they run to completion. Remember how parent is waiting for them to finish? Notice how parent gets scheduled when the first child exits:

>>> about to run one step of task: __main__.child1
  child1: exiting!
### task scheduled: __main__.parent
### task exited: __main__.child1
<<< task step finished: __main__.child1

>>> about to run one step of task: __main__.child2
  child2: exiting!
### task exited: __main__.child2
<<< task step finished: __main__.child2

Then, after another check for I/O, parent wakes up. The nursery cleanup code notices that all its children have exited, and lets the nursery block finish. And then parent makes a final print and exits:

### doing a quick check for I/O
### finished I/O check (took 9.045004844665527e-06 seconds)

>>> about to run one step of task: __main__.parent
parent: all done!
### task scheduled: <init>
### task exited: __main__.parent
<<< task step finished: __main__.parent

And finally, after a bit more internal bookkeeping, trio.run() exits too:

### doing a quick check for I/O
### finished I/O check (took 5.996786057949066e-06 seconds)
>>> about to run one step of task: <init>
### task scheduled: <call soon task>
### task scheduled: <init>
<<< task step finished: <init>
### doing a quick check for I/O
### finished I/O check (took 6.258022040128708e-06 seconds)
>>> about to run one step of task: <call soon task>
### task exited: <call soon task>
<<< task step finished: <call soon task>
>>> about to run one step of task: <init>
### task exited: <init>
<<< task step finished: <init>
!!! run finished

You made it!

That was a lot of text, but again, you don’t need to understand everything here to use trio – in fact, trio goes to great lengths to make each task feel like it executes in a simple, linear way. (Just like your operating system goes to great lengths to make it feel like your single-threaded code executes in a simple linear way, even though under the covers the operating system juggles between different threads and processes in essentially the same way trio does.) But it is useful to have a rough model in your head of how the code you write is actually executed, and – most importantly – the consequences of that for parallelism.

Alternatively, if this has just whetted your appetite and you want to know more about how async/await works internally, then this blog post is a good deep dive, or check out this great walkthrough to see how to build a simple async I/O framework from the ground up.

A kinder, gentler GIL

Speaking of parallelism – let’s zoom out for a moment and talk about how async/await compares to other ways of handling concurrency in Python.

As we’ve already noted, trio tasks are conceptually rather similar to Python’s built-in threads, as provided by the threading module. And in all common Python implementations, threads have a famous limitation: the Global Interpreter Lock, or “GIL” for short. The GIL means that even if you use multiple threads, your code still (mostly) ends up running on a single core. People tend to find this frustrating.

But from trio’s point of view, the problem with the GIL isn’t that it restricts parallelism. Of course it would be nice if Python had better options for taking advantage of multiple cores, but that’s an extremely difficult problem to solve, and in the meantime there are lots of problems where a single core is totally adequate – or where if it isn’t, then process- or machine-level parallelism works fine.

No, the problem with the GIL is that it’s a lousy deal: we give up on using multiple cores, and in exchange we get… almost all the same challenges and mind-bending bugs that come with real parallel programming, and – to add insult to injury – pretty poor scalability. Threads in Python just aren’t that appealing.

Trio doesn’t make your code run on multiple cores; in fact, as we saw above, it’s baked into trio’s design that you never have two tasks running at the same time. We’re not so much overcoming the GIL as embracing it. But if you’re willing to accept that, plus a bit of extra work to put these new async and await keywords in the right places, then in exchange you get:

  • Excellent scalability: trio can run 10,000+ tasks simultaneously without breaking a sweat, so long as their total CPU demands don’t exceed what a single core can provide. (This is common in, for example, network servers that have lots of clients connected, but only a few active at any given time.)
  • Fancy features: most threading systems are implemented in C and restricted to whatever features the operating system provides. In trio our logic is all in Python, which makes it possible to implement powerful and ergonomic features like trio’s cancellation system.
  • Code that’s easier to reason about: the await keyword means that potential task-switching points are explicitly marked within each function. This can make trio code dramatically easier to reason about than the equivalent program using threads.

Certainly it’s not appropriate for every app… but there are a lot of situations where the trade-offs here look pretty appealing.

There is one downside that’s important to keep in mind, though. Making checkpoints explicit gives you more control over how your tasks can be interleaved – but with great power comes great responsibility. With threads, the runtime environment is responsible for making sure that each thread gets its fair share of running time. With trio, if some task runs off and does stuff for seconds on end without executing a checkpoint, then… all your other tasks will just have to wait.

Here’s an example of how this can go wrong. Take our example from above, and replace the calls to trio.sleep() with calls to time.sleep().
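Here’s a sketch of what the modified child1 looks like (time.sleep() blocks without ever executing a checkpoint):

import time
import trio

async def child1():
    print("  child1: started! sleeping now...")
    # A regular, blocking sleep: there's no await here, hence no checkpoint,
    # so no other task can run until the full second has passed.
    time.sleep(1)
    print("  child1: exiting!")

# (child2 gets the same change; the rest of tasks-intro.py is unchanged)

If we run the modified program, we’ll see something like: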

parent: started!
parent: spawning child1...
parent: spawning child2...
parent: waiting for children to finish...
  child2: started! sleeping now...
    [... pauses for 1 second ...]
  child2: exiting!
  child1: started! sleeping now...
    [... pauses for 1 second ...]
  child1: exiting!
parent: all done!

One of the major reasons why trio has such a rich instrumentation API is to make it possible to write debugging tools to catch issues like this.

Networking with trio

Now let’s take what we’ve learned and use it to do some I/O, which is where async/await really shines.

An echo client: low-level API

The traditional application for demonstrating network APIs is an “echo server”: a program that accepts arbitrary data from a client, and then sends that same data right back. Probably a more relevant example these days would be an application that does lots of concurrent HTTP requests, but trio doesn’t have an HTTP library yet, so we’ll stick with the echo server tradition.

To start with, here’s an example echo client, i.e., the program that will send some data at our echo server and get responses back:

# echo-client-low-level.py

import sys
import trio

# arbitrary, but:
# - must be in between 1024 and 65535
# - can't be in use by some other program on your computer
# - must match what we set in our echo server
PORT = 12345
# How much memory to spend (at most) on each call to recv. Pretty arbitrary,
# but shouldn't be too big or too small.
BUFSIZE = 16384

async def sender(client_sock):
    print("sender: started!")
    while True:
        data = b"async can sometimes be confusing, but I believe in you!"
        print("sender: sending {!r}".format(data))
        await client_sock.sendall(data)
        await trio.sleep(1)

async def receiver(client_sock):
    print("receiver: started!")
    while True:
        data = await client_sock.recv(BUFSIZE)
        print("receiver: got data {!r}".format(data))
        if not data:
            print("receiver: connection closed")
            sys.exit()

async def parent():
    print("parent: connecting to 127.0.0.1:{}".format(PORT))
    with trio.socket.socket() as client_sock:
        await client_sock.connect(("127.0.0.1", PORT))
        async with trio.open_nursery() as nursery:
            print("parent: spawning sender...")
            nursery.start_soon(sender, client_sock)

            print("parent: spawning receiver...")
            nursery.start_soon(receiver, client_sock)

trio.run(parent)

The overall structure here should be familiar, because it’s just like our last example: we have a parent task, which spawns two child tasks to do the actual work, and then at the end of the async with block it switches into full-time parenting mode while waiting for them to finish. But now instead of just calling trio.sleep(), the children use some of trio’s networking APIs.

Let’s look at the parent first:

async def parent():
    print("parent: connecting to 127.0.0.1:{}".format(PORT))
    with trio.socket.socket() as client_sock:
        await client_sock.connect(("127.0.0.1", PORT))
        async with trio.open_nursery() as nursery:
            print("parent: spawning sender...")
            nursery.start_soon(sender, client_sock)

            print("parent: spawning receiver...")
            nursery.start_soon(receiver, client_sock)

We’re using the trio.socket API to access network functionality. (If you know the socket module in the standard library, then trio.socket is very similar, just asyncified.) First we call trio.socket.socket() to create the socket object we’ll use to connect to the server, and we use a with block to make sure that it will be closed properly. (Trio is designed around the assumption that you’ll be using with blocks to manage resource cleanup – highly recommended!) Then we call connect to connect to the echo server. 127.0.0.1 is a magic IP address meaning “the computer I’m running on”, so ("127.0.0.1", PORT) means that we want to connect to whatever program on the current computer is using PORT as its contact point. And then once the connection is made, we pass the connected client socket into the two child tasks. (This is also a good example of how nursery.start_soon lets you pass positional arguments to the spawned function.)

Our first task’s job is to send data to the server:

async def sender(client_sock):
    print("sender: started!")
    while True:
        data = b"async can sometimes be confusing, but I believe in you!"
        print("sender: sending {!r}".format(data))
        await client_sock.sendall(data)
        await trio.sleep(1)

It uses a loop that alternates between calling await client_sock.sendall(...) to send some data, and then sleeping for a second to avoid making the output scroll by too fast on your terminal.

And the second task’s job is to process the data the server sends back:

async def receiver(client_sock):
    print("receiver: started!")
    while True:
        data = await client_sock.recv(BUFSIZE)
        print("receiver: got data {!r}".format(data))
        if not data:
            print("receiver: connection closed")
            sys.exit()

It repeatedly calls await client_sock.recv(...) to get more data from the server, and then checks to see if the server has closed the connection. recv only returns an empty bytestring if the connection has been closed; if there’s no data available, then it blocks until more data arrives.

And now we’re ready to look at the server.

An echo server: low-level API

The server is a little trickier. As usual, let’s look at the whole thing, and then we’ll discuss the pieces:

# echo-server-low-level.py

import trio

# Port is arbitrary, but:
# - must be in between 1024 and 65535
# - can't be in use by some other program on your computer
# - must match what we set in our echo client
PORT = 12345
# How much memory to spend (at most) on each call to recv. Pretty arbitrary,
# but shouldn't be too big or too small.
BUFSIZE = 16384

async def echo_server(server_sock, ident):
    with server_sock:
        print("echo_server {}: started".format(ident))
        try:
            while True:
                data = await server_sock.recv(BUFSIZE)
                print("echo_server {}: received data {!r}".format(ident, data))
                if not data:
                    print("echo_server {}: connection closed".format(ident))
                    return
                print("echo_server {}: sending data {!r}".format(ident, data))
                await server_sock.sendall(data)
        except Exception as exc:
            # Unhandled exceptions will propagate into our parent and take
            # down the whole program. If the exception is KeyboardInterrupt,
            # that's what we want, but otherwise maybe not...
            print("echo_server {}: crashed: {!r}".format(ident, exc))

async def echo_listener(nursery):
    with trio.socket.socket() as listen_sock:
        # Notify the operating system that we want to receive connection
        # attempts at this address:
        await listen_sock.bind(("127.0.0.1", PORT))
        listen_sock.listen()
        print("echo_listener: listening on 127.0.0.1:{}".format(PORT))

        ident = 0
        while True:
            server_sock, _ = await listen_sock.accept()
            print("echo_listener: got new connection, spawning echo_server")
            ident += 1
            nursery.start_soon(echo_server, server_sock, ident)

async def parent():
    async with trio.open_nursery() as nursery:
        print("parent: spawning echo_listener")
        nursery.start_soon(echo_listener, nursery)

trio.run(parent)

Let’s start with echo_server. As we’ll see below, each time an echo client connects, our server will start a child task running echo_server; there might be lots of these running at once if lots of clients are connected. Its job is to read data from its particular client, and then echo it back. It should be pretty straightforward to understand, because it uses the same socket functions we saw in the last section:

async def echo_server(server_sock, ident):
    with server_sock:
        print("echo_server {}: started".format(ident))
        try:
            while True:
                data = await server_sock.recv(BUFSIZE)
                print("echo_server {}: received data {!r}".format(ident, data))
                if not data:
                    print("echo_server {}: connection closed".format(ident))
                    return
                print("echo_server {}: sending data {!r}".format(ident, data))
                await server_sock.sendall(data)
        except Exception as exc:
            # Unhandled exceptions will propagate into our parent and take
            # down the whole program. If the exception is KeyboardInterrupt,
            # that's what we want, but otherwise maybe not...
            print("echo_server {}: crashed: {!r}".format(ident, exc))

We take a socket object that’s connected to the client (so the data we pass to sendall on the client comes out of recv here, and vice-versa), plus ident which is just a unique number used to make the print output less confusing when there are multiple clients connected at the same time. Then we have our usual with block to make sure the socket gets closed, a try block discussed below, and finally the server loop which alternates between reading some data from the socket and then sending it back out again (unless the socket was closed, in which case we quit).

Remember that in trio, like Python in general, exceptions keep propagating until they’re caught. Here we think it’s plausible there might be unexpected exceptions, and we want to isolate that to making just this one task crash, without taking down the whole program. For example, if the client closes the connection at the wrong moment then it’s possible this code will end up calling sendall on a closed connection and get an OSError; that’s unfortunate, and in a more serious program we might want to handle it more explicitly, but it doesn’t indicate a problem for any other connections. On the other hand, if the exception is something like a KeyboardInterrupt, we do want that to propagate out into the parent task and cause the program to exit. To express this, we use a try block with an except Exception: handler.

But where do these echo_server tasks come from? An important part of writing a trio program is deciding how you want to organize your tasks. In the examples we’ve seen so far, this was simple, because the set of tasks was fixed. Here, we want to wait for clients to connect, and then start a new task for each one. The tricky part is that, as we mentioned above, managing a nursery is a full-time job: you don’t want the task that has the nursery and is supervising the child tasks to do anything else, like listen for new connections.

There’s a standard trick for handling this in trio: our parent task creates a nursery, spawns a child task to listen for new connections, and then passes the nursery object to the child task:

async def parent():
    async with trio.open_nursery() as nursery:
        print("parent: spawning echo_listener")
        nursery.start_soon(echo_listener, nursery)

Now echo_listener can start “siblings” instead of children – even though the echo_listener is the one spawning echo_server tasks, we end up with a task tree that looks like:

parent
│
├─ echo_listener
│
├─ echo_server 1
│
├─ echo_server 2
┆

This lets parent focus on supervising the children, echo_listener focus on listening for new connections, and each echo_server focus on handling a single client.

Once we know this trick, the listener code becomes pretty straightforward:

async def echo_listener(nursery):
    with trio.socket.socket() as listen_sock:
        # Notify the operating system that we want to receive connection
        # attempts at this address:
        await listen_sock.bind(("127.0.0.1", PORT))
        listen_sock.listen()
        print("echo_listener: listening on 127.0.0.1:{}".format(PORT))

        ident = 0
        while True:
            server_sock, _ = await listen_sock.accept()
            print("echo_listener: got new connection, spawning echo_server")
            ident += 1
            nursery.start_soon(echo_server, server_sock, ident)

We create a listen socket, start it listening, and then go into an infinite loop, accepting connections from clients and spawning an echo_server task to handle each one.

We don’t expect there to be any errors here in the listener code – if there are, it’s probably a bug, and probably means that our whole program is broken (a server that doesn’t accept connections isn’t very useful!). So we don’t have a catch-all try block here. In general, trio leaves it up to you to decide whether and how you want to handle exceptions.

Try it out

Open a few terminals, run echo-server-low-level.py in one, run echo-client-low-level.py in another, and watch the messages scroll by! When you get bored, you can exit by hitting control-C.

Some things to try:

  • Open another terminal, and run 2 clients at the same time.
  • See how the server reacts when you hit control-C on the client
  • See how the client reacts when you hit control-C on the server

Flow control in our echo client and server

Here’s a question you might be wondering about: why does our client use two separate tasks for sending and receiving, instead of a single task that alternates between them – like the server has? For example, our client could use a single task like:

# Can you spot the two problems with this code?
async def send_and_receive(client_sock):
    while True:
        data = ...
        await client_sock.sendall(data)
        received = await client_sock.recv(BUFSIZE)
        if not received:
            sys.exit()
        await trio.sleep(1)

It turns out there are two problems with this – one minor and one major. Both relate to flow control. The minor problem is that when we call recv here we’re not waiting for all the data to be available; recv returns as soon as any data is available. If data is small, then our operating system / network / server will probably keep it all together in a single chunk, but there’s no guarantee. If the server sends hello then we might get hello, or hel lo, or h e l l o, or … bottom line, any time we’re expecting more than one byte of data, we have to be prepared to call recv multiple times.

And where this would go especially wrong is if we find ourselves in the situation where len(data) > BUFSIZE. On each pass through the loop, we send len(data) bytes, but only read at most BUFSIZE bytes. The result is something like a memory leak: we’ll end up with more and more data backed up in the network, until eventually something breaks.

We could fix this by keeping track of how much data we’re expecting at each moment, and then keep calling recv until we get it all:

expected = len(data)
while expected > 0:
    received = await client_sock.recv(BUFSIZE)
    if not received:
        sys.exit(1)
    expected -= len(received)

This is a bit cumbersome, but it would solve this problem.

There’s another problem, though, that’s deeper. We’re still alternating between sending and receiving. Notice that when we send data, we use await: this means that sending can potentially block. Why does this happen? Any data that we send goes first into an operating system buffer, and from there onto the network, and then another operating system buffer on the receiving computer, before the receiving program finally calls recv to take the data out of these buffers. If we call sendall with a small amount of data, then it goes into these buffers and sendall returns immediately. But if we send enough data fast enough, eventually the buffers fill up, and sendall will block until the remote side calls recv and frees up some space.

Now let’s think about this from the server’s point of view. Each time it calls recv, it gets some data that it needs to send back. And until it sends it back, that data sits around taking up memory. Computers have finite amounts of RAM, so if our server is well behaved then at some point it needs to stop calling recv until it gets rid of some of the old data by doing its own call to sendall. So for the server, really the only viable option is to alternate between receiving and sending.

But we need to remember that it’s not just the client’s call to sendall that might block: the server’s call to sendall can also get into a situation where it blocks until the client calls recv. So if the server is waiting for sendall to finish before it calls recv, and our client also waits for sendall to finish before it calls recv… we have a problem! The client won’t call recv until the server has called recv, and the server won’t call recv until the client has called recv. If our client is written to alternate between sending and receiving, and the chunk of data it’s trying to send is large enough (e.g. 10 megabytes will probably do it in most configurations), then the two processes will deadlock.

Moral: trio gives you powerful tools to manage sequential and concurrent execution. In this example we saw that the server needs send and recv to alternate in sequence, while the client needs them to run concurrently, and both were straightforward to implement. But when you’re implementing network code like this then it’s important to think carefully about flow control and buffering, because it’s up to you to choose the right execution mode!

Other popular async libraries like Twisted and asyncio tend to paper over these kinds of issues by throwing in unbounded buffers everywhere. This can avoid deadlocks, but can introduce its own problems and in particular can make it difficult to keep memory usage and latency under control. While both approaches have their advantages, trio takes the position that it’s better to expose the underlying problem as directly as possible and provide good tools to confront it head-on.

Note

If you want to try and make the deadlock happen on purpose to see for yourself, and you’re using Windows, then you might need to split the sendall call up into two calls that each send half of the data. This is because Windows has a somewhat unusual way of handling buffering.

An echo client and server: higher-level API

TODO: Not implemented yet!

When things go wrong: timeouts, cancellation and exceptions in concurrent tasks

TODO: give an example using fail_after()

TODO: explain Cancelled

TODO: explain how cancellation is also used when one child raises an exception

TODO: show an example MultiError traceback and walk through its structure

TODO: maybe a brief discussion of KeyboardInterrupt handling?

Trio’s core functionality

Entering trio

If you want to use trio, then the first thing you have to do is call trio.run():

trio.run(async_fn, *args, clock=None, instruments=[], restrict_keyboard_interrupt_to_checkpoints=False)

Run a trio-flavored async function, and return the result.

Calling:

run(async_fn, *args)

is the equivalent of:

await async_fn(*args)

except that run() can (and must) be called from a synchronous context.

This is trio’s main entry point. Almost every other function in trio requires that you be inside a call to run().

Parameters:
  • async_fn – An async function.
  • args – Positional arguments to be passed to async_fn. If you need to pass keyword arguments, then use functools.partial().
  • clock – None to use the default system-specific monotonic clock; otherwise, an object implementing the trio.abc.Clock interface, like (for example) a trio.testing.MockClock instance.
  • instruments (list of trio.abc.Instrument objects) – Any instrumentation you want to apply to this run. This can also be modified during the run; see Instrument API.
  • restrict_keyboard_interrupt_to_checkpoints (bool) –

    What happens if the user hits control-C while run() is running? If this argument is False (the default), then you get the standard Python behavior: a KeyboardInterrupt exception will immediately interrupt whatever task is running (or if no task is running, then trio will wake up a task to be interrupted). Alternatively, if you set this argument to True, then KeyboardInterrupt delivery will be delayed: it will only be raised at checkpoints, like a Cancelled exception.

    The default behavior is nice because it means that even if you accidentally write an infinite loop that never executes any checkpoints, then you can still break out of it using control-C. The alternative behavior is nice if you’re paranoid about a KeyboardInterrupt at just the wrong place leaving your program in an inconsistent state, because it means that you only have to worry about KeyboardInterrupt at the exact same places where you already have to worry about Cancelled.

    This setting has no effect if your program has registered a custom SIGINT handler, or if run() is called from anywhere but the main thread (this is a Python limitation), or if you use catch_signals() to catch SIGINT.

Returns:

Whatever async_fn returns.

Raises:
  • TrioInternalError – if an unexpected error is encountered inside trio’s internal machinery. This is a bug and you should let us know.
  • Anything else – if async_fn raises an exception, then run() propagates it.
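For example, here’s a minimal sketch (the greet function is made up) showing both ways of passing arguments to trio.run():

import functools
import trio

async def greet(name, punctuation="!"):
    await trio.sleep(1)
    return "Hello, " + name + punctuation

# Positional arguments can be passed directly:
print(trio.run(greet, "world"))
# Keyword arguments have to go through functools.partial():
print(trio.run(functools.partial(greet, "world", punctuation="?")))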

General principles

Checkpoints

When writing code using trio, it’s very important to understand the concept of a checkpoint. Many of trio’s functions act as checkpoints.

A checkpoint is two things:

  1. It’s a point where trio checks for cancellation. For example, if the code that called your function set a timeout, and that timeout has expired, then the next time your function executes a checkpoint trio will raise a Cancelled exception. See Cancellation and timeouts below for more details.
  2. It’s a point where the trio scheduler checks its scheduling policy to see if it’s a good time to switch to another task, and potentially does so. (Currently, this check is very simple: the scheduler always switches at every checkpoint. But this might change in the future.)

When writing trio code, you need to keep track of where your checkpoints are. Why? First, because checkpoints require extra scrutiny: whenever you execute a checkpoint, you need to be prepared to handle a Cancelled error, or for another task to run and rearrange some state out from under you. And second, because you also need to make sure that you have enough checkpoints: if your code doesn’t pass through a checkpoint on a regular basis, then it will be slow to notice and respond to cancellation and – much worse – since trio is a cooperative multi-tasking system where the only place the scheduler can switch tasks is at checkpoints, it’ll also prevent the scheduler from fairly allocating time between different tasks and adversely affect the response latency of all the other code running in the same process. (Informally we say that a task that does this is “hogging the run loop”.)

So when you’re doing code review on a project that uses trio, one of the things you’ll want to think about is whether there are enough checkpoints, and whether each one is handled correctly. Of course this means you need a way to recognize checkpoints. How do you do that? The underlying principle is that any operation that blocks has to be a checkpoint. This makes sense: if an operation blocks, then it might block for a long time, and you’ll want to be able to cancel it if a timeout expires; and in any case, while this task is blocked we want another task to be scheduled to run so our code can make full use of the CPU.

But if we want to write correct code in practice, then this principle is a little too sloppy and imprecise to be useful. How do we know which functions might block? What if a function blocks sometimes, but not others, depending on the arguments passed / network speed / phase of the moon? How do we figure out where the checkpoints are when we’re stressed and sleep deprived but still want to get this code review right, and would prefer to reserve our mental energy for thinking about the actual logic instead of worrying about checkpoints?

Don’t worry – trio’s got your back. Since checkpoints are important and ubiquitous, we make it as simple as possible to keep track of them. Here are the rules:

  • Regular (synchronous) functions never contain any checkpoints.

  • Every async function provided by trio always acts as a checkpoint; if you see await <something in trio>, or async for ... in <a trio object>, or async with <trio.something>, then that’s definitely a checkpoint.

    (Partial exception: for async context managers, it might be only the entry or only the exit that acts as a checkpoint; this is documented on a case-by-case basis.)

  • Third-party async functions can act as checkpoints; if you see await <something> or one of its friends, then that might be a checkpoint. So to be safe, you should prepare for scheduling or cancellation happening there.

The reason we distinguish between trio functions and other functions is that we can’t make any guarantees about third party code. Checkpoint-ness is a transitive property: if function A acts as a checkpoint, and you write a function that calls function A, then your function also acts as a checkpoint. If your function doesn’t call any checkpoints, then it isn’t one. So there’s nothing stopping someone from writing a function like:

# technically legal, but bad style:
async def why_is_this_async():
    return 7

that never calls any of trio’s async functions. This is an async function, but it’s not a checkpoint. But why make a function async if it never calls any async functions? It’s possible, but it’s a bad idea. If you have a function that’s not calling any async functions, then you should make it synchronous. The people who use your function will thank you, because it makes it obvious that your function is not a checkpoint, and their code reviews will go faster.

(Remember how in the tutorial we emphasized the importance of the “async sandwich”, and the way it means that await ends up being a marker that shows when you’re calling a function that calls a function that … eventually calls one of trio’s built-in async functions? The transitivity of async-ness is a technical requirement that Python imposes, but since it exactly matches the transitivity of checkpoint-ness, we’re able to exploit it to help you keep track of checkpoints. Pretty sneaky, eh?)

A slightly trickier case is a function like:

async def sleep_or_not(should_sleep):
    if should_sleep:
        await trio.sleep(1)
    else:
        pass

Here the function acts as a checkpoint if you call it with should_sleep set to a true value, but not otherwise. This is why we emphasize that trio’s own async functions are unconditional checkpoints: they always check for cancellation and check for scheduling, regardless of what arguments they’re passed. If you find an async function in trio that doesn’t follow this rule, then it’s a bug and you should let us know.

Inside trio, we’re very picky about this, because trio is the foundation of the whole system so we think it’s worth the extra effort to make things extra predictable. It’s up to you how picky you want to be in your code. To give you a more realistic example of what this kind of issue looks like in real life, consider this function:

async def recv_exactly(sock, nbytes):
    data = bytearray()
    while nbytes > 0:
        # recv() reads up to 'nbytes' bytes each time
        chunk = await sock.recv(nbytes)
        if not chunk:
            raise RuntimeError("socket unexpectedly closed")
        nbytes -= len(chunk)
        data += chunk
    return data

If called with an nbytes that’s greater than zero, then it will call sock.recv at least once, and recv is an async trio function, and thus an unconditional checkpoint. So in this case, recv_exactly acts as a checkpoint. But if we do await recv_exactly(sock, 0), then it will immediately return an empty buffer without executing a checkpoint. If this were a function in trio itself, then this wouldn’t be acceptable, but you may decide you don’t want to worry about this kind of minor edge case in your own code.

If you do want to be careful, or if you have some CPU-bound code that doesn’t have enough checkpoints in it, then it’s useful to know that await trio.sleep(0) is an idiomatic way to execute a checkpoint without doing anything else, and that trio.testing.assert_checkpoints() can be used to test that an arbitrary block of code contains a checkpoint.
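
For example, here’s a minimal sketch showing both tricks together (the crunch function and its input are made up for illustration):

import trio
import trio.testing

async def crunch(numbers):
    total = 0
    for i, n in enumerate(numbers):
        total += n * n
        # Explicit checkpoint every 1000 iterations, so this CPU-bound
        # loop stays responsive to cancellation and the scheduler:
        if i % 1000 == 0:
            await trio.sleep(0)
    return total

async def check():
    # assert_checkpoints() raises if its block runs no checkpoint:
    with trio.testing.assert_checkpoints():
        await crunch(range(10000))

trio.run(check)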

Thread safety

The vast majority of trio’s API is not thread safe: it can only be used from inside a call to trio.run(). This manual doesn’t bother documenting this on individual calls; unless specifically noted otherwise, you should assume that it isn’t safe to call any trio functions from anywhere except the trio thread. (But see below if you really do need to work with threads.)

Time and clocks

Every call to run() has an associated clock.

By default, trio uses an unspecified monotonic clock, but this can be changed by passing a custom clock object to run() (e.g. for testing).

You should not assume that trio’s internal clock matches any other clock you have access to, including the clocks of simultaneous calls to trio.run() happening in other processes or threads!

The default clock is currently implemented as time.monotonic() plus a large random offset. The idea here is to catch code that accidentally uses time.monotonic() early, which should help keep our options open for changing the clock implementation later, and (more importantly) make sure you can be confident that custom clocks like trio.testing.MockClock will work with third-party libraries you don’t control.
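
For example, here’s a small sketch of passing a custom clock to run() – in this case trio.testing.MockClock with its autojump feature, so the virtual clock leaps forward whenever all tasks are blocked:

import trio
import trio.testing

async def main():
    await trio.sleep(3600)   # an hour on trio's virtual clock...
    print("woke up at", trio.current_time())

# ...but with autojump_threshold=0, the run finishes instantly in
# real time, because the clock jumps whenever everything is blocked.
trio.run(main, clock=trio.testing.MockClock(autojump_threshold=0))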

trio.current_time()

Returns the current time according to trio’s internal clock.

Returns:The current time.
Return type:float
Raises:RuntimeError – if not inside a call to trio.run().
await trio.sleep(seconds)

Pause execution of the current task for the given number of seconds.

Parameters:seconds (float) – The number of seconds to sleep. May be zero to insert a checkpoint without actually blocking.
Raises:ValueError – if seconds is negative.
await trio.sleep_until(deadline)

Pause execution of the current task until the given time.

The difference between sleep() and sleep_until() is that the former takes a relative time and the latter takes an absolute time.

Parameters:deadline (float) – The time at which we should wake up again. May be in the past, in which case this function yields but does not block.
await trio.sleep_forever()

Pause execution of the current task forever (or until cancelled).

Equivalent to calling await sleep(math.inf).
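
For example, these two waits are interchangeable (a trivial sketch):

import trio

async def main():
    await trio.sleep(5)                              # relative
    await trio.sleep_until(trio.current_time() + 5)  # absolute

trio.run(main)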

If you’re a mad scientist or otherwise feel the need to take direct control over the PASSAGE OF TIME ITSELF, then you can implement a custom Clock class:

class trio.abc.Clock

The interface for custom run loop clocks.

abstractmethod current_time()

Return the current time, according to this clock.

This is used to implement functions like trio.current_time() and trio.move_on_after().

Returns:The current time.
Return type:float
abstractmethod deadline_to_sleep_time(deadline)

Compute the real time until the given deadline.

This is called before we enter a system-specific wait function like select.select(), to get the timeout to pass.

For a clock using wall-time, this should be something like:

return deadline - self.current_time()

but of course it may be different if you’re implementing some kind of virtual clock.

Parameters:deadline (float) – The absolute time of the next deadline, according to this clock.
Returns:The number of real seconds to sleep until the given deadline. May be math.inf.
Return type:float
abstractmethod start_clock()

Do any setup this clock might need.

Called at the beginning of the run.
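
To make the interface concrete, here’s a minimal sketch of a custom clock that runs wall time at 10x speed. This is purely illustrative – for testing you’d normally reach for trio.testing.MockClock instead:

import time
import trio

class SpeedyClock(trio.abc.Clock):
    def start_clock(self):
        self._t0 = time.monotonic()

    def current_time(self):
        # Virtual time runs 10x faster than wall time:
        return (time.monotonic() - self._t0) * 10

    def deadline_to_sleep_time(self, deadline):
        # Convert virtual seconds back into real seconds to sleep:
        return max(0, (deadline - self.current_time()) / 10)

async def main():
    await trio.sleep(10)   # takes ~1 second of real time

trio.run(main, clock=SpeedyClock())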

Cancellation and timeouts

Trio has a rich, composable system for cancelling work, either explicitly or when a timeout expires.

A simple timeout example

In the simplest case, you can apply a timeout to a block of code:

with trio.move_on_after(30):
    result = await do_http_get("https://...")
    print("result is", result)
print("with block finished")

We refer to move_on_after() as creating a “cancel scope”, which contains all the code that runs inside the with block. If the HTTP request takes more than 30 seconds to run, then it will be cancelled: we’ll abort the request and we won’t see result is ... printed on the console; instead we’ll go straight to printing the with block finished message.

Note

Note that this is a single 30 second timeout for the entire body of the with statement. This is different from what you might have seen with other Python libraries, where timeouts often refer to something more complicated. We think this way is easier to reason about.

How does this work? There’s no magic here: trio is built using ordinary Python functionality, so we can’t just abandon the code inside the with block. Instead, we take advantage of Python’s standard way of aborting a large and complex piece of code: we raise an exception.

Here’s the idea: whenever you call a cancellable function like await trio.sleep(...) or await sock.recv(...) – see Checkpoints – then the first thing that function does is to check if there’s a surrounding cancel scope whose timeout has expired, or otherwise been cancelled. If so, then instead of performing the requested operation, the function fails immediately with a Cancelled exception. In this example, this probably happens somewhere deep inside the bowels of do_http_get. The exception then propagates out like any normal exception (you could even catch it if you wanted, but that’s generally a bad idea), until it reaches the with move_on_after(...):. And at this point, the Cancelled exception has done its job – it’s successfully unwound the whole cancelled scope – so move_on_after() catches it, and execution continues as normal after the with block. And this all works correctly even if you have nested cancel scopes, because every Cancelled object carries an invisible marker that makes sure that the cancel scope that triggered it is the only one that will catch it.

Handling cancellation

Pretty much any code you write using trio needs to have some strategy to handle Cancelled exceptions – even if you didn’t set a timeout, then your caller might (and probably will).

You can catch Cancelled, but you shouldn’t! Or more precisely, if you do catch it, then you should do some cleanup and then re-raise it or otherwise let it continue propagating (unless you encounter an error, in which case it’s OK to let that propagate instead). To help remind you of this fact, Cancelled inherits from BaseException, like KeyboardInterrupt and SystemExit do, so that it won’t be caught by catch-all except Exception: blocks.

It’s also important in any long-running code to make sure that you regularly check for cancellation, because otherwise timeouts won’t work! This happens implicitly every time you call a cancellable operation; see below for details. If you have a task that has to do a lot of work without any I/O, then you can use await sleep(0) to insert an explicit cancel+schedule point.

Here’s a rule of thumb for designing good trio-style (“trionic”?) APIs: if you’re writing a reusable function, then you shouldn’t take a timeout= parameter, and instead let your caller worry about it. This has several advantages. First, it leaves the caller’s options open for deciding how they prefer to handle timeouts – for example, they might find it easier to work with absolute deadlines instead of relative timeouts. If they’re the ones calling into the cancellation machinery, then they get to pick, and you don’t have to worry about it. Second, and more importantly, this makes it easier for others to re-use your code. If you write an http_get function, and then I come along later and write a log_in_to_twitter function that needs to internally make several http_get calls, I don’t want to have to figure out how to configure the individual timeouts on each of those calls – and with trio’s timeout system, it’s totally unnecessary.

Of course, this rule doesn’t apply to APIs that need to impose internal timeouts. For example, if you write a start_http_server function, then you probably should give your caller some way to configure timeouts on individual requests.
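
To illustrate (a sketch – http_get here is a hypothetical stub, not a real trio API):

import trio

async def http_get(url):
    ...   # hypothetical: fetch the page, checkpointing as it goes

async def log_in_to_twitter(user, password):
    # No timeout= parameters anywhere in here...
    await http_get("https://example.com/login")
    await http_get("https://example.com/token")

async def caller():
    # ...because the caller can impose one deadline over the whole thing:
    with trio.fail_after(30):
        await log_in_to_twitter("me", "hunter2")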

Cancellation semantics

You can freely nest cancellation blocks, and each Cancelled exception “knows” which block it belongs to. So long as you don’t stop it, the exception will keep propagating until it reaches the block that raised it, at which point it will stop automatically.

Here’s an example:

print("starting...")
with trio.move_on_after(5):
    with trio.move_on_after(10):
        await sleep(20)
        print("sleep finished without error")
    print("move_on_after(10) finished without error")
print("move_on_after(5) finished without error")

In this code, the outer scope will expire after 5 seconds, causing the sleep() call to return early with a Cancelled exception. Then this exception will propagate through the with move_on_after(10) line until it’s caught by the with move_on_after(5) context manager. So this code will print:

starting...
move_on_after(5) finished without error

The end result is that trio has successfully cancelled exactly the work that was happening within the scope that was cancelled.

Looking at this, you might wonder how you can tell whether the inner block timed out – perhaps you want to do something different, like try a fallback procedure or report a failure to your caller. To make this easier, move_on_after()’s __enter__ function returns an object representing this cancel scope, which we can use to check whether this scope caught a Cancelled exception:

with trio.move_on_after(5) as cancel_scope:
    await sleep(10)
print(cancel_scope.cancelled_caught)  # prints "True"

The cancel_scope object also allows you to check or adjust this scope’s deadline, explicitly trigger a cancellation without waiting for the deadline, check if the scope has already been cancelled, and so forth – see open_cancel_scope() below for the full details.
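
For example (a small sketch using just the attributes mentioned above):

import trio

async def main():
    with trio.move_on_after(5) as cancel_scope:
        cancel_scope.deadline += 10   # push the deadline back...
        cancel_scope.cancel()         # ...or just cancel right now
        await trio.sleep_forever()    # raises Cancelled immediately
    print(cancel_scope.cancelled_caught)  # prints "True"

trio.run(main)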

Cancellations in trio are “level triggered”, meaning that once a block has been cancelled, all cancellable operations in that block will keep raising Cancelled. This helps avoid some pitfalls around resource clean-up. For example, imagine that we have a function that connects to a remote server and sends some messages, and then cleans up on the way out:

with trio.move_on_after(TIMEOUT):
    conn = make_connection()
    try:
        await conn.send_hello_msg()
    finally:
        await conn.send_goodbye_msg()

Now suppose that the remote server stops responding, so our call to await conn.send_hello_msg() hangs forever. Fortunately, we were clever enough to put a timeout around this code, so eventually the timeout will expire and send_hello_msg will raise Cancelled. But then, in the finally block, we make another blocking operation, which will also hang forever! At this point, if we were using asyncio or another library with “edge-triggered” cancellation, we’d be in trouble: since our timeout already fired, it wouldn’t fire again, and at this point our application would lock up forever. But in trio, this doesn’t happen: the await conn.send_goodbye_msg() call is still inside the cancelled block, so it will also raise Cancelled.

Of course, if you really want to make another blocking call in your cleanup handler, trio will let you; it’s only trying to prevent you from accidentally shooting yourself in the foot. Intentional foot-shooting is no problem (or at least – it’s not trio’s problem). To do this, create a new scope, and set its shield attribute to True:

with trio.move_on_after(TIMEOUT):
    conn = make_connection()
    try:
        await conn.send_hello_msg()
    finally:
        with move_on_after(CLEANUP_TIMEOUT) as cleanup_scope:
            cleanup_scope.shield = True
            await conn.send_goodbye_msg()

So long as you’re inside a scope with shield = True set, then you’ll be protected from outside cancellations. Note though that this only applies to outside cancellations: if CLEANUP_TIMEOUT expires then await conn.send_goodbye_msg() will still be cancelled, and if the await conn.send_goodbye_msg() call uses any timeouts internally, then those will continue to work normally as well. This is a pretty advanced feature that most people probably won’t use, but it’s there for the rare cases where you need it.

Cancellation and primitive operations

We’ve talked a lot about what happens when an operation is cancelled, and how you need to be prepared for this whenever calling a cancellable operation… but we haven’t gone into the details about which operations are cancellable, and how exactly they behave when they’re cancelled.

Here’s the rule: if it’s in the trio namespace, and you use await to call it, then it’s cancellable (see Checkpoints above). Cancellable means:

  • If you try to call it when inside a cancelled scope, then it will raise Cancelled.
  • If it blocks, and while it’s blocked then one of the scopes around it becomes cancelled, it will return early and raise Cancelled.
  • Raising Cancelled means that the operation did not happen. If a trio socket’s send method raises Cancelled, then no data was sent. If a trio socket’s recv method raises Cancelled then no data was lost – it’s still sitting in the socket receive buffer waiting for you to call recv again. And so forth.

There are a few idiosyncratic cases where external constraints make it impossible to fully implement these semantics. These are always documented. There is also one systematic exception:

  • Async cleanup operations – like __aexit__ methods or async close methods – are cancellable just like anything else except that if they are cancelled, they still perform a minimum level of cleanup before raising Cancelled.

For example, closing a TLS-wrapped socket normally involves sending a notification to the remote peer, so that they can be cryptographically assured that you really meant to close the socket, and your connection wasn’t just broken by a man-in-the-middle attacker. But handling this robustly is a bit tricky. Remember our example above where the blocking send_goodbye_msg caused problems? That’s exactly how closing a TLS socket works: if the remote peer has disappeared, then our code may never be able to actually send our shutdown notification, and it would be nice if it didn’t block forever trying. Therefore, the method for closing a TLS-wrapped socket will try to send that notification – and if it gets cancelled, then it will give up on sending the message, but will still close the underlying socket before raising Cancelled, so at least you don’t leak that resource.

Cancellation API details

The primitive operation for creating a new cancellation scope is:

with trio.open_cancel_scope(*, deadline=inf, shield=False) as cancel_scope

Returns a context manager which creates a new cancellation scope.

Cancel scope objects provide the following interface:

The cancel scope interface
deadline

Read-write, float. An absolute time on the current run’s clock at which this scope will automatically become cancelled. You can adjust the deadline by modifying this attribute, e.g.:

# I need a little more time!
cancel_scope.deadline += 30

Note that for efficiency, the core run loop only checks for expired deadlines every once in a while. This means that in certain cases there may be a short delay between when the clock says the deadline should have expired, and when checkpoints start raising Cancelled. This is a very obscure corner case that you’re unlikely to notice, but we document it for completeness. (If this does cause problems for you, of course, then we want to know!)

Defaults to math.inf, which means “no deadline”, though this can be overridden by the deadline= argument to open_cancel_scope().

shield

Read-write, bool, default False. So long as this is set to True, then the code inside this scope will not receive Cancelled exceptions from scopes that are outside this scope. They can still receive Cancelled exceptions from (1) this scope, or (2) scopes inside this scope. You can modify this attribute:

with trio.open_cancel_scope() as cancel_scope:
    cancel_scope.shield = True
    # This cannot be interrupted by any means short of
    # killing the process:
    await sleep(10)

    cancel_scope.shield = False
    # Now this can be cancelled normally:
    await sleep(10)

Defaults to False, though this can be overridden by the shield= argument to open_cancel_scope().

cancel()

Cancels this scope immediately.

This method is idempotent, i.e. if the scope was already cancelled then this method silently does nothing.

cancel_called

Readonly bool. Records whether this scope has been cancelled, either by an explicit call to cancel() or by the deadline expiring.

cancelled_caught

Readonly bool. Records whether this scope caught a Cancelled exception. This requires two things: (1) the with block exited with a Cancelled exception, and (2) this scope is the one that was responsible for triggering this Cancelled exception.

Trio also provides several convenience functions for the common situation of just wanting to impose a timeout on some code:

with trio.move_on_after(seconds) as cancel_scope

Use as a context manager to create a cancel scope whose deadline is set to now + seconds.

Parameters:seconds (float) – The timeout.
Raises:ValueError – if seconds is less than zero.
with trio.move_on_at(deadline) as cancel_scope

Use as a context manager to create a cancel scope with the given absolute deadline.

Parameters:deadline (float) – The deadline.
with trio.fail_after(seconds) as cancel_scope

Creates a cancel scope with the given timeout, and raises an error if it is actually cancelled.

This function and move_on_after() are similar in that both create a cancel scope with a given timeout, and if the timeout expires then both will cause Cancelled to be raised within the scope. The difference is that when the Cancelled exception reaches move_on_after(), it’s caught and discarded. When it reaches fail_after(), then it’s caught and TooSlowError is raised in its place.

Raises:TooSlowError – if a Cancelled exception is raised in this scope and caught by the context manager.
with trio.fail_at(deadline) as cancel_scope

Creates a cancel scope with the given deadline, and raises an error if it is actually cancelled.

This function and move_on_at() are similar in that both create a cancel scope with a given absolute deadline, and if the deadline expires then both will cause Cancelled to be raised within the scope. The difference is that when the Cancelled exception reaches move_on_at(), it’s caught and discarded. When it reaches fail_at(), then it’s caught and TooSlowError is raised in its place.

Raises:TooSlowError – if a Cancelled exception is raised in this scope and caught by the context manager.

Cheat sheet:

  • If you want to impose a timeout on a function, but you don’t care whether it timed out or not:

    with trio.move_on_after(TIMEOUT):
        await do_whatever()
    # carry on!
    
  • If you want to impose a timeout on a function, and then do some recovery if it timed out:

    with trio.move_on_after(TIMEOUT) as cancel_scope:
        await do_whatever()
    if cancel_scope.cancelled_caught:
        # The operation timed out, try something else
        try_to_recover()
    
  • If you want to impose a timeout on a function, and then if it times out then just give up and raise an error for your caller to deal with:

    with trio.fail_after(TIMEOUT):
        await do_whatever()
    

It’s also possible to check what the current effective deadline is, which is sometimes useful:

trio.current_effective_deadline()

Returns the current effective deadline for the current task.

This function examines all the cancellation scopes that are currently in effect (taking into account shielding), and returns the deadline that will expire first.

One example of where this might be useful is if your code is trying to decide whether to begin an expensive operation like an RPC call, but wants to skip it if it knows that it can’t possibly complete in the available time. Another example would be if you’re using a protocol like gRPC that propagates timeout information to the remote peer; this function gives a way to fetch that information so you can send it along.

If this is called in a context where a cancellation is currently active (i.e., a blocking call will immediately raise Cancelled), then the returned deadline is -inf. If it is called in a context where no scopes have a deadline set, it returns inf.

Returns:the effective deadline, as an absolute time.
Return type:float
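
For example, here’s a sketch of the “skip the expensive operation” pattern; expensive_rpc is a made-up stand-in:

import trio

async def expensive_rpc():
    await trio.sleep(0.5)   # stand-in for a real RPC call
    return "answer"

async def maybe_rpc():
    seconds_left = trio.current_effective_deadline() - trio.current_time()
    if seconds_left < 1:
        return None          # not enough time left; don't bother
    return await expensive_rpc()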

Tasks let you do multiple things at once

One of trio’s core design principles is: no implicit concurrency. Every function executes in a straightforward, top-to-bottom manner, finishing each operation before moving on to the next – like Guido intended.

But, of course, the entire point of an async library is to let you do multiple things at once. The one and only way to do that in trio is through the task spawning interface. So if you want your program to walk and chew gum, this is the section for you.

Nurseries and spawning

Most libraries for concurrent programming let you start new child tasks (or threads, or whatever) willy-nilly, whenever and wherever you feel like it. Trio is a bit different: you can’t start a child task unless you’re prepared to be a responsible parent. The way you demonstrate your responsibility is by creating a nursery:

async with trio.open_nursery() as nursery:
    ...

And once you have a reference to a nursery object, you can start children in that nursery:

async def child():
    ...

async def parent():
    async with trio.open_nursery() as nursery:
        # Make two concurrent calls to child()
        nursery.start_soon(child)
        nursery.start_soon(child)

This means that tasks form a tree: when you call run(), then this creates an initial task, and all your other tasks will be children, grandchildren, etc. of the initial task.

Essentially, the body of the async with block acts like an initial task that’s running inside the nursery, and then each call to nursery.start_soon adds another task that runs in parallel. A few crucial things to keep in mind:

  • If any task inside the nursery finishes with an unhandled exception, then the nursery immediately cancels all the tasks inside the nursery.
  • Since all of the tasks are running concurrently inside the async with block, the block does not exit until all tasks have completed. If you’ve used other concurrency frameworks, you can think of it this way: the de-indentation at the end of the async with block automatically “joins” (waits for) all of the tasks in the nursery.
  • Once all the tasks have finished, then:
    • The nursery is marked as “closed”, meaning that no new tasks can be started inside it.
    • Any unhandled exceptions are re-raised inside the parent task. If there are multiple exceptions, then they’re collected up into a single MultiError exception.

Since all tasks are descendants of the initial task, one consequence of this is that run() can’t finish until all tasks have finished.
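
Here’s a tiny runnable example showing the “automatic join” behavior:

import trio

async def child(i):
    print("child", i, "started")
    await trio.sleep(i)
    print("child", i, "finished")

async def parent():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(child, 1)
        nursery.start_soon(child, 2)
    # Only reached once *both* children have finished:
    print("all children done")

trio.run(parent)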

Child tasks and cancellation

In trio, child tasks inherit the parent nursery’s cancel scopes. So in this example, both the child tasks will be cancelled when the timeout expires:

with move_on_after(TIMEOUT):
    async with trio.open_nursery() as nursery:
        nursery.start_soon(child1)
        nursery.start_soon(child2)

Note that what matters here is the scopes that were active when open_nursery() was called, not the scopes active when start_soon is called. So for example, the timeout block below does nothing at all:

async with trio.open_nursery() as nursery:
    with move_on_after(TIMEOUT):  # don't do this!
        nursery.start_soon(child)

Errors in multiple child tasks

Normally, in Python, only one thing happens at a time, which means that only one thing can go wrong at a time. Trio has no such limitation. Consider code like:

async def broken1():
    d = {}
    return d["missing"]

async def broken2():
    seq = range(10)
    return seq[20]

async def parent():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(broken1)
        nursery.start_soon(broken2)

broken1 raises KeyError. broken2 raises IndexError. Obviously parent should raise some error, but what? In some sense, the answer should be “both of these at once”, but in Python there can only be one exception at a time.

Trio’s answer is that it raises a MultiError object. This is a special exception which encapsulates multiple exception objects – either regular exceptions or nested MultiErrors. To make these easier to work with, trio installs a custom sys.excepthook that knows how to print nice tracebacks for unhandled MultiErrors, and it also provides some helpful utilities like MultiError.catch(), which allows you to catch “part of” a MultiError.
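
For example, here’s a sketch of using MultiError.catch() with the broken1 and broken2 functions from above. Roughly, the handler gets called on each exception inside the MultiError, and returns either None (to swallow it) or an exception (to let it keep propagating):

import trio

def handler(exc):
    if isinstance(exc, KeyError):
        print("ignoring the KeyError")
        return None   # swallow this one
    return exc        # let everything else propagate

async def parent():
    with trio.MultiError.catch(handler):
        async with trio.open_nursery() as nursery:
            nursery.start_soon(broken1)   # KeyError gets swallowed
            nursery.start_soon(broken2)   # IndexError still propagates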

Spawning tasks without becoming a parent

Sometimes it doesn’t make sense for the task that starts a child to take on responsibility for watching it. For example, a server task may want to start a new task for each connection, but it can’t listen for connections and supervise children at the same time.

The solution here is simple once you see it: there’s no requirement that a nursery object stay in the task that created it! We can write code like this:

async def new_connection_listener(handler, nursery):
    while True:
        conn = await get_new_connection()
        nursery.start_soon(handler, conn)

async def server(handler):
    async with trio.open_nursery() as nursery:
        nursery.start_soon(new_connection_listener, handler, nursery)

Notice that server opens a nursery and passes it to new_connection_listener, and then new_connection_listener is able to start new tasks as “siblings” of itself. Of course, in this case, we could just as well have written:

async def server(handler):
    async with trio.open_nursery() as nursery:
        while True:
            conn = await get_new_connection()
            nursery.start_soon(handler, conn)

…but sometimes things aren’t so simple, and this trick comes in handy.

One thing to remember, though: cancel scopes are inherited from the nursery, not from the task that calls start_soon. So in this example, the timeout does not apply to child (or to anything else):

async def do_spawn(nursery):
    with move_on_after(TIMEOUT):  # don't do this, it has no effect
        nursery.start_soon(child)

async with trio.open_nursery() as nursery:
    nursery.start_soon(do_spawn, nursery)

Custom supervisors

The default cleanup logic is often sufficient for simple cases, but what if you want a more sophisticated supervisor? For example, maybe you have Erlang envy and want features like automatic restart of crashed tasks. Trio itself doesn’t provide these kinds of features, but you can build them on top; Trio’s goal is to enforce basic hygiene and then get out of your way. (Specifically: Trio won’t let you build a supervisor that exits and leaves orphaned tasks behind, and if you have an unhandled exception due to bugs or laziness then Trio will make sure they propagate.) And then you can wrap your fancy supervisor up in a library and put it on PyPI, because supervisors are tricky and there’s no reason everyone should have to write their own.

For example, here’s a function that takes a list of functions, runs them all concurrently, and returns the result from the one that finishes first:

async def race(*async_fns):
    if not async_fns:
        raise ValueError("must pass at least one argument")

    q = trio.Queue(1)

    async def jockey(async_fn):
        await q.put(await async_fn())

    async with trio.open_nursery() as nursery:
        for async_fn in async_fns:
            nursery.start_soon(jockey, async_fn)
        winner = await q.get()
        nursery.cancel_scope.cancel()
        return winner

This works by starting a set of tasks, each of which tries to run its function and then reports the return value. The main task uses q.get() to wait for one to finish; as soon as the first task crosses the finish line, it cancels the rest, and then returns the winning value.

Here if one or more of the racing functions raises an unhandled exception then Trio’s normal handling kicks in: it cancels the others and then propagates the exception. If you want different behavior, you can get that by adding a try block to the jockey function to catch exceptions and handle them however you like.
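
You might use race() like this (a sketch – fetch is a made-up stand-in for real I/O; functools.partial packages up the arguments, since race() takes bare async functions):

from functools import partial

import trio

async def fetch(url):
    await trio.sleep(1)   # stand-in for real network I/O
    return url

async def main():
    winner = await race(
        partial(fetch, "https://mirror1.example/file"),
        partial(fetch, "https://mirror2.example/file"),
    )
    print("first response from:", winner)

trio.run(main)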

Task-local storage

Suppose you’re writing a server that responds to network requests, and you log some information about each request as you process it. If the server is busy and there are multiple requests being handled at the same time, then you might end up with logs like this:

Request handler started
Request handler started
Request handler finished
Request handler finished

In this log, it’s hard to know which lines came from which request. (Did the request that started first also finish first, or not?) One way to solve this is to assign each request a unique identifier, and then include this identifier in each log message:

request 1: Request handler started
request 2: Request handler started
request 2: Request handler finished
request 1: Request handler finished

This way we can see that request 1 was slow: it started before request 2 but finished afterwards. (You can also get much fancier, but this is enough for an example.)

Now, here’s the problem: how does the logging code know what the request identifier is? One approach would be to explicitly pass it around to every function that might want to emit logs… but that’s basically every function, because you never know when you might need to add a log.debug(...) call to some utility function buried deep in the call stack, and when you’re in the middle of debugging a nasty problem the last thing you want is to have to stop first and refactor everything to pass through the request identifier! Sometimes this is the right solution, but other times it would be much more convenient if we could store the identifier in a global variable, so that the logging function could look it up whenever it needed it. Except… a global variable can only have one value at a time, so if we have multiple handlers running at once then this isn’t going to work. What we need is something that’s like a global variable, but that can have different values depending on which request handler is accessing it.

That’s what trio.TaskLocal gives you:

class trio.TaskLocal(**kwargs)

Task-local storage.

Instances of this class have no particular attributes or methods. Instead, they serve as a blank slate to which you can add whatever attributes you like. Modifications made within one task will only be visible to that task – with one exception: when you start a new task, then any TaskLocal attributes that are visible in the task that called start or start_soon will be inherited by the child. This inheritance takes the form of a shallow copy: further changes in the parent will not affect the child, and changes in the child will not affect the parent. (If you’re familiar with how environment variables are inherited across processes, then TaskLocal inheritance is somewhat similar.)

If you’re familiar with threading.local, then trio.TaskLocal is very similar, except adapted to work with tasks instead of threads, and with the added feature that values are automatically inherited across tasks.

When creating a TaskLocal object, you can provide default values as keyword arguments:

local = trio.TaskLocal(a=1)

async def main():
    # The first time we access the TaskLocal object, the 'a' attribute
    # is already present:
    assert local.a == 1

The default values are like the default values to functions: they’re only evaluated once, when the object is created. So you shouldn’t use mutable objects as defaults – they’ll be shared not just across tasks, but even across entirely unrelated runs! For example:

# Don't do this!!
local = trio.TaskLocal(a=[])

async def main():
    assert local.a == []
    local.a.append(1)

# First time, everything seems to work
trio.run(main)

# Second time, the assertion fails, because the first time modified
# the list object.
trio.run(main)

And here’s a toy example demonstrating how to use TaskLocal:

import random
import trio

request_info = trio.TaskLocal()

# Example logging function that tags each line with the request identifier.
def log(msg):
    # Read from task-local storage:
    request_tag = request_info.tag

    print("request {}: {}".format(request_tag, msg))

# An example "request handler" that does some work itself and also
# spawns some helper tasks to do some concurrent work.
async def handle_request(tag):
    # Write to task-local storage:
    request_info.tag = tag

    log("Request handler started")
    await trio.sleep(random.random())
    async with trio.open_nursery() as nursery:
        nursery.start_soon(concurrent_helper, "a")
        nursery.start_soon(concurrent_helper, "b")
    await trio.sleep(random.random())
    log("Request received finished")

async def concurrent_helper(job):
    log("Helper task {} started".format(job))
    await trio.sleep(random.random())
    log("Helper task {} finished".format(job))

# Spawn several "request handlers" simultaneously, to simulate a
# busy server handling multiple requests at the same time.
async def main():
    async with trio.open_nursery() as nursery:
        for i in range(3):
            nursery.start_soon(handle_request, i)

trio.run(main)

Example output (yours may differ slightly):

request 1: Request handler started
request 2: Request handler started
request 0: Request handler started
request 2: Helper task a started
request 2: Helper task b started
request 1: Helper task a started
request 1: Helper task b started
request 0: Helper task b started
request 0: Helper task a started
request 2: Helper task b finished
request 2: Helper task a finished
request 2: Request handler finished
request 0: Helper task a finished
request 1: Helper task a finished
request 1: Helper task b finished
request 1: Request handler finished
request 0: Helper task b finished
request 0: Request handler finished

Synchronizing and communicating between tasks

Trio provides a standard set of synchronization and inter-task communication primitives. These objects’ APIs are generally modelled off of the analogous classes in the standard library, but with some differences.

Blocking and non-blocking methods

The standard library synchronization primitives have a variety of mechanisms for specifying timeouts and blocking behavior, and of signaling whether an operation returned due to success versus a timeout.

In trio, we standardize on the following conventions:

  • We don’t provide timeout arguments. If you want a timeout, then use a cancel scope.
  • For operations that have a non-blocking variant, the blocking and non-blocking variants are different methods with names like X and X_nowait, respectively. (This is similar to queue.Queue, but unlike most of the classes in threading.) We like this approach because it allows us to make the blocking version async and the non-blocking version sync.
  • When a non-blocking method cannot succeed (the queue is empty, the lock is already held, etc.), then it raises trio.WouldBlock. There’s no equivalent to the queue.Empty versus queue.Full distinction – we just have the one exception that we use consistently.
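
Concretely, these conventions look like this in practice (a small sketch using the Queue class documented below):

import trio

async def main():
    queue = trio.Queue(1)
    await queue.put("a")        # blocking variant: an async method
    try:
        queue.put_nowait("b")   # non-blocking variant: a plain method
    except trio.WouldBlock:
        print("the queue is full")

trio.run(main)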

Fairness

These classes are all guaranteed to be “fair”, meaning that when it comes time to choose who will be next to acquire a lock, get an item from a queue, etc., then it always goes to the task which has been waiting longest. It’s not entirely clear whether this is the best choice, but for now that’s how it works.

As an example of what this means, here’s a small program in which two tasks compete for a lock. Notice that the task which releases the lock always immediately attempts to re-acquire it, before the other task has a chance to run. (And remember that we’re doing cooperative multi-tasking here, so it’s actually deterministic that the task releasing the lock will call acquire() before the other task wakes up; in trio releasing a lock is not a checkpoint.) With an unfair lock, this would result in the same task holding the lock forever and the other task being starved out. But if you run this, you’ll see that the two tasks politely take turns:

# fairness-demo.py

import trio

async def loopy_child(number, lock):
    while True:
        async with lock:
            print("Child {} has the lock!".format(number))
            await trio.sleep(0.5)

async def main():
    async with trio.open_nursery() as nursery:
        lock = trio.Lock()
        nursery.start_soon(loopy_child, 1, lock)
        nursery.start_soon(loopy_child, 2, lock)

trio.run(main)

Broadcasting an event with Event

class trio.Event

A waitable boolean value useful for inter-task synchronization, inspired by threading.Event.

An event object manages an internal boolean flag, which is initially False, and tasks can wait for it to become True.

clear()

Set the internal flag value to False.

is_set()

Return the current value of the internal flag.

set()

Set the internal flag value to True, and wake any waiting tasks.

statistics()

Return an object containing debugging information.

Currently the following fields are defined:

  • tasks_waiting: The number of tasks blocked on this event’s wait() method.
await wait()

Block until the internal flag value becomes True.

If it’s already True, then this method returns immediately, but it still acts as a checkpoint.
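
Here’s a small sketch of the typical Event pattern – one task blocks in wait() until another calls set():

import trio

async def waiter(event):
    print("waiting...")
    await event.wait()
    print("unblocked!")

async def main():
    event = trio.Event()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(waiter, event)
        await trio.sleep(1)
        event.set()   # wakes up the waiter

trio.run(main)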

Passing messages with Queue

You can use Queue objects to safely pass objects between tasks. Trio Queue objects always have a bounded size. Here’s a toy example to demonstrate why this is important. Suppose we have a queue with two producers and one consumer:

async def producer(queue):
    while True:
        await queue.put(1)

async def consumer(queue):
    while True:
        print(await queue.get())

async def main():
    # This example won't work with Trio's actual Queue class, so
    # imagine we have some sort of platonic ideal of an unbounded
    # queue here:
    queue = trio.HypotheticalQueue()
    async with trio.open_nursery() as nursery:
        # Two producers
        nursery.start_soon(producer, queue)
        nursery.start_soon(producer, queue)
        # One consumer
        nursery.start_soon(consumer, queue)

trio.run(main)

If we naively cycle between these three tasks in round-robin style, then we put an item, then put an item, then get an item, then put an item, then put an item, then get an item, … and since on each cycle we add two items to the queue but only remove one, then over time the queue size grows arbitrarily large, our latency is terrible, we run out of memory, it’s just generally bad news all around.

By placing an upper bound on our queue’s size, we avoid this problem. If the queue gets too big, then it applies backpressure: put blocks and forces the producers to slow down and wait until the consumer calls get.
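
Here’s a runnable version of the producer/consumer pattern with a real (bounded) Queue – a sketch, using a None sentinel so the program actually finishes:

import trio

async def producer(queue):
    for i in range(5):
        await queue.put(i)     # blocks whenever the queue is full
    await queue.put(None)      # sentinel: tell the consumer to stop

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print("got", item)

async def main():
    queue = trio.Queue(1)      # capacity 1: maximum backpressure
    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, queue)
        nursery.start_soon(consumer, queue)

trio.run(main)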

class trio.Queue(capacity)

A bounded queue suitable for inter-task communication.

This class is generally modelled after queue.Queue, but with the major difference that it is always bounded.

A Queue object can be used as an asynchronous iterator that dequeues objects one at a time. That is, these two loops are equivalent:

async for obj in queue:
    ...

while True:
    obj = await queue.get()
    ...
Parameters:capacity (int) – The maximum number of items allowed in the queue before put() blocks. Choosing a sensible value here is important to ensure that backpressure is communicated promptly and avoid unnecessary latency. If in doubt, use 1.
empty()

Returns True if the queue is empty, False otherwise.

There is some subtlety to interpreting this method’s return value: see issue #63.

full()

Returns True if the queue is at capacity, False otherwise.

There is some subtlety to interpreting this method’s return value: see issue #63.

await get()

Get an object from the queue, blocking if necessary.

Returns:The dequeued object.
Return type:object
get_nowait()

Attempt to get an object from the queue, without blocking.

Returns:The dequeued object.
Return type:object
Raises:WouldBlock – if the queue is empty.
await put(obj)

Put an object into the queue, blocking if necessary.

Parameters:obj (object) – The object to enqueue.
put_nowait(obj)

Attempt to put an object into the queue, without blocking.

Parameters:obj (object) – The object to enqueue.
Raises:WouldBlock – if the queue is full.
qsize()

Returns the number of items currently in the queue.

There is some subtlety to interpreting this method’s return value: see issue #63.

statistics()

Returns an object containing debugging information.

Currently the following fields are defined:

  • qsize: The number of items currently in the queue.
  • capacity: The maximum number of items the queue can hold.
  • tasks_waiting_put: The number of tasks blocked on this queue’s put() method.
  • tasks_waiting_get: The number of tasks blocked on this queue’s get() method.

Lower-level synchronization primitives

Personally, I find that events and queues are usually enough to implement most things I care about, and lead to easier to read code than the lower-level primitives discussed in this section. But if you need them, they’re here. (If you find yourself reaching for these because you’re trying to implement a new higher-level synchronization primitive, then you might also want to check out the facilities in trio.hazmat for a more direct exposure of trio’s underlying synchronization logic. All of the classes discussed in this section are implemented on top of the public APIs in trio.hazmat; they don’t have any special access to trio’s internals.)

class trio.CapacityLimiter(total_tokens)

An object for controlling access to a resource with limited capacity.

Sometimes you need to put a limit on how many tasks can do something at the same time. For example, you might want to use some threads to run multiple blocking I/O operations in parallel… but if you use too many threads at once, then your system can become overloaded and it’ll actually make things slower. One popular solution is to impose a policy like “run up to 40 threads at the same time, but no more”. But how do you implement a policy like this?

That’s what CapacityLimiter is for. You can think of a CapacityLimiter object as a sack that starts out holding some fixed number of tokens:

limit = trio.CapacityLimiter(40)

Then tasks can come along and borrow a token out of the sack:

# Borrow a token:
async with limit:
    # We are holding a token!
    await perform_expensive_operation()
# Exiting the 'async with' block puts the token back into the sack

And crucially, if you try to borrow a token but the sack is empty, then you have to wait for another task to finish what it’s doing and put its token back first before you can take it and continue.

Another way to think of it: a CapacityLimiter is like a sofa with a fixed number of seats, and if they’re all taken then you have to wait for someone to get up before you can sit down.

By default, run_sync_in_worker_thread() uses a CapacityLimiter to limit the number of threads running at once; see current_default_worker_thread_limiter() for details.

If you’re familiar with semaphores, then you can think of this as a restricted semaphore that’s specialized for one common use case, with additional error checking. For a more traditional semaphore, see Semaphore.

Note

Don’t confuse this with the “leaky bucket” or “token bucket” algorithms used to limit bandwidth usage on networks. The basic idea of using tokens to track a resource limit is similar, but this is a very simple sack where tokens aren’t automatically created or destroyed over time; they’re just borrowed and then put back.
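
Putting it together, a small usage sketch:

import trio

async def worker(i, limit):
    async with limit:          # borrow a token; wait if none are free
        print("worker", i, "holding a token")
        await trio.sleep(1)    # stand-in for real work

async def main():
    limit = trio.CapacityLimiter(2)   # at most 2 workers at once
    async with trio.open_nursery() as nursery:
        for i in range(5):
            nursery.start_soon(worker, i, limit)

trio.run(main)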

await acquire()

Borrow a token from the sack, blocking if necessary.

Raises:RuntimeError – if the current task already holds one of this sack’s tokens.
acquire_nowait()

Borrow a token from the sack, without blocking.

Raises:
  • WouldBlock – if no tokens are available.
  • RuntimeError – if the current task already holds one of this sack’s tokens.
await acquire_on_behalf_of(borrower)

Borrow a token from the sack on behalf of borrower, blocking if necessary.

Parameters:borrower – A trio.hazmat.Task or arbitrary opaque object used to record who is borrowing this token; see acquire_on_behalf_of_nowait() for details.
Raises:RuntimeError – if borrower task already holds one of this sack’s tokens.
acquire_on_behalf_of_nowait(borrower)

Borrow a token from the sack on behalf of borrower, without blocking.

Parameters:

borrower – A trio.hazmat.Task or arbitrary opaque object used to record who is borrowing this token. This is used by run_sync_in_worker_thread() to allow threads to “hold tokens”, with the intention in the future of using it to allow deadlock detection and other useful things.

Raises:
  • WouldBlock – if no tokens are available.
  • RuntimeError – if borrower already holds one of this sack’s tokens.
available_tokens

The amount of capacity that’s available to use.

borrowed_tokens

The amount of capacity that’s currently in use.

release()

Put a token back into the sack.

Raises:RuntimeError – if the current task has not acquired one of this sack’s tokens.
release_on_behalf_of(borrower)

Put a token back into the sack on behalf of borrower.

Raises:RuntimeError – if the given borrower has not acquired one of this sack’s tokens.
statistics()

Return an object containing debugging information.

Currently the following fields are defined:

  • borrowed_tokens: The number of tokens currently borrowed from the sack.
  • total_tokens: The total number of tokens in the sack. Usually this will be larger than borrowed_tokens, but it’s possible for it to be smaller if total_tokens was recently decreased.
  • borrowers: A list of all tasks or other entities that currently hold a token.
  • tasks_waiting: The number of tasks blocked on this CapacityLimiter’s acquire() or acquire_on_behalf_of() methods.
total_tokens

The total capacity available.

You can change total_tokens by assigning to this attribute. If you make it larger, then the appropriate number of waiting tasks will be woken immediately to take the new tokens. If you decrease total_tokens below the number of tasks that are currently using the resource, then all current tasks will be allowed to finish as normal, but no new tasks will be allowed in until the total number of tasks drops below the new total_tokens.

class trio.Semaphore(initial_value, *, max_value=None)

A semaphore.

A semaphore holds an integer value, which can be incremented by calling release() and decremented by calling acquire() – but the value is never allowed to drop below zero. If the value is zero, then acquire() will block until someone calls release().

If you’re looking for a Semaphore to limit the number of tasks that can access some resource simultaneously, then consider using a CapacityLimiter instead.

This object’s interface is similar to, but different from, that of threading.Semaphore.

A Semaphore object can be used as an async context manager; it blocks on entry but not on exit.

Parameters:
  • initial_value (int) – A non-negative integer giving the semaphore’s initial value.
  • max_value (int or None) – If given, makes this a “bounded” semaphore that raises an error if the value is about to exceed the given max_value.
await acquire()

Decrement the semaphore value, blocking if necessary to avoid letting it drop below zero.

acquire_nowait()

Attempt to decrement the semaphore value, without blocking.

Raises:WouldBlock – if the value is zero.
max_value

The maximum allowed value. May be None to indicate no limit.

release()

Increment the semaphore value, possibly waking a task blocked in acquire().

Raises:ValueError – if incrementing the value would cause it to exceed max_value.
statistics()

Return an object containing debugging information.

Currently the following fields are defined:

  • tasks_waiting: The number of tasks blocked on this semaphore’s acquire() method.
value

The current value of the semaphore.

class trio.Lock

A classic mutex.

This is a non-reentrant, single-owner lock. Unlike threading.Lock, only the owner of the lock is allowed to release it.

A Lock object can be used as an async context manager; it blocks on entry but not on exit.

await acquire()

Acquire the lock, blocking if necessary.

acquire_nowait()

Attempt to acquire the lock, without blocking.

Raises:WouldBlock – if the lock is held.
locked()

Check whether the lock is currently held.

Returns:True if the lock is held, False otherwise.
Return type:bool
release()

Release the lock.

Raises:RuntimeError – if the calling task does not hold the lock.
statistics()

Return an object containing debugging information.

Currently the following fields are defined:

  • locked: boolean indicating whether the lock is held.
  • owner: the trio.hazmat.Task currently holding the lock, or None if the lock is not held.
  • tasks_waiting: The number of tasks blocked on this lock’s acquire() method.
class trio.StrictFIFOLock

A variant of Lock where tasks are guaranteed to acquire the lock in strict first-come-first-served order.

An example of when this is useful is if you’re implementing something like trio.ssl.SSLStream or an HTTP/2 server using h2, where you have multiple concurrent tasks that are interacting with a shared state machine, and at unpredictable moments the state machine requests that a chunk of data be sent over the network. (For example, when using h2 simply reading incoming data can occasionally create outgoing data to send.) The challenge is to make sure that these chunks are sent in the correct order, without being garbled.

One option would be to use a regular Lock, and wrap it around every interaction with the state machine:

# This approach is sometimes workable but often sub-optimal; see below
async with lock:
    state_machine.do_something()
    if state_machine.has_data_to_send():
        await conn.sendall(state_machine.get_data_to_send())

But this can be problematic. If you’re using h2 then usually reading incoming data doesn’t create the need to send any data, so we don’t want to force every task that tries to read from the network to sit and wait a potentially long time for sendall to finish. And in some situations this could even potentially cause a deadlock, if the remote peer is waiting for you to read some data before it accepts the data you’re sending.

StrictFIFOLock provides an alternative. We can rewrite our example like:

# Note: no awaits between when we start using the state machine and
# when we block to take the lock!
state_machine.do_something()
if state_machine.has_data_to_send():
    # Notice that we fetch the data to send out of the state machine
    # *before* sleeping, so that other tasks won't see it.
    chunk = state_machine.get_data_to_send()
    async with strict_fifo_lock:
        await conn.sendall(chunk)

First we do all our interaction with the state machine in a single scheduling quantum (notice there are no awaits in there), so it’s automatically atomic with respect to other tasks. And then if and only if we have data to send, we get in line to send it – and StrictFIFOLock guarantees that each task will send its data in the same order that the state machine generated it.

Currently, StrictFIFOLock is simply an alias for Lock, but (a) this may not always be true in the future, especially if trio ever implements more sophisticated scheduling policies, and (b) the above code is relying on a pretty subtle property of its lock. Using a StrictFIFOLock acts as an executable reminder that you’re relying on this property.

class trio.Condition(lock=None)

A classic condition variable, similar to threading.Condition.

A Condition object can be used as an async context manager to acquire the underlying lock; it blocks on entry but not on exit.

Parameters:lock (Lock) – the lock object to use. If given, must be a trio.Lock. If None, a new Lock will be allocated and used.
await acquire()

Acquire the underlying lock, blocking if necessary.

acquire_nowait()

Attempt to acquire the underlying lock, without blocking.

Raises:WouldBlock – if the lock is currently held.
locked()

Check whether the underlying lock is currently held.

Returns:True if the lock is held, False otherwise.
Return type:bool
notify(n=1)

Wake one or more tasks that are blocked in wait().

Parameters:n (int) – The number of tasks to wake.
Raises:RuntimeError – if the calling task does not hold the lock.
notify_all()

Wake all tasks that are currently blocked in wait().

Raises:RuntimeError – if the calling task does not hold the lock.
release()

Release the underlying lock.

statistics()

Return an object containing debugging information.

Currently the following fields are defined:

  • tasks_waiting: The number of tasks blocked on this condition’s wait() method.
  • lock_statistics: The result of calling the underlying Lock’s statistics() method.
await wait()

Wait for another task to call notify() or notify_all().

When calling this method, you must hold the lock. It releases the lock while waiting, and then re-acquires it before waking up.

There is a subtlety with how this method interacts with cancellation: when cancelled it will block to re-acquire the lock before raising Cancelled. This may cause cancellation to be less prompt than expected. The advantage is that it makes code like this work:

async with condition:
    await condition.wait()

If we didn’t re-acquire the lock before waking up, and wait() were cancelled here, then we’d crash in condition.__aexit__ when we tried to release the lock we no longer held.

Raises:RuntimeError – if the calling task does not hold the lock.
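
To make the pattern concrete, here is a minimal sketch of using a Condition: one task waits for a shared flag, another task sets the flag and notifies. The shared dict and the one-second delay are illustrative only, not part of the API:

import trio

async def waiter(cond, shared):
    async with cond:                  # acquire the underlying lock
        while not shared["ready"]:
            await cond.wait()         # releases the lock while blocked
        print("got the signal!")

async def setter(cond, shared):
    await trio.sleep(1)
    async with cond:
        shared["ready"] = True
        cond.notify_all()             # we must hold the lock to notify

async def main():
    cond = trio.Condition()
    shared = {"ready": False}
    async with trio.open_nursery() as nursery:
        nursery.start_soon(waiter, cond, shared)
        nursery.start_soon(setter, cond, shared)

trio.run(main)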

Threads (if you must)

In a perfect world, all third-party libraries and low-level APIs would be natively async and integrated into Trio, and all would be happiness and rainbows.

That world, alas, does not (yet) exist. Until it does, you may find yourself needing to interact with non-Trio APIs that do rude things like “blocking”.

In acknowledgment of this reality, Trio provides two useful utilities for working with real, operating-system level, threading-module-style threads. First, if you’re in Trio but need to push some blocking I/O into a thread, there’s run_sync_in_worker_thread(). And if you’re in a thread and need to communicate back with trio, you can use a BlockingTrioPortal.

Trio’s philosophy about managing worker threads

If you’ve used other I/O frameworks, you may have encountered the concept of a “thread pool”, which is most commonly implemented as a fixed size collection of threads that hang around waiting for jobs to be assigned to them. These solve two different problems: First, re-using the same threads over and over is more efficient than starting and stopping a new thread for every job you need done; basically, the pool acts as a kind of cache for idle threads. And second, having a fixed size avoids getting into a situation where 100,000 jobs are submitted simultaneously, and then 100,000 threads are spawned and the system gets overloaded and crashes. Instead, the N threads start executing the first N jobs, while the other (100,000 - N) jobs sit in a queue and wait their turn. Which is generally what you want, and this is how trio.run_sync_in_worker_thread() works by default.

The downside of this kind of thread pool is that sometimes, you need more sophisticated logic for controlling how many threads are run at once. For example, you might want a policy like “at most 20 threads total, but no more than 3 of those can be running jobs associated with the same user account”, or you might want a pool whose size is dynamically adjusted over time in response to system conditions.

It’s even possible for a fixed-size policy to cause unexpected deadlocks. Imagine a situation where you have two different types of blocking jobs that you want to run in the thread pool, type A and type B. Type A is pretty simple: it just runs and completes pretty quickly. But type B is more complicated: it has to stop in the middle and wait for some other work to finish, and that other work includes running a type A job. Now, suppose you submit N jobs of type B to the pool. They all start running, and then eventually end up submitting one or more jobs of type A. But since every thread in our pool is already busy, the type A jobs don’t actually start running – they just sit in a queue waiting for the type B jobs to finish. But the type B jobs will never finish, because they’re waiting for the type A jobs. Our system has deadlocked. The ideal solution to this problem is to avoid having type B jobs in the first place – generally it’s better to keep complex synchronization logic in the main Trio thread. But if you can’t do that, then you need a custom thread allocation policy that tracks separate limits for different types of jobs, and makes it impossible for type B jobs to fill up all the slots that type A jobs need to run.

So, we can see that it’s important to be able to change the policy controlling the allocation of threads to jobs. But in many frameworks, this requires implementing a new thread pool from scratch, which is highly non-trivial; and if different types of jobs need different policies, then you may have to create multiple pools, which is inefficient because now you effectively have two different thread caches that aren’t sharing resources.

Trio’s solution to this problem is to split worker thread management into two layers. The lower layer is responsible for taking blocking I/O jobs and arranging for them to run immediately on some worker thread. It takes care of solving the tricky concurrency problems involved in managing threads and is responsible for optimizations like re-using threads, but has no admission control policy: if you give it 100,000 jobs, it will spawn 100,000 threads. The upper layer is responsible for providing the policy to make sure that this doesn’t happen – but since it only has to worry about policy, it can be much simpler. In fact, all there is to it is the limiter= argument passed to run_sync_in_worker_thread(). This defaults to a global CapacityLimiter object, which gives us the classic fixed-size thread pool behavior. (See current_default_worker_thread_limiter().) But if you want to use “separate pools” for type A jobs and type B jobs, then it’s just a matter of creating two separate CapacityLimiter objects and passing them in when running these jobs. Or here’s an example of defining a custom policy that respects the global thread limit, while making sure that no individual user can use more than 3 threads at a time:

class CombinedLimiter:
     def __init__(self, first, second):
         self._first = first
         self._second = second

     async def acquire_on_behalf_of(self, borrower):
         # Acquire both, being careful to clean up properly on error
         await self._first.acquire_on_behalf_of(borrower)
         try:
             await self._second.acquire_on_behalf_of(borrower)
         except:
             self._first.release_on_behalf_of(borrower)
             raise

     def release_on_behalf_of(self, borrower):
         # Release both, being careful to clean up properly on error
         try:
             self._second.release_on_behalf_of(borrower)
         finally:
             self._first.release_on_behalf_of(borrower)


# Use a weak value dictionary, so that we don't waste memory holding
# limiter objects for users who don't have any worker threads running.
USER_LIMITERS = weakref.WeakValueDictionary()
MAX_THREADS_PER_USER = 3

def get_user_limiter(user_id):
    try:
        return USER_LIMITERS[user_id]
    except KeyError:
        per_user_limiter = trio.CapacityLimiter(MAX_THREADS_PER_USER)
        global_limiter = trio.current_default_worker_thread_limiter()
        # IMPORTANT: acquire the per_user_limiter before the global_limiter.
        # If we get 100 jobs for a user at the same time, we want
        # to only allow 3 of them at a time to even compete for the
        # global thread slots.
        combined_limiter = CombinedLimiter(per_user_limiter, global_limiter)
        USER_LIMITERS[user_id] = combined_limiter
        return combined_limiter


async def run_in_worker_thread_for_user(user_id, sync_fn, *args, **kwargs):
    # *args belong to sync_fn; **kwargs belong to run_sync_in_worker_thread
    kwargs["limiter"] = get_user_limiter(user_id)
    return await trio.run_sync_in_worker_thread(sync_fn, *args, **kwargs)

Putting blocking I/O into worker threads

await trio.run_sync_in_worker_thread(sync_fn, *args, cancellable=False, limiter=None)

Convert a blocking operation into an async operation using a thread.

These two lines are equivalent:

sync_fn(*args)
await run_sync_in_worker_thread(sync_fn, *args)

except that if sync_fn takes a long time, then the first line will block the Trio loop while it runs, while the second line allows other Trio tasks to continue working while sync_fn runs. This is accomplished by pushing the call to sync_fn(*args) off into a worker thread.

Parameters:
  • sync_fn – the synchronous callable to run in a worker thread.
  • *args – positional arguments to pass to sync_fn.
  • cancellable (bool) – whether to allow cancellation of this operation; see the discussion below. Default: False.
  • limiter (CapacityLimiter or None) – an object used to limit the number of concurrent threads; if None (the default), the limiter returned by current_default_worker_thread_limiter() is used.

Cancellation handling: Cancellation is a tricky issue here, because neither Python nor the operating systems it runs on provide any general mechanism for cancelling an arbitrary synchronous function running in a thread. run_sync_in_worker_thread() will always check for cancellation on entry, before starting the thread. But once the thread is running, there are two ways it can handle being cancelled:

  • If cancellable=False, the function ignores the cancellation and keeps going, just like if we had called sync_fn synchronously. This is the default behavior.

  • If cancellable=True, then run_sync_in_worker_thread immediately raises Cancelled. In this case the thread keeps running in background – we just abandon it to do whatever it’s going to do, and silently discard any return value or errors that it raises. Only use this if you know that the operation is safe and side-effect free. (For example: trio.socket.getaddrinfo() is implemented using run_sync_in_worker_thread(), and it sets cancellable=True because it doesn’t really affect anything if a stray hostname lookup keeps running in the background.)

    The limiter is only released after the thread has actually finished – which in the case of cancellation may be some time after run_sync_in_worker_thread() has returned. (This is why it’s crucial that run_sync_in_worker_thread() takes care of acquiring and releasing the limiter.) If trio.run() finishes before the thread does, then the limiter release method will never be called at all.

Warning

You should not use run_sync_in_worker_thread() to call long-running CPU-bound functions! In addition to the usual GIL-related reasons why using threads for CPU-bound work is not very effective in Python, there is an additional problem: on CPython, CPU-bound threads tend to “starve out” IO-bound threads, so using run_sync_in_worker_thread() for CPU-bound work is likely to adversely affect the main thread running trio. If you need to do this, you’re better off using a worker process, or perhaps PyPy (which still has a GIL, but may do a better job of fairly allocating CPU time between threads).

Returns:Whatever sync_fn(*args) returns.
Raises:Whatever sync_fn(*args) raises.
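
For example, here is a minimal sketch of pushing a blocking call into a worker thread; the time.sleep() call stands in for real blocking I/O:

import time
import trio

def blocking_read():
    time.sleep(1)       # pretend this is a slow, blocking I/O call
    return "result"

async def main():
    # While blocking_read runs in its worker thread, other Trio tasks
    # in this program keep running.
    result = await trio.run_sync_in_worker_thread(blocking_read)
    print(result)

trio.run(main)
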
trio.current_default_worker_thread_limiter()

Get the default CapacityLimiter used by run_sync_in_worker_thread().

The most common reason to call this would be if you want to modify its total_tokens attribute.
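
For example (the value 100 is arbitrary, and this must be called from inside Trio):

limiter = trio.current_default_worker_thread_limiter()
limiter.total_tokens = 100   # allow up to 100 worker threads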

Getting back into the trio thread from another thread

class trio.BlockingTrioPortal(trio_token=None)

A portal that synchronous threads can reach through to run code in the Trio thread.

Most Trio functions can only be called from the Trio thread, which is sometimes annoying. What if you really need to call a Trio function from a worker thread? That’s where BlockingTrioPortal comes in: it’s the rare Trio object whose methods can – in fact, must! – be called from another thread, and it allows you to call all those other functions.

There is one complication: it’s possible for a single Python program to contain multiple calls to trio.run(), either in sequence – like in a test suite that calls trio.run() for each test – or simultaneously in different threads. So how do you control which trio.run() your portal opens into?

The answer is that each BlockingTrioPortal object is associated with one specific call to trio.run().

The simplest way to set this up is to instantiate the class with no arguments inside Trio; this automatically binds it to the context where you instantiate it:

async def some_function():
    portal = trio.BlockingTrioPortal()
    await trio.run_sync_in_worker_thread(sync_fn, portal)

Alternatively, you can pass an explicit trio.hazmat.TrioToken to specify the trio.run() that you want your portal to connect to.

run(afn, *args)

Run the given async function in the trio thread, blocking until it is complete.

Returns or raises whatever the given function returns or raises. It can also raise exceptions of its own:

Raises:
  • RunFinishedError – if the corresponding call to trio.run() has already completed.
  • Cancelled – if the corresponding call to trio.run() completes while afn(*args) is running, then afn is likely to raise Cancelled, and this will propagate out into the calling thread.
  • RuntimeError – if you try calling this from inside the Trio thread, which would otherwise cause a deadlock.
run_sync(fn, *args)

Run the given synchronous function in the trio thread, blocking until it is complete.

Returns or raises whatever the given function returns or raises. It can also raise exceptions of its own:

Raises:
  • RunFinishedError – if the corresponding call to trio.run() has already completed.
  • RuntimeError – if you try calling this from inside the Trio thread, which would otherwise cause a deadlock.

This will probably be clearer with an example. Here we demonstrate how to spawn a child thread, and then use a trio.Queue to send messages between the thread and a trio task:

import trio
import threading

def thread_fn(portal, request_queue, response_queue):
    while True:
        # Since we're in a thread, we can't call trio.Queue methods
        # directly -- so we use our portal to call them.
        request = portal.run(request_queue.get)
        # We use 'None' as a request to quit
        if request is not None:
            response = request + 1
            portal.run(response_queue.put, response)
        else:
            # acknowledge that we're shutting down, and then do it
            portal.run(response_queue.put, None)
            return

async def main():
    portal = trio.BlockingTrioPortal()
    request_queue = trio.Queue(1)
    response_queue = trio.Queue(1)
    thread = threading.Thread(
        target=thread_fn,
        args=(portal, request_queue, response_queue))
    thread.start()

    # prints "1"
    await request_queue.put(0)
    print(await response_queue.get())

    # prints "2"
    await request_queue.put(1)
    print(await response_queue.get())

    # prints "None"
    await request_queue.put(None)
    print(await response_queue.get())
    thread.join()

trio.run(main)

Exceptions and warnings

exception trio.Cancelled

Raised by blocking calls if the surrounding scope has been cancelled.

You should let this exception propagate, to be caught by the relevant cancel scope. To remind you of this, it inherits from BaseException instead of Exception, just like KeyboardInterrupt and SystemExit do. This means that if you write something like:

try:
    ...
except Exception:
    ...

then this won’t catch a Cancelled exception.

Note

In the US it’s also common to see this word spelled “canceled”, with only one “l”. This is a recent and US-specific innovation, and even in the US both forms are still commonly used. So for consistency with the rest of the world and with “cancellation” (which always has two “l”s), trio uses the two “l” spelling everywhere.

exception trio.TooSlowError

Raised by fail_after() and fail_at() if the timeout expires.

exception trio.WouldBlock

Raised by X_nowait functions if X would block.
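
For example, Lock.acquire_nowait() follows this convention. A typical calling pattern looks like this sketch (assuming a lock object already exists):

try:
    lock.acquire_nowait()
except trio.WouldBlock:
    pass   # the lock is held by another task; do something else instead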

exception trio.ResourceBusyError

Raised when a task attempts to use a resource that some other task is already using, and this would lead to bugs and nonsense.

For example, if two tasks try to send data through the same socket at the same time, trio will raise ResourceBusyError instead of letting the data get scrambled.

exception trio.RunFinishedError

Raised by run_in_trio_thread and similar functions if the corresponding call to trio.run() has already finished.

exception trio.TrioInternalError

Raised by run() if we encounter a bug in trio, or (possibly) a misuse of one of the low-level trio.hazmat APIs.

This should never happen! If you get this error, please file a bug.

Unfortunately, if you get this error it also means that all bets are off – trio doesn’t know what is going on and its normal invariants may be void. (For example, we might have “lost track” of a task. Or lost track of all tasks.) Again, though, this shouldn’t happen.

exception trio.TrioDeprecationWarning

Bases: FutureWarning

Warning emitted if you use deprecated Trio functionality.

As a young project, Trio is currently quite aggressive about deprecating and/or removing functionality that we realize was a bad idea. If you use Trio, you should subscribe to issue #1 to get information about upcoming deprecations and other backwards compatibility breaking changes.

Despite the name, this class currently inherits from FutureWarning, not DeprecationWarning, because while we’re in young-and-aggressive mode we want these warnings to be visible by default. You can hide them by installing a filter or with the -W switch: see the warnings documentation for details.

I/O in Trio

The abstract Stream API

Trio provides a set of abstract base classes that define a standard interface for unidirectional and bidirectional byte streams.

Why is this useful? Because it lets you write generic protocol implementations that can work over arbitrary transports, and easily create complex transport configurations. Here are some examples:

  • trio.SocketStream wraps a raw socket (like a TCP connection over the network), and converts it to the standard stream interface.

  • trio.ssl.SSLStream is a “stream adapter” that can take any object that implements the trio.abc.Stream interface, and convert it into an encrypted stream. In trio the standard way to speak SSL over the network is to wrap an SSLStream around a SocketStream.

  • If you spawn a subprocess then you can get a SendStream that lets you write to its stdin, and a ReceiveStream that lets you read from its stdout. If for some reason you wanted to speak SSL to a subprocess, you could use a StapledStream to combine its stdin/stdout into a single bidirectional Stream, and then wrap that in an SSLStream:

    ssl_context = trio.ssl.create_default_context()
    ssl_context.check_hostname = False
    s = SSLStream(StapledStream(process.stdin, process.stdout), ssl_context)
    

    [Note: subprocess support is not implemented yet, but that’s the plan. Unless it is implemented, and I forgot to remove this note.]

  • It sometimes happens that you want to connect to an HTTPS server, but you have to go through a web proxy… and the proxy also uses HTTPS. So you end up having to do SSL-on-top-of-SSL. In trio this is trivial – just wrap your first SSLStream in a second SSLStream:

    # Get a raw SocketStream connection to the proxy:
    s0 = await open_tcp_stream("proxy", 443)
    
    # Set up SSL connection to proxy:
    s1 = SSLStream(s0, proxy_ssl_context, server_hostname="proxy")
    # Request a connection to the website
    await s1.send_all(b"CONNECT website:443 HTTP/1.0\r\n\r\n")
    await check_CONNECT_response(s1)
    
    # Set up SSL connection to the real website. Notice that s1 is
    # already an SSLStream object, and here we're wrapping a second
    # SSLStream object around it.
    s2 = SSLStream(s1, website_ssl_context, server_hostname="website")
    # Make our request
    await s2.send_all(b"GET /index.html HTTP/1.0\r\n\r\n")
    ...
    
  • The trio.testing module provides a set of flexible in-memory stream object implementations, so if you have a protocol implementation to test then you can start two tasks, set up a virtual “socket” connecting them, and then do things like inject random-but-repeatable delays into the connection.

Abstract base classes

Overview: abstract base classes for I/O
Abstract base class Inherits from… Adds these abstract methods… And these concrete methods. Example implementations
AsyncResource   aclose() __aenter__, __aexit__ Asynchronous file objects
SendStream AsyncResource send_all(), wait_send_all_might_not_block()   MemorySendStream
ReceiveStream AsyncResource receive_some()   MemoryReceiveStream
Stream SendStream, ReceiveStream     SSLStream
HalfCloseableStream Stream send_eof()   SocketStream, StapledStream
Listener AsyncResource accept()   SocketListener, SSLListener
class trio.abc.AsyncResource

A standard interface for resources that need to be cleaned up, and where that cleanup may require blocking operations.

This class distinguishes between “graceful” closes, which may perform I/O and thus block, and a “forceful” close, which cannot. For example, cleanly shutting down a TLS-encrypted connection requires sending a “goodbye” message; but if a peer has become non-responsive, then sending this message might block forever, so we may want to just drop the connection instead. Therefore the aclose() method is unusual in that it should always close the connection (or at least make its best attempt) even if it fails; failure indicates a failure to achieve grace, not a failure to close the connection.

Objects that implement this interface can be used as async context managers, i.e., you can write:

async with create_resource() as some_async_resource:
    ...

Entering the context manager is synchronous (not a checkpoint); exiting it calls aclose(). The default implementations of __aenter__ and __aexit__ should be adequate for all subclasses.

abstractmethod await aclose()

Close this resource, possibly blocking.

IMPORTANT: This method may block in order to perform a “graceful” shutdown. But, if this fails, then it still must close any underlying resources before returning. An error from this method indicates a failure to achieve grace, not a failure to close the connection.

For example, suppose we call aclose() on a TLS-encrypted connection. This requires sending a “goodbye” message; but if the peer has become non-responsive, then our attempt to send this message might block forever, and eventually time out and be cancelled. In this case the aclose() method on SSLStream will immediately close the underlying transport stream using trio.aclose_forcefully() before raising Cancelled.

If the resource is already closed, then this method should silently succeed.

See also: trio.aclose_forcefully().

await trio.aclose_forcefully(resource)

Close an async resource or async generator immediately, without blocking to do any graceful cleanup.

AsyncResource objects guarantee that if their aclose() method is cancelled, then they will still close the resource (albeit in a potentially ungraceful fashion). aclose_forcefully() is a convenience function that exploits this behavior to let you force a resource to be closed without blocking: it works by calling await resource.aclose() and then cancelling it immediately.

Most users won’t need this, but it may be useful on cleanup paths where you can’t afford to block, or if you want to close a resource and don’t care about handling it gracefully. For example, if SSLStream encounters an error and cannot perform its own graceful close, then there’s no point in waiting to gracefully shut down the underlying transport either, so it calls await aclose_forcefully(self.transport_stream).

Note that this function is async, and that it acts as a checkpoint, but unlike most async functions it cannot block indefinitely (at least, assuming the underlying resource object is correctly implemented).
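
As a sketch, this is essentially what aclose_forcefully() does, assuming the trio 0.x open_cancel_scope() API:

async def aclose_forcefully(resource):
    with trio.open_cancel_scope() as cscope:
        # Cancel the scope *before* calling aclose, so that aclose is
        # cancelled at its first checkpoint and falls back to a forceful
        # close; the Cancelled exception is absorbed by this scope.
        cscope.cancel()
        await resource.aclose()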

class trio.abc.SendStream

Bases: trio.abc.AsyncResource

A standard interface for sending data on a byte stream.

The underlying stream may be unidirectional, or bidirectional. If it’s bidirectional, then you probably want to also implement ReceiveStream, which makes your object a Stream.

Every SendStream also implements the AsyncResource interface.

abstractmethod await send_all(data)

Sends the given data through the stream, blocking if necessary.

Parameters:

data (bytes, bytearray, or memoryview) – The data to send.

Raises:
  • BrokenStreamError – if the stream is broken, e.g. because the connection was lost.
  • ClosedStreamError – if you already closed this stream object, or already called send_eof().
  • ResourceBusyError – if another task is already executing a send_all() or wait_send_all_might_not_block() on this stream.

Most low-level operations in trio provide a guarantee: if they raise trio.Cancelled, this means that they had no effect, so the system remains in a known state. This is not true for send_all(). If this operation raises trio.Cancelled (or any other exception for that matter), then it may have sent some, all, or none of the requested data, and there is no way to know which.

abstractmethod await wait_send_all_might_not_block()

Block until it’s possible that send_all() might not block.

This method may return early: it’s possible that after it returns, send_all() will still block. (In the worst case, if no better implementation is available, then it might always return immediately without blocking. It’s nice to do better than that when possible, though.)

This method must not return late: if it’s possible for send_all() to complete without blocking, then it must return. When implementing it, err on the side of returning early.

Raises:
  • BrokenStreamError – if the stream is broken.
  • ClosedStreamError – if you already closed this stream object.
  • ResourceBusyError – if another task is already executing a send_all() or wait_send_all_might_not_block() on this stream.

Note

This method is intended to aid in implementing protocols that want to delay choosing which data to send until the last moment. E.g., suppose you’re working on an implementation of a remote display server like VNC, and the network connection is currently backed up so that if you call send_all() now then it will sit for 0.5 seconds before actually sending anything. In this case it doesn’t make sense to take a screenshot, then wait 0.5 seconds, and then send it, because the screen will keep changing while you wait; it’s better to wait 0.5 seconds, then take the screenshot, and then send it, because this way the data you deliver will be more up-to-date. Using wait_send_all_might_not_block() makes it possible to implement the better strategy.

If you use this method, you might also want to read up on TCP_NOTSENT_LOWAT.
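
As a sketch, the VNC-style strategy described above might look like this; take_screenshot() and the stream object are hypothetical stand-ins:

def take_screenshot():
    ...   # hypothetical: capture the current display contents as bytes

async def send_display_frames(stream):
    while True:
        # Wait until the connection can probably accept more data...
        await stream.wait_send_all_might_not_block()
        # ...and only then grab the freshest frame and send it.
        await stream.send_all(take_screenshot())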

class trio.abc.ReceiveStream

Bases: trio.abc.AsyncResource

A standard interface for receiving data on a byte stream.

The underlying stream may be unidirectional, or bidirectional. If it’s bidirectional, then you probably want to also implement SendStream, which makes your object a Stream.

Every ReceiveStream also implements the AsyncResource interface.

abstractmethod await receive_some(max_bytes)

Wait until there is data available on this stream, and then return at most max_bytes of it.

A return value of b"" (an empty bytestring) indicates that the stream has reached end-of-file. Implementations should be careful that they return b"" if, and only if, the stream has reached end-of-file!

This method will return as soon as any data is available, so it may return fewer than max_bytes of data. But it will never return more.

Parameters:

max_bytes (int) – The maximum number of bytes to return. Must be greater than zero.

Returns:

The data received.

Return type:

bytes or bytearray

Raises:
  • BrokenStreamError – if the stream is broken.
  • ClosedStreamError – if you already closed this stream object.
  • ResourceBusyError – if another task is already executing a receive_some() on this stream.
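
The b"" convention makes the standard read-to-EOF loop straightforward; here is a minimal sketch (the 4096-byte chunk size is arbitrary):

async def read_all(stream):
    chunks = []
    while True:
        chunk = await stream.receive_some(4096)
        if not chunk:            # b"" signals end-of-file
            break
        chunks.append(chunk)
    return b"".join(chunks)
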
class trio.abc.Stream

Bases: trio.abc.SendStream, trio.abc.ReceiveStream

A standard interface for interacting with bidirectional byte streams.

A Stream is an object that implements both the SendStream and ReceiveStream interfaces.

If implementing this interface, you should consider whether you can go one step further and implement HalfCloseableStream.

class trio.abc.HalfCloseableStream

Bases: trio.abc.Stream

This interface extends Stream to also allow closing the send part of the stream without closing the receive part.

abstractmethod await send_eof()

Send an end-of-file indication on this stream, if possible.

The difference between send_eof() and aclose() is that send_eof() is a unidirectional end-of-file indication. After you call this method, you shouldn’t try sending any more data on this stream, and your remote peer should receive an end-of-file indication (eventually, after receiving all the data you sent before that). But, they may continue to send data to you, and you can continue to receive it by calling receive_some(). You can think of it as calling aclose() on just the SendStream “half” of the stream object (and in fact that’s literally how trio.StapledStream implements it).

Examples:

  • On a socket, this corresponds to shutdown(..., SHUT_WR) (man page).
  • The SSH protocol provides the ability to multiplex bidirectional “channels” on top of a single encrypted connection. A trio implementation of SSH could expose these channels as HalfCloseableStream objects, and calling send_eof() would send an SSH_MSG_CHANNEL_EOF request (see RFC 4254 §5.3).
  • On an SSL/TLS-encrypted connection, the protocol doesn’t provide any way to do a unidirectional shutdown without closing the connection entirely, so SSLStream implements Stream, not HalfCloseableStream.

If an EOF has already been sent, then this method should silently succeed.

Raises:
  • BrokenStreamError – if the stream is broken.
  • ClosedStreamError – if you already closed this stream object.
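
For example, many request/response protocols can use send_eof() to mark the end of the request while still reading the reply. A sketch, reusing the hypothetical read_all() helper from the receive_some() example above:

async def request_reply(stream, request):
    await stream.send_all(request)
    await stream.send_eof()     # our half is done; the reply can still arrive
    return await read_all(stream)
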
exception trio.BrokenStreamError

Raised when an attempt to use a stream fails due to external circumstances.

For example, you might get this if you try to send data on a stream where the remote side has already closed the connection.

You don’t get this error if you closed the stream – in that case you get ClosedStreamError.

This exception’s __cause__ attribute will often contain more information about the underlying error.

exception trio.ClosedStreamError

Raised when an attempt to use a stream fails because the stream was already closed locally.

You only get this error if your code closed the stream object you’re attempting to use by calling aclose() or similar. (send_all() might also raise this if you already called send_eof().) Therefore this exception generally indicates a bug in your code.

If a problem arises elsewhere, for example due to a network failure or a misbehaving peer, then you get BrokenStreamError instead.

class trio.abc.Listener

Bases: trio.abc.AsyncResource

A standard interface for listening for incoming connections.

abstractmethod await accept()

Wait until an incoming connection arrives, and then return it.

Returns:

an object representing the incoming connection. In practice this is almost always some variety of Stream, though in principle you could also use this interface with, say, SOCK_SEQPACKET sockets or similar.

Return type:

AsyncResource

Raises:
  • ClosedListenerError – if you already closed this listener.

Note that there is no BrokenListenerError, because for listeners there is no general condition of “the network/remote peer broke the connection” that can be handled in a generic way, like there is for streams. Other errors can occur and be raised from accept() – for example, if you run out of file descriptors then you might get an OSError with its errno set to EMFILE.

exception trio.ClosedListenerError

Raised when an attempt to use a listener fails because it was already closed locally.

You only get this error if your code closed the listener object you’re attempting to use by calling aclose() or similar. Therefore this exception generally indicates a bug in your code.

Generic stream tools

Trio currently provides a generic helper for writing servers that listen for connections using one or more Listeners, and a generic utility class for working with streams. And if you want to test code that’s written against the streams interface, you should also check out Streams in trio.testing.

await trio.serve_listeners(handler, listeners, *, handler_nursery=None, task_status=TASK_STATUS_IGNORED)

Listen for incoming connections on listeners, and for each one start a task running handler(stream).

Warning

If handler raises an exception, then this function doesn’t do anything special to catch it – so by default the exception will propagate out and crash your server. If you don’t want this, then catch exceptions inside your handler, or use a handler_nursery object that responds to exceptions in some other way.

Parameters:
  • handler – An async callable, that will be invoked like handler_nursery.start_soon(handler, stream) for each incoming connection.
  • listeners – A list of Listener objects. serve_listeners() takes responsibility for closing them.
  • handler_nursery – The nursery used to start handlers, or any object with a start_soon method. If None (the default), then serve_listeners() will create a new nursery internally and use that.
  • task_status – This function can be used with nursery.start, which will return listeners.
Returns:

This function never returns unless cancelled.

Resource handling:

If handler neglects to close the stream, then it will be closed using trio.aclose_forcefully().

Error handling:

Most errors coming from accept() are allowed to propagate out (crashing the server in the process). However, some errors – those which indicate that the server is temporarily overloaded – are handled specially. These are OSErrors with one of the following errnos:

  • EMFILE: process is out of file descriptors
  • ENFILE: system is out of file descriptors
  • ENOBUFS, ENOMEM: the kernel hit some sort of memory limitation when trying to create a socket object

When serve_listeners() gets one of these errors, then it:

  • Logs the error to the standard library logger trio.serve_listeners (level = ERROR, with exception information included). By default this causes it to be printed to stderr.
  • Waits 100 ms before calling accept again, in hopes that the system will recover.
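
Here is a minimal sketch of serving a simple echo handler with serve_listeners(); the port number (12345) and chunk size are arbitrary, and the server runs until cancelled:

import trio

async def echo_handler(stream):
    async with stream:
        while True:
            data = await stream.receive_some(4096)
            if not data:
                return
            await stream.send_all(data)

async def main():
    listeners = await trio.open_tcp_listeners(12345)
    await trio.serve_listeners(echo_handler, listeners)

trio.run(main)
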
class trio.StapledStream(send_stream, receive_stream)

Bases: trio.abc.HalfCloseableStream

This class staples together two unidirectional streams to make a single bidirectional stream.

Parameters:
  • send_stream (SendStream) – The stream to use for sending.
  • receive_stream (ReceiveStream) – The stream to use for receiving.

Example

A silly way to make a stream that echoes back whatever you write to it:

left, right = trio.testing.memory_stream_pair()
echo_stream = StapledStream(left, right)
await echo_stream.send_all(b"x")
assert await echo_stream.receive_some(1) == b"x"

StapledStream objects implement the methods in the HalfCloseableStream interface. They also have two additional public attributes:

send_stream

The underlying SendStream. send_all() and wait_send_all_might_not_block() are delegated to this object.

receive_stream

The underlying ReceiveStream. receive_some() is delegated to this object.

await aclose()

Calls aclose on both underlying streams.

await receive_some(max_bytes)

Calls self.receive_stream.receive_some.

await send_all(data)

Calls self.send_stream.send_all.

await send_eof()

Shuts down the send side of the stream.

If self.send_stream.send_eof exists, then calls it. Otherwise, calls self.send_stream.aclose().

await wait_send_all_might_not_block()

Calls self.send_stream.wait_send_all_might_not_block.

Sockets and networking

The high-level network interface is built on top of our stream abstraction.

await trio.open_tcp_stream(host, port, *, happy_eyeballs_delay=0.3)

Connect to the given host and port over TCP.

If the given host has multiple IP addresses associated with it, then we have a problem: which one do we use?

One approach would be to attempt to connect to the first one, and then if that fails, attempt to connect to the second one … until we’ve tried all of them. But the problem with this is that if the first IP address is unreachable (for example, because it’s an IPv6 address and our network discards IPv6 packets), then we might end up waiting tens of seconds for the first connection attempt to time out before we try the second address.

Another approach would be to attempt to connect to all of the addresses at the same time, in parallel, and then use whichever connection succeeds first, abandoning the others. This would be fast, but create a lot of unnecessary load on the network and the remote server.

This function strikes a balance between these two extremes: it works its way through the available addresses one at a time, like the first approach; but, if happy_eyeballs_delay seconds have passed and it’s still waiting for an attempt to succeed or fail, then it gets impatient and starts the next connection attempt in parallel. As soon as any one connection attempt succeeds, all the other attempts are cancelled. This avoids unnecessary load because most connections will succeed after just one or two attempts, but if one of the addresses is unreachable then it doesn’t slow us down too much.

This is known as a “happy eyeballs” algorithm, and our particular variant is modelled after how Chrome connects to webservers; see RFC 6555 for more details.

Parameters:
  • host (str or bytes) – The host to connect to. Can be an IPv4 address, IPv6 address, or a hostname.
  • port (int) – The port to connect to.
  • happy_eyeballs_delay (float) – How many seconds to wait for each connection attempt to succeed or fail before getting impatient and starting another one in parallel. Set to math.inf if you want to limit to only one connection attempt at a time (like socket.create_connection()). Default: 0.3 (300 ms).
Returns:

a Stream connected to the given server.

Return type:

SocketStream

Raises:

OSError – if the connection fails.
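
A minimal usage sketch (the host and the request bytes are illustrative):

stream = await trio.open_tcp_stream("example.com", 80)
async with stream:
    await stream.send_all(b"GET / HTTP/1.0\r\nHost: example.com\r\n\r\n")
    response = await stream.receive_some(4096)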

See also

open_ssl_over_tcp_stream

await trio.serve_tcp(handler, port, *, host=None, backlog=None, handler_nursery=None, task_status=TASK_STATUS_IGNORED)

Listen for incoming TCP connections, and for each one start a task running handler(stream).

This is a thin convenience wrapper around open_tcp_listeners() and serve_listeners() – see them for full details.

Warning

If handler raises an exception, then this function doesn’t do anything special to catch it – so by default the exception will propagate out and crash your server. If you don’t want this, then catch exceptions inside your handler, or use a handler_nursery object that responds to exceptions in some other way.

When used with nursery.start you get back the newly opened listeners. So, for example, if you want to start a server in your test suite and then connect to it to check that it’s working properly, you can use something like:

from trio.testing import open_stream_to_socket_listener

async with trio.open_nursery() as nursery:
    listeners = await nursery.start(serve_tcp, handler, 0)
    client_stream = await open_stream_to_socket_listener(listeners[0])

    # Then send and receive data on 'client_stream', for example:
    await client_stream.send_all(b"GET / HTTP/1.0\r\n\r\n")

This avoids several common pitfalls:

  1. It lets the kernel pick a random open port, so your test suite doesn’t depend on any particular port being open.
  2. It waits for the server to be accepting connections on that port before start returns, so there’s no race condition where the incoming connection arrives before the server is ready.
  3. It uses the Listener object to find out which port was picked, so it can connect to the right place.
Parameters:
  • handler – The handler to start for each incoming connection. Passed to serve_listeners().
  • port – The port to listen on. Use 0 to let the kernel pick an open port. Passed to open_tcp_listeners().
  • host (str, bytes, or None) – The host interface to listen on; use None to bind to the wildcard address. Passed to open_tcp_listeners().
  • backlog – The listen backlog, or None to have a good default picked. Passed to open_tcp_listeners().
  • handler_nursery – The nursery to start handlers in, or None to use an internal nursery. Passed to serve_listeners().
  • task_status – This function can be used with nursery.start.
Returns:

This function only returns when cancelled.

await trio.open_ssl_over_tcp_stream(host, port, *, https_compatible=False, ssl_context=None, happy_eyeballs_delay=0.3)

Make a TLS-encrypted connection to the given host and port over TCP.

This is a convenience wrapper that calls open_tcp_stream() and wraps the result in an SSLStream.

This function does not perform the TLS handshake; you can do it manually by calling do_handshake(), or else it will be performed automatically the first time you send or receive data.

Parameters:
  • host (bytes or str) – The host to connect to. We require the server to have a TLS certificate valid for this hostname.
  • port (int) – The port to connect to.
  • https_compatible (bool) – Set this to True if you’re connecting to a web server. See SSLStream for details. Default: False.
  • ssl_context (SSLContext or None) – The SSL context to use. If None (the default), ssl.create_default_context() will be called to create a context.
  • happy_eyeballs_delay (float) – See open_tcp_stream().
Returns:

the encrypted connection to the server.

Return type:

trio.ssl.SSLStream

await trio.serve_ssl_over_tcp(handler, port, ssl_context, *, host=None, https_compatible=False, backlog=None, handler_nursery=None, task_status=TASK_STATUS_IGNORED)

Listen for incoming TCP connections, and for each one start a task running handler(stream).

This is a thin convenience wrapper around open_ssl_over_tcp_listeners() and serve_listeners() – see them for full details.

Warning

If handler raises an exception, then this function doesn’t do anything special to catch it – so by default the exception will propagate out and crash your server. If you don’t want this, then catch exceptions inside your handler, or use a handler_nursery object that responds to exceptions in some other way.

When used with nursery.start you get back the newly opened listeners. See the documentation for serve_tcp() for an example where this is useful.

Parameters:
  • handler – The handler to start for each incoming connection. Passed to serve_listeners().
  • port (int) – The port to listen on. Use 0 to let the kernel pick an open port. Ultimately passed to open_tcp_listeners().
  • ssl_context (SSLContext) – The SSL context to use for all incoming connections. Passed to open_ssl_over_tcp_listeners().
  • host (str, bytes, or None) – The address to bind to; use None to bind to the wildcard address. Ultimately passed to open_tcp_listeners().
  • https_compatible (bool) – Set this to True if you want to use “HTTPS-style” TLS. See SSLStream for details.
  • backlog (int or None) – The listen backlog, or None to have a good default picked. Ultimately passed to open_tcp_listeners().
  • handler_nursery – The nursery to start handlers in, or None to use an internal nursery. Passed to serve_listeners().
  • task_status – This function can be used with nursery.start.
Returns:

This function only returns when cancelled.

class trio.SocketStream(socket)

Bases: trio.abc.HalfCloseableStream

An implementation of the trio.abc.HalfCloseableStream interface based on a raw network socket.

Parameters:socket – The trio socket object to wrap. Must have type SOCK_STREAM, and be connected.

By default, SocketStream enables TCP_NODELAY, and (on platforms where it’s supported) enables TCP_NOTSENT_LOWAT with a reasonable buffer size (currently 16 KiB) – see issue #72 for discussion. You can of course override these defaults by calling setsockopt().

Once a SocketStream object is constructed, it implements the full trio.abc.HalfCloseableStream interface. In addition, it provides a few extra features:

socket

The Trio socket object that this stream wraps.

await aclose()
getsockopt(level, option, buffersize=0)

Check the current value of an option on the underlying socket.

See socket.socket.getsockopt() for details.

await receive_some(max_bytes)
await send_all(data)
await send_eof()
setsockopt(level, option, value)

Set an option on the underlying socket.

See socket.socket.setsockopt() for details.

await wait_send_all_might_not_block()
class trio.SocketListener(socket)

Bases: trio.abc.Listener

A Listener that uses a listening socket to accept incoming connections as SocketStream objects.

Parameters:socket – The trio socket object to wrap. Must have type SOCK_STREAM, and be listening.

Note that the SocketListener “takes ownership” of the given socket; closing the SocketListener will also close the socket.

socket

The Trio socket object that this listener wraps.

await accept()

Accept an incoming connection.

Returns:

SocketStream

Raises:
  • OSError – if the underlying call to accept raises an unexpected error.
  • ClosedListenerError – if you already closed the socket.

This method handles routine errors like ECONNABORTED, but passes other errors on to its caller. In particular, it does not make any special effort to handle resource exhaustion errors like EMFILE, ENFILE, ENOBUFS, ENOMEM.

await aclose()

Close this listener and its underlying socket.

await trio.open_tcp_listeners(port, *, host=None, backlog=None)

Create SocketListener objects to listen for TCP connections.

Parameters:
  • port (int) –

    The port to listen on.

    If you use 0 as your port, then the kernel will automatically pick an arbitrary open port. But be careful: if you use this feature when binding to multiple IP addresses, then each IP address will get its own random port, and the returned listeners will probably be listening on different ports. In particular, this will happen if you use host=None – which is the default – because in this case open_tcp_listeners() will bind to both the IPv4 wildcard address (0.0.0.0) and also the IPv6 wildcard address (::).

  • host (str, bytes-like, or None) –

    The local interface to bind to. This is passed to getaddrinfo() with the AI_PASSIVE flag set.

    If you want to bind to the wildcard address on both IPv4 and IPv6, in order to accept connections on all available interfaces, then pass None. This is the default.

    If you have a specific interface you want to bind to, pass its IP address or hostname here. If a hostname resolves to multiple IP addresses, this function will open one listener on each of them.

    If you want to use only IPv4, or only IPv6, but want to accept on all interfaces, pass the family-specific wildcard address: "0.0.0.0" for IPv4-only and "::" for IPv6-only.

  • backlog (int or None) – The listen backlog to use. If you leave this as None then Trio will pick a good default. (Currently: whatever your system has configured as the maximum backlog.)
Returns:

list of SocketListener

await trio.open_ssl_over_tcp_listeners(port, ssl_context, *, host=None, https_compatible=False, backlog=None)

Start listening for SSL/TLS-encrypted TCP connections to the given port.

Parameters:
  • port (int) – The port to listen on. Passed to open_tcp_listeners().
  • ssl_context (SSLContext) – The SSL context to use for all incoming connections.
  • host (str, bytes, or None) – The address to bind to; use None to bind to the wildcard address. Passed to open_tcp_listeners().
  • https_compatible (bool) – See SSLStream for details.
  • backlog (int or None) – See open_tcp_listeners() for details.
Returns:

list of SSLListener

SSL / TLS support

The trio.ssl module implements SSL/TLS support for Trio, using the standard library ssl module. It re-exports most of ssl’s API, with the notable exception of ssl.SSLContext, which has unsafe defaults; if you really want to use ssl.SSLContext you can import it from ssl, but normally you should create your contexts using trio.ssl.create_default_context.

Instead of using ssl.SSLContext.wrap_socket(), though, you create an SSLStream:

class trio.ssl.SSLStream(transport_stream, ssl_context, *, server_hostname=None, server_side=False, https_compatible=False, max_refill_bytes=32768)

Bases: trio.abc.Stream

Encrypted communication using SSL/TLS.

SSLStream wraps an arbitrary Stream, and allows you to perform encrypted communication over it using the usual Stream interface. You pass regular data to send_all(), then it encrypts it and sends the encrypted data on the underlying Stream; receive_some() takes encrypted data out of the underlying Stream and decrypts it before returning it.

You should read the standard library’s ssl documentation carefully before attempting to use this class, and probably other general documentation on SSL/TLS as well. SSL/TLS is subtle and quick to anger. Really. I’m not kidding.

Parameters:
  • transport_stream (Stream) – The stream used to transport encrypted data. Required.
  • ssl_context (SSLContext) – The SSLContext used for this connection. Required. Usually created by calling trio.ssl.create_default_context().
  • server_hostname (str or None) – The name of the server being connected to. Used for SNI and for validating the server’s certificate (if hostname checking is enabled). This is effectively mandatory for clients, and actually mandatory if ssl_context.check_hostname is True.
  • server_side (bool) – Whether this stream is acting as a client or server. Defaults to False, i.e. client mode.
  • https_compatible (bool) –

    There are two versions of SSL/TLS commonly encountered in the wild: the standard version, and the version used for HTTPS (HTTP-over-SSL/TLS).

    Standard-compliant SSL/TLS implementations always send a cryptographically signed close_notify message before closing the connection. This is important because if the underlying transport were simply closed, then there wouldn’t be any way for the other side to know whether the connection was intentionally closed by the peer that they negotiated a cryptographic connection to, or by some man-in-the-middle attacker who can’t manipulate the cryptographic stream, but can manipulate the transport layer (a so-called “truncation attack”).

    However, this part of the standard is widely ignored by real-world HTTPS implementations, which means that if you want to interoperate with them, then you NEED to ignore it too.

    Fortunately this isn’t as bad as it sounds, because the HTTP protocol already includes its own equivalent of close_notify, so doing this again at the SSL/TLS level is redundant. But not all protocols do! Therefore, by default Trio implements the safer standard-compliant version (https_compatible=False). But if you’re speaking HTTPS or some other protocol where close_notifys are commonly skipped, then you should set https_compatible=True; with this setting, Trio will neither expect nor send close_notify messages.

    If you have code that was written to use ssl.SSLSocket and now you’re porting it to Trio, then it may be useful to know that a difference between SSLStream and ssl.SSLSocket is that SSLSocket implements the https_compatible=True behavior by default.

  • max_refill_bytes (int) – SSLSocket maintains an internal buffer of incoming data, and when it runs low then it calls receive_some() on the underlying transport stream to refill it. This argument lets you set the max_bytes argument passed to the underlying receive_some() call. It doesn’t affect calls to this class’s receive_some(), or really anything else user-observable except possibly performance. You probably don’t need to worry about this.
transport_stream

trio.abc.Stream – The underlying transport stream that was passed to __init__. An example of when this would be useful is if you’re using SSLStream over a SocketStream and want to call the SocketStream’s setsockopt() method.

Internally, this class is implemented using an instance of ssl.SSLObject, and all of SSLObject’s methods and attributes are re-exported as methods and attributes on this class.

This also means that if you register a SNI callback using set_servername_callback(), then the first argument your callback receives will be a ssl.SSLObject.

await aclose()

Gracefully shut down this connection, and close the underlying transport.

If https_compatible is False (the default), then this attempts to first send a close_notify and then close the underlying stream by calling its aclose() method.

If https_compatible is set to True, then this simply closes the underlying stream and marks this stream as closed.

await do_handshake()

Ensure that the initial handshake has completed.

The SSL protocol requires an initial handshake to exchange certificates, select cryptographic keys, and so forth, before any actual data can be sent or received. You don’t have to call this method; if you don’t, then SSLStream will automatically perform the handshake as needed, the first time you try to send or receive data. But if you want to trigger it manually – for example, because you want to look at the peer’s certificate before you start talking to them – then you can call this method.

If the initial handshake is already in progress in another task, this waits for it to complete and then returns.

If the initial handshake has already completed, this returns immediately without doing anything (except executing a checkpoint).

Warning

If this method is cancelled, then it may leave the SSLStream in an unusable state. If this happens then any future attempt to use the object will raise trio.BrokenStreamError.

await receive_some(max_bytes)

Read some data from the underlying transport, decrypt it, and return it.

See trio.abc.ReceiveStream.receive_some() for details.

Warning

If this method is cancelled while the initial handshake or a renegotiation are in progress, then it may leave the SSLStream in an unusable state. If this happens then any future attempt to use the object will raise trio.BrokenStreamError.

await send_all(data)

Encrypt some data and then send it on the underlying transport.

See trio.abc.SendStream.send_all() for details.

Warning

If this method is cancelled, then it may leave the SSLStream in an unusable state. If this happens then any attempt to use the object will raise trio.BrokenStreamError.

await unwrap()

Cleanly close down the SSL/TLS encryption layer, allowing the underlying stream to be used for unencrypted communication.

You almost certainly don’t need this.

Returns:A pair (transport_stream, trailing_bytes), where transport_stream is the underlying transport stream, and trailing_bytes is a byte string. Since SSLStream doesn’t necessarily know where the end of the encrypted data will be, it can happen that it accidentally reads too much from the underlying stream. trailing_bytes contains this extra data; you should process it as if it was returned from a call to transport_stream.receive_some(...).
await wait_send_all_might_not_block()

See trio.abc.SendStream.wait_send_all_might_not_block().
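
Putting it together, here is a sketch of constructing an SSLStream by hand – open_ssl_over_tcp_stream() is a convenience wrapper that does roughly this for you (the host is illustrative):

transport = await trio.open_tcp_stream("example.com", 443)
ssl_context = trio.ssl.create_default_context()
stream = trio.ssl.SSLStream(
    transport, ssl_context,
    server_hostname="example.com",
    https_compatible=True)   # because we're talking to a web server
await stream.do_handshake()  # optional; happens automatically on first I/O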

And if you’re implementing a server, you can use SSLListener:

class trio.ssl.SSLListener(transport_listener, ssl_context, *, https_compatible=False, max_refill_bytes=32768)

Bases: trio.abc.Listener

A Listener for SSL/TLS-encrypted servers.

SSLListener wraps around another Listener, and converts all incoming connections to encrypted connections by wrapping them in an SSLStream.

Parameters:
  • transport_listener (Listener) – The listener to wrap; its incoming connections will be wrapped in SSLStream objects.
  • ssl_context (SSLContext) – The SSLContext to use for incoming connections.
  • https_compatible (bool) – See SSLStream for details.
  • max_refill_bytes (int) – See SSLStream for details.
transport_listener

trio.abc.Listener – The underlying listener that was passed to __init__.

await accept()

Accept the next connection and wrap it in an SSLStream.

See trio.abc.Listener.accept() for details.

await aclose()

Close the transport listener.

Low-level networking with trio.socket

The trio.socket module provides trio’s basic low-level networking API. If you’re doing ordinary things with stream-oriented connections over IPv4/IPv6/Unix domain sockets, then you probably want to stick to the high-level API described above. If you want to use UDP, or exotic address families like AF_BLUETOOTH, or otherwise get direct access to all the quirky bits of your system’s networking API, then you’re in the right place.

Top-level exports

Generally, the API exposed by trio.socket mirrors that of the standard library socket module. Most constants (like SOL_SOCKET) and simple utilities (like inet_aton()) are simply re-exported unchanged. But there are also some differences, which are described here.

First, Trio provides analogues to all the standard library functions that return socket objects; their interface is identical, except that they’re modified to return trio socket objects instead:

trio.socket.socket(family=<AddressFamily.AF_INET: 2>, type=<SocketKind.SOCK_STREAM: 1>, proto=0, fileno=None)

Create a new trio socket, like socket.socket().

This function’s behavior can be customized using set_custom_socket_factory().

trio.socket.socketpair(family=None, type=<SocketKind.SOCK_STREAM: 1>, proto=0)

Like socket.socketpair(), but returns a pair of trio socket objects.

trio.socket.fromfd(fd, family, type, proto=0)

Like socket.fromfd(), but returns a trio socket object.

trio.socket.fromshare(data)

Like socket.fromshare(), but returns a trio socket object.

In addition, there is a new function to directly convert a standard library socket into a trio socket:

trio.socket.from_stdlib_socket(sock)

Convert a standard library socket.socket() object into a trio socket object.

Unlike socket.socket(), trio.socket.socket() is a function, not a class; if you want to check whether an object is a trio socket, use isinstance(obj, trio.socket.SocketType).

For name lookup, Trio provides the standard functions, but with some changes:

await trio.socket.getaddrinfo(host, port, family=0, type=0, proto=0, flags=0)

Look up a numeric address given a name.

Arguments and return values are identical to socket.getaddrinfo(), except that this version is async.

Also, trio.socket.getaddrinfo() correctly uses IDNA 2008 to process non-ASCII domain names. (socket.getaddrinfo() uses IDNA 2003, which can give the wrong result in some cases and cause you to connect to a different host than the one you intended; see bpo-17305.)

This function’s behavior can be customized using set_custom_hostname_resolver().

await trio.socket.getnameinfo(sockaddr, flags)

Look up a name given a numeric address.

Arguments and return values are identical to socket.getnameinfo(), except that this version is async.

This function’s behavior can be customized using set_custom_hostname_resolver().

await trio.socket.getprotobyname(name)

Look up a protocol number by name. (Rarely used.)

Like socket.getprotobyname(), but async.
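As a quick illustration of the name-lookup functions (example.com is just a placeholder host):

import trio

async def main():
    # Async DNS lookup; same arguments and return value as socket.getaddrinfo()
    infos = await trio.socket.getaddrinfo(
        "example.com", 80, type=trio.socket.SOCK_STREAM)
    for family, type_, proto, canonname, sockaddr in infos:
        print(family, sockaddr)

trio.run(main)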

Trio intentionally DOES NOT include some obsolete, redundant, or broken features of the standard library socket module.

Socket objects

class trio.socket.SocketType

Note

trio.socket.SocketType is an abstract class and cannot be instantiated directly; you get concrete socket objects by calling constructors like trio.socket.socket(). However, you can use it to check if an object is a Trio socket via isinstance(obj, trio.socket.SocketType).

Trio socket objects are overall very similar to the standard library socket objects, with a few important differences:

First, and most obviously, everything is made “trio-style”: blocking methods become async methods, and the following methods are not supported:

  • setblocking(): trio sockets always act like blocking sockets; if you need to read/write from multiple sockets at once, then create multiple tasks.
  • settimeout(): see Cancellation and timeouts instead.
  • makefile(): Python’s file-like API is synchronous, so it can’t be implemented on top of an async socket.
  • sendall(): Could be supported, but you’re better off using the higher-level SocketStream, and specifically its send_all() method, which also does additional error checking.

In addition, the following methods are similar to the equivalents in socket.socket(), but have some trio-specific quirks:

await connect()

Connect the socket to a remote address.

Similar to socket.socket.connect(), except async.

Warning

Due to limitations of the underlying operating system APIs, it is not always possible to properly cancel a connection attempt once it has begun. If connect() is cancelled, and is unable to abort the connection attempt, then it will:

  1. forcibly close the socket to prevent accidental re-use
  2. raise Cancelled.

tl;dr: if connect() is cancelled then the socket is left in an unknown state – possibly open, and possibly closed. The only reasonable thing to do is to close it.

sendfile()

Not implemented yet!

We also keep track of an extra bit of state, because it turns out to be useful for trio.SocketStream:

did_shutdown_SHUT_WR

This bool attribute is True if you’ve called sock.shutdown(SHUT_WR) or sock.shutdown(SHUT_RDWR), and False otherwise.

The remaining blocking methods are identical to their equivalents in socket.socket(), except that they’re async, and the ones that take address arguments require pre-resolved addresses.

All methods and attributes not mentioned above are identical to their equivalents in socket.socket().
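To make the “pre-resolved addresses” requirement concrete, here’s a hedged sketch of opening a raw trio socket connection by resolving the address first (host and port are placeholders):

import trio

async def open_connection(host, port):
    # Resolve first: connect() and friends require numeric, pre-resolved
    # addresses rather than host names
    infos = await trio.socket.getaddrinfo(
        host, port, type=trio.socket.SOCK_STREAM)
    family, type_, proto, _, sockaddr = infos[0]
    sock = trio.socket.socket(family, type_, proto)
    await sock.connect(sockaddr)
    return sock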

Asynchronous filesystem I/O

Trio provides built-in facilities for performing asynchronous filesystem operations like reading or renaming a file. Generally, we recommend that you use these instead of Python’s normal synchronous file APIs. But the tradeoffs here are somewhat subtle: sometimes people switch to async I/O, and then they’re surprised and confused when they find it doesn’t speed up their program. The next section explains the theory behind async file I/O, to help you better understand your code’s behavior. Or, if you just want to get started, you can jump down to the API overview.

Background: Why is async file I/O useful? The answer may surprise you

Many people expect that switching from synchronous file I/O to async file I/O will always make their program faster. This is not true! If we just look at total throughput, then async file I/O might be faster, slower, or about the same, and it depends in a complicated way on things like your exact patterns of disk access and how much RAM you have. The main motivation for async file I/O is not to improve throughput, but to reduce the frequency of latency glitches.

To understand why, you need to know two things.

First, right now no mainstream operating system offers a generic, reliable, native API for async filesystem operations, so we have to fake it by using threads (specifically, run_sync_in_worker_thread()). This is cheap but isn’t free: on a typical PC, dispatching to a worker thread adds something like ~100 µs of overhead to each operation. (“µs” is pronounced “microseconds”, and there are 1,000,000 µs in a second. Note that all the numbers here are going to be rough orders of magnitude to give you a sense of scale; if you need precise numbers for your environment, measure!)

And second, the cost of a disk operation is incredibly bimodal. Sometimes, the data you need is already cached in RAM, and then accessing it is very, very fast – calling io.FileIO’s read method on a cached file takes on the order of ~1 µs. But when the data isn’t cached, then accessing it is much, much slower: the average is ~100 µs for SSDs and ~10,000 µs for spinning disks, and if you look at tail latencies then for both types of storage you’ll see cases where occasionally some operation will be 10x or 100x slower than average. And that’s assuming your program is the only thing trying to use that disk – if you’re on some oversold cloud VM fighting for I/O with other tenants then who knows what will happen. And some operations can require multiple disk accesses.

Putting these together: if your data is in RAM then it should be clear that using a thread is a terrible idea – if you add 100 µs of overhead to a 1 µs operation, then that’s a 100x slowdown! On the other hand, if your data’s on a spinning disk, then using a thread is great – instead of blocking the main thread and all tasks for 10,000 µs, we only block them for 100 µs and can spend the rest of that time running other tasks to get useful work done, which can effectively be a 100x speedup.

But here’s the problem: for any individual I/O operation, there’s no way to know in advance whether it’s going to be one of the fast ones or one of the slow ones, so you can’t pick and choose. When you switch to async file I/O, it makes all the fast operations slower, and all the slow operations faster. Is that a win? In terms of overall speed, it’s hard to say: it depends what kind of disks you’re using and your kernel’s disk cache hit rate, which in turn depends on your file access patterns, how much spare RAM you have, the load on your service, … all kinds of things. If the answer is important to you, then there’s no substitute for measuring your code’s actual behavior in your actual deployment environment. But what we can say is that async disk I/O makes performance much more predictable across a wider range of runtime conditions.

If you’re not sure what to do, then we recommend that you use async disk I/O by default, because it makes your code more robust when conditions are bad, especially with regards to tail latencies; this improves the chances that what your users see matches what you saw in testing. Blocking the main thread stops all tasks from running for that time. 10,000 µs is 10 ms, and it doesn’t take many 10 ms glitches to start adding up to real money; async disk I/O can help prevent those. Just don’t expect it to be magic, and be aware of the tradeoffs.

API overview

If you want to perform general filesystem operations like creating and listing directories, renaming files, or checking file metadata – or if you just want a friendly way to work with filesystem paths – then you want trio.Path. It’s an asyncified replacement for the standard library’s pathlib.Path, and provides the same comprehensive set of operations.

For reading and writing to files and file-like objects, Trio also provides a mechanism for wrapping any synchronous file-like object into an asynchronous interface. If you have a trio.Path object you can get one of these by calling its open() method; or if you know the file’s name you can open it directly with trio.open_file(). Alternatively, if you already have an open file-like object, you can wrap it with trio.wrap_file() – one case where this is especially useful is to wrap io.BytesIO or io.StringIO when writing tests.
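For example, here’s a minimal sketch combining trio.Path with its open() method (the file name is hypothetical):

import trio

async def main():
    path = trio.Path("example.txt")
    await path.write_text("hello, world")   # runs in a worker thread
    async with await path.open("r") as f:   # returns an async file object
        assert await f.read() == "hello, world"

trio.run(main)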

Asynchronous path objects

class trio.Path(*args)

A pathlib.Path wrapper that executes blocking methods in trio.run_sync_in_worker_thread().

as_posix()

Return the string representation of the path with forward (/) slashes.

as_uri()

Return the path as a ‘file’ URI.

await chmod(*args, **kwargs)

Like chmod(), but async.

classmethod cwd()

Return a new path pointing to the current working directory (as returned by os.getcwd()).

await exists(*args, **kwargs)

Like exists(), but async.

await expanduser(*args, **kwargs)

Like expanduser(), but async.

await glob(*args, **kwargs)

Like glob(), but async.

await group(*args, **kwargs)

Like group(), but async.

classmethod home()

Return a new path pointing to the user's home directory (as returned by os.path.expanduser('~')).

is_absolute()

True if the path is absolute (has both a root and, if applicable, a drive).

await is_block_device(*args, **kwargs)

Like is_block_device(), but async.

await is_char_device(*args, **kwargs)

Like is_char_device(), but async.

await is_dir(*args, **kwargs)

Like is_dir(), but async.

await is_fifo(*args, **kwargs)

Like is_fifo(), but async.

await is_file(*args, **kwargs)

Like is_file(), but async.

is_reserved()

Return True if the path contains one of the special names reserved by the system, if any.

await is_socket(*args, **kwargs)

Like is_socket(), but async.

await is_symlink(*args, **kwargs)

Like is_symlink(), but async.

await iterdir(*args, **kwargs)

Like iterdir(), but async.

joinpath(*args)

Combine this path with one or several arguments, and return a new path representing either a subpath (if all arguments are relative paths) or a totally different path (if one of the arguments is anchored).

await lchmod(*args, **kwargs)

Like lchmod(), but async.

await lstat(*args, **kwargs)

Like lstat(), but async.

match(path_pattern)

Return True if this path matches the given pattern.

await mkdir(*args, **kwargs)

Like mkdir(), but async.

await open(mode='r', buffering=-1, encoding=None, errors=None, newline=None)

Open the file pointed to by this path and return a file object, as the built-in open() function does.

await owner(*args, **kwargs)

Like owner(), but async.

await read_bytes(*args, **kwargs)

Like read_bytes(), but async.

await read_text(*args, **kwargs)

Like read_text(), but async.

relative_to(*other)

Return the relative path to another path identified by the passed arguments. If the operation is not possible (because this is not a subpath of the other path), raise ValueError.

await rename(*args, **kwargs)

Like rename(), but async.

await replace(*args, **kwargs)

Like replace(), but async.

await resolve(*args, **kwargs)

Like resolve(), but async.

await rglob(*args, **kwargs)

Like rglob(), but async.

await rmdir(*args, **kwargs)

Like rmdir(), but async.

await samefile(*args, **kwargs)

Like samefile(), but async.

await stat(*args, **kwargs)

Like stat(), but async.

await symlink_to(*args, **kwargs)

Like symlink_to(), but async.

await touch(*args, **kwargs)

Like touch(), but async.

await unlink(*args, **kwargs)

Like unlink(), but async.

with_name(name)

Return a new path with the file name changed.

with_suffix(suffix)

Return a new path with the file suffix changed (or added, if none).

await write_bytes(*args, **kwargs)

Like write_bytes(), but async.

await write_text(*args, **kwargs)

Like write_text(), but async.

Asynchronous file objects

await trio.open_file(file, mode='r', buffering=-1, encoding=None, errors=None, newline=None, closefd=True, opener=None)

Asynchronous version of io.open().

Returns:An asynchronous file object

Example:

async with await trio.open_file(filename) as f:
    async for line in f:
        pass

assert f.closed

See also

trio.Path.open()

trio.wrap_file(file)

This wraps any file object in a wrapper that provides an asynchronous file object interface.

Parameters:file – a file object
Returns:An asynchronous file object that wraps file

Example:

async_file = trio.wrap_file(StringIO('asdf'))

assert await async_file.read() == 'asdf'

Asynchronous file interface

Trio’s asynchronous file objects have an interface that automatically adapts to the object being wrapped. Intuitively, you can mostly treat them like a regular file object, except for adding an await in front of any of the methods that do I/O. The definition of file object is a little vague in Python though, so here are the details:

  • Synchronous attributes/methods: if any of the following attributes or methods are present, then they’re re-exported unchanged: closed, encoding, errors, fileno, isatty, newlines, readable, seekable, writable, buffer, raw, line_buffering, closefd, name, mode, getvalue, getbuffer.
  • Async methods: if any of the following methods are present, then they’re re-exported as an async method: flush, read, read1, readall, readinto, readline, readlines, seek, tell, truncate, write, writelines, readinto1, peek, detach.

Special notes:

  • Async file objects implement trio’s AsyncResource interface: you close them by calling aclose() instead of close (!!), and they can be used as async context managers. Like all aclose() methods, the aclose method on async file objects is guaranteed to close the file before returning, even if it is cancelled or otherwise raises an error.

  • Using the same async file object from multiple tasks simultaneously: because the async methods on async file objects are implemented using threads, it’s only safe to call two of them at the same time from different tasks IF the underlying synchronous file object is thread-safe. You should consult the documentation for the object you’re wrapping. For objects returned from trio.open_file() or trio.Path.open(), it depends on whether you open the file in binary mode or text mode: binary mode files are task-safe/thread-safe, text mode files are not.

  • Async file objects can be used as async iterators to iterate over the lines of the file:

    async with await trio.open_file(...) as f:
        async for line in f:
            print(line)
    
  • The detach method, if present, returns an async file object.

This should include all the attributes exposed by classes in io. But if you’re wrapping an object that has other attributes that aren’t on the list above, then you can access them via the .wrapped attribute:

wrapped

The underlying synchronous file object.

Subprocesses

Not implemented yet!

Signals

with trio.catch_signals(signals) as batched_signal_aiter

A context manager for catching signals.

Entering this context manager starts listening for the given signals and returns an async iterator; exiting the context manager stops listening.

The async iterator blocks until at least one signal has arrived, and then yields a set containing all of the signals that were received since the last iteration.

Note that if you leave the with block while the iterator has unextracted signals still pending inside it, then they will be re-delivered using Python’s regular signal handling logic. This avoids a race condition when a signal arrives just before we exit the with block.

Parameters:signals – a set of signals to listen for.
Raises:RuntimeError – if you try to use this anywhere except Python’s main thread. (This is a Python limitation.)

Example

A common convention for Unix daemons is that they should reload their configuration when they receive a SIGHUP. Here’s a sketch of what that might look like using catch_signals():

with trio.catch_signals({signal.SIGHUP}) as batched_signal_aiter:
    async for batch in batched_signal_aiter:
        # We're only listening for one signal, so the batch is always
        # {signal.SIGHUP}, but if we were listening to more signals
        # then it could vary.
        for signum in batch:
            assert signum == signal.SIGHUP
            reload_configuration()

Testing made easier with trio.testing

The trio.testing module provides various utilities to make it easier to test trio code. Unlike the other submodules in the trio namespace, trio.testing is not automatically imported when you do import trio; you must import trio.testing explicitly.

Test harness integration

@trio.testing.trio_test

Time and timeouts

trio.testing.MockClock is a Clock with a few tricks up its sleeve to help you efficiently test code involving timeouts:

  • By default, it starts at time 0, and clock time only advances when you explicitly call jump(). This provides an extremely controllable clock for testing.
  • You can set rate to 1.0 if you want it to run in real time like a regular clock, and you can stop and start it within a test. Or you can set rate to 10.0 to make clock time pass at 10x real speed (so e.g. await trio.sleep(10) returns after 1 second).
  • But even more interestingly, you can set autojump_threshold to zero or a small value, and then it will watch the execution of the run loop, and any time things have settled down and everyone’s waiting for a timeout, it jumps the clock forward to that timeout. In many cases this allows natural-looking code involving timeouts to be automatically run at near full CPU utilization with no changes. (Thanks to fluxcapacitor for this awesome idea.)
  • And of course these can be mixed and matched at will.

Regardless of these shenanigans, from “inside” trio the passage of time still seems normal so long as you restrict yourself to trio’s time functions (see Time and clocks). Below is an example demonstrating two different ways of making time pass quickly. Notice how in both cases, the two tasks keep a consistent view of reality and events happen in the expected order, despite being wildly divorced from real time:

# across-realtime.py

import time
import trio
import trio.testing

YEAR = 365 * 24 * 60 * 60  # seconds

async def task1():
    start = trio.current_time()

    print("task1: sleeping for 1 year")
    await trio.sleep(YEAR)

    duration = trio.current_time() - start
    print("task1: woke up; clock says I've slept {} years"
          .format(duration / YEAR))

    print("task1: sleeping for 1 year, 100 times")
    for _ in range(100):
        await trio.sleep(YEAR)

    duration = trio.current_time() - start
    print("task1: slept {} years total".format(duration / YEAR))

async def task2():
    start = trio.current_time()

    print("task2: sleeping for 5 years")
    await trio.sleep(5 * YEAR)

    duration = trio.current_time() - start
    print("task2: woke up; clock says I've slept {} years"
          .format(duration / YEAR))

    print("task2: sleeping for 500 years")
    await trio.sleep(500 * YEAR)

    duration = trio.current_time() - start
    print("task2: slept {} years total".format(duration / YEAR))

async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(task1)
        nursery.start_soon(task2)

def run_example(clock):
    real_start = time.monotonic()
    trio.run(main, clock=clock)
    real_duration = time.monotonic() - real_start
    print("Total real time elapsed: {} seconds".format(real_duration))

print("Clock where time passes at 100 years per second:\n")
run_example(trio.testing.MockClock(rate=100 * YEAR))

print("\nClock where time automatically skips past the boring parts:\n")
run_example(trio.testing.MockClock(autojump_threshold=0))

Output:

Clock where time passes at 100 years per second:

task2: sleeping for 5 years
task1: sleeping for 1 year
task1: woke up; clock says I've slept 1.0365006048232317 years
task1: sleeping for 1 year, 100 times
task2: woke up; clock says I've slept 5.0572111969813704 years
task2: sleeping for 500 years
task1: slept 104.77677842136472 years total
task2: slept 505.25014589075 years total
Total real time elapsed: 5.053582429885864 seconds

Clock where time automatically skips past the boring parts:

task2: sleeping for 5 years
task1: sleeping for 1 year
task1: woke up; clock says I've slept 1.0 years
task1: sleeping for 1 year, 100 times
task2: woke up; clock says I've slept 5.0 years
task2: sleeping for 500 years
task1: slept 101.0 years total
task2: slept 505.0 years total
Total real time elapsed: 0.019298791885375977 seconds

class trio.testing.MockClock(rate=0.0, autojump_threshold=inf)

A user-controllable clock suitable for writing tests.

Parameters:
rate

How many seconds of clock time pass per second of real time. Default is 0.0, i.e. the clock only advances through manual calls to jump() or when the autojump_threshold is triggered. You can assign to this attribute to change it.

autojump_threshold

The clock keeps an eye on the run loop, and if at any point it detects that all tasks have been blocked for this many real seconds (i.e., according to the actual clock, not this clock), then the clock automatically jumps ahead to the run loop’s next scheduled timeout. Default is math.inf, i.e., to never autojump. You can assign to this attribute to change it.

Basically the idea is that if you have code or tests that use sleeps and timeouts, you can use this to make it run much faster, totally automatically. (At least, as long as those sleeps/timeouts are happening inside trio; if your test involves talking to an external service and waiting for it to time out, then obviously we can’t help you there.)

You should set this to the smallest value that lets you reliably avoid “false alarms” where some I/O is in flight (e.g. between two halves of a socketpair) but the threshold gets triggered and time gets advanced anyway. This will depend on the details of your tests and test environment. If you aren’t doing any I/O (like in our sleeping example above) then just set it to zero, and the clock will jump whenever all tasks are blocked.

Warning

If you’re using wait_all_tasks_blocked() and autojump_threshold together, then you have to be careful. Setting autojump_threshold acts like a background task calling:

while True:
    await wait_all_tasks_blocked(
      cushion=clock.autojump_threshold, tiebreaker=float("inf"))

This means that if you call wait_all_tasks_blocked() with a cushion larger than your autojump threshold, then your call to wait_all_tasks_blocked() will never return, because the autojump task will keep waking up before your task does, and each time it does it’ll reset your task’s timer. However, if your cushion and the autojump threshold are the same, then the autojump’s tiebreaker will prevent them from interfering (unless you also set your tiebreaker to infinity for some reason. Don’t do that). As an important special case: this means that if you set an autojump threshold of zero and use wait_all_tasks_blocked() with the default zero cushion, then everything will work fine.

Summary: you should set autojump_threshold to be at least as large as the largest cushion you plan to pass to wait_all_tasks_blocked().

jump(seconds)

Manually advance the clock by the given number of seconds.

Parameters:seconds (float) – the number of seconds to jump the clock forward.
Raises:ValueError – if you try to pass a negative value for seconds.
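Here’s a small sketch of driving a MockClock by hand: one task sleeps on the mock clock, and the main task jumps the clock forward once everything is idle:

import trio
import trio.testing

clock = trio.testing.MockClock()

async def waiter():
    await trio.sleep(60)
    print("woke up at t =", trio.current_time())

async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(waiter)
        # Wait until waiter is blocked in sleep, then advance time manually
        await trio.testing.wait_all_tasks_blocked()
        clock.jump(60)

trio.run(main, clock=clock)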

Inter-task ordering

class trio.testing.Sequencer

A convenience class for forcing code in different tasks to run in an explicit linear order.

Instances of this class implement a __call__ method which returns an async context manager. The idea is that you pass a sequence number to __call__ to say where this block of code should go in the linear sequence. Block 0 starts immediately, and then block N doesn’t start until block N-1 has finished.

Example

An extremely elaborate way to print the numbers 0-5, in order:

async def worker1(seq):
    async with seq(0):
        print(0)
    async with seq(4):
        print(4)

async def worker2(seq):
    async with seq(2):
        print(2)
    async with seq(5):
        print(5)

async def worker3(seq):
    async with seq(1):
        print(1)
    async with seq(3):
        print(3)

async def main():
    seq = trio.testing.Sequencer()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(worker1, seq)
        nursery.start_soon(worker2, seq)
        nursery.start_soon(worker3, seq)

await trio.testing.wait_all_tasks_blocked(cushion=0.0, tiebreaker=0)

Block until there are no runnable tasks.

This is useful in testing code when you want to give other tasks a chance to “settle down”. The calling task is blocked, and doesn’t wake up until all other tasks are also blocked for at least cushion seconds. (Setting a non-zero cushion is intended to handle cases like two tasks talking to each other over a local socket, where we want to ignore the potential brief moment between a send and receive when all tasks are blocked.)

Note that cushion is measured in real time, not the trio clock time.

If there are multiple tasks blocked in wait_all_tasks_blocked(), then the one with the shortest cushion is the one woken (and this task becoming unblocked resets the timers for the remaining tasks). If there are multiple tasks that have exactly the same cushion, then the one with the lowest tiebreaker value is woken first. And if there are multiple tasks with the same cushion and the same tiebreaker, then all are woken.

You should also consider trio.testing.Sequencer, which provides a more explicit way to control execution ordering within a test, and will often produce more readable tests.

Example

Here’s an example of one way to test that trio’s locks are fair: we take the lock in the parent, start a child, wait for the child to be blocked waiting for the lock (!), and then check that we can’t release and immediately re-acquire the lock:

async def lock_taker(lock):
    await lock.acquire()
    lock.release()

async def test_lock_fairness():
    lock = trio.Lock()
    await lock.acquire()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(lock_taker, lock)
        # child hasn't run yet, we have the lock
        assert lock.locked()
        assert lock._owner is trio.current_task()
        await trio.testing.wait_all_tasks_blocked()
        # now the child has run and is blocked on lock.acquire(), we
        # still have the lock
        assert lock.locked()
        assert lock._owner is trio.current_task()
        lock.release()
        try:
            # The child has a prior claim, so we can't have it
            lock.acquire_nowait()
        except trio.WouldBlock:
            assert lock._owner is not trio.current_task()
            print("PASS")
        else:
            print("FAIL")

Streams

Connecting to an in-process socket server

await trio.testing.open_stream_to_socket_listener(socket_listener)

Connect to the given SocketListener.

This is particularly useful in tests when you want to let a server pick its own port, and then connect to it:

listeners = await trio.open_tcp_listeners(0)
client = await trio.testing.open_stream_to_socket_listener(listeners[0])

Parameters:socket_listener (SocketListener) – The SocketListener to connect to.
Returns:a stream connected to the given listener.
Return type:SocketStream

Virtual, controllable streams

One particularly challenging problem when testing network protocols is making sure that your implementation can handle data whose flow gets broken up in weird ways and arrives with weird timings: localhost connections tend to be much better behaved than real networks, so if you only test on localhost then you might get bitten later. To help you out, trio provides some fully in-memory implementations of the stream interfaces (see The abstract Stream API), that let you write all kinds of interestingly evil tests.

There are a few pieces here, so here’s how they fit together:

memory_stream_pair() gives you a pair of connected, bidirectional streams. It’s like socket.socketpair(), but without any involvement from that pesky operating system and its networking stack.

To build a bidirectional stream, memory_stream_pair() uses two unidirectional streams. It gets these by calling memory_stream_one_way_pair().

memory_stream_one_way_pair(), in turn, is implemented using the low-ish level classes MemorySendStream and MemoryReceiveStream. These are implementations of (you guessed it) trio.abc.SendStream and trio.abc.ReceiveStream that on their own, aren’t attached to anything – “sending” and “receiving” just put data into and get data out of a private internal buffer that each object owns. They also have some interesting hooks you can set, that let you customize the behavior of their methods. This is where you can insert the evil, if you want it. memory_stream_one_way_pair() takes advantage of these hooks in a relatively boring way: it just sets things up so that when you call send_all, or when you close the send stream, it automatically triggers a call to memory_stream_pump(), which is a convenience function that takes data out of a MemorySendStream’s buffer and puts it into a MemoryReceiveStream’s buffer. But that’s just the default – you can replace this with whatever arbitrary behavior you want.

Trio also provides some specialized functions for testing completely unbuffered streams: lockstep_stream_one_way_pair() and lockstep_stream_pair(). These aren’t customizable, but they do exhibit an extreme kind of behavior that’s good at catching out edge cases in protocol implementations.

API details

class trio.testing.MemorySendStream(send_all_hook=None, wait_send_all_might_not_block_hook=None, close_hook=None)

An in-memory SendStream.

Parameters:
  • send_all_hook – An async function, or None. Called from send_all(). Can do whatever you like.
  • wait_send_all_might_not_block_hook – An async function, or None. Called from wait_send_all_might_not_block(). Can do whatever you like.
  • close_hook – A synchronous function, or None. Called from close() and aclose(). Can do whatever you like.
send_all_hook
wait_send_all_might_not_block_hook
close_hook

All of these hooks are also exposed as attributes on the object, and you can change them at any time.

await aclose()

Same as close(), but async.

close()

Marks this stream as closed, and then calls the close_hook (if any).

await get_data(max_bytes=None)

Retrieves data from the internal buffer, blocking if necessary.

Parameters:max_bytes (int or None) – The maximum amount of data to retrieve. None (the default) means to retrieve all the data that’s present (but still blocks until at least one byte is available).
Returns:If this stream has been closed, an empty bytearray. Otherwise, the requested data.

get_data_nowait(max_bytes=None)

Retrieves data from the internal buffer, but doesn’t block.

See get_data() for details.

Raises:trio.WouldBlock – if no data is available to retrieve.

await send_all(data)

Places the given data into the object’s internal buffer, and then calls the send_all_hook (if any).

await wait_send_all_might_not_block()

Calls the wait_send_all_might_not_block_hook (if any), and then returns immediately.

class trio.testing.MemoryReceiveStream(receive_some_hook=None, close_hook=None)

An in-memory ReceiveStream.

Parameters:
  • receive_some_hook – An async function, or None. Called from receive_some(). Can do whatever you like.
  • close_hook – A synchronous function, or None. Called from close() and aclose(). Can do whatever you like.
receive_some_hook
close_hook

Both hooks are also exposed as attributes on the object, and you can change them at any time.

await aclose()

Same as close(), but async.

close()

Discards any pending data from the internal buffer, and marks this stream as closed.

put_data(data)

Appends the given data to the internal buffer.

put_eof()

Adds an end-of-file marker to the internal buffer.

await receive_some(max_bytes)

Calls the receive_some_hook (if any), and then retrieves data from the internal buffer, blocking if necessary.
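A minimal sketch of exercising MemoryReceiveStream directly:

import trio
import trio.testing

async def main():
    rstream = trio.testing.MemoryReceiveStream()
    rstream.put_data(b"hello")
    rstream.put_eof()
    assert await rstream.receive_some(10) == b"hello"
    assert await rstream.receive_some(10) == b""  # end-of-file marker

trio.run(main)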

trio.testing.memory_stream_pump(memory_send_stream, memory_receive_stream, *, max_bytes=None)

Take data out of the given MemorySendStream’s internal buffer, and put it into the given MemoryReceiveStream’s internal buffer.

Parameters:
  • memory_send_stream (MemorySendStream) – The stream to get data from.
  • memory_receive_stream (MemoryReceiveStream) – The stream to put data into.
  • max_bytes (int or None) – The maximum amount of data to transfer in this call, or None to transfer all available data.
Returns:

True if it successfully transferred some data, or False if there was no data to transfer.

This is used to implement memory_stream_one_way_pair() and memory_stream_pair(); see the latter’s docstring for an example of how you might use it yourself.

trio.testing.memory_stream_one_way_pair()

Create a connected, pure-Python, unidirectional stream with infinite buffering and flexible configuration options.

You can think of this as being a no-operating-system-involved trio-streamsified version of os.pipe() (except that os.pipe() returns the streams in the wrong order – we follow the superior convention that data flows from left to right).

Returns:A tuple (MemorySendStream, MemoryReceiveStream), where the MemorySendStream has its hooks set up so that it calls memory_stream_pump() from its send_all_hook and close_hook.

The end result is that data automatically flows from the MemorySendStream to the MemoryReceiveStream. But you’re also free to rearrange things however you like. For example, you can temporarily set the send_all_hook to None if you want to simulate a stall in data transmission. Or see memory_stream_pair() for a more elaborate example.

trio.testing.memory_stream_pair()

Create a connected, pure-Python, bidirectional stream with infinite buffering and flexible configuration options.

This is a convenience function that creates two one-way streams using memory_stream_one_way_pair(), and then uses StapledStream to combine them into a single bidirectional stream.

This is like a no-operating-system-involved, trio-streamsified version of socket.socketpair().

Returns:A pair of StapledStream objects that are connected so that data automatically flows from one to the other in both directions.

After creating a stream pair, you can send data back and forth, which is enough for simple tests:

left, right = memory_stream_pair()
await left.send_all(b"123")
assert await right.receive_some(10) == b"123"
await right.send_all(b"456")
assert await left.receive_some(10) == b"456"

But if you read the docs for StapledStream and memory_stream_one_way_pair(), you’ll see that all the pieces involved in wiring this up are public APIs, so you can adjust to suit the requirements of your tests. For example, here’s how to tweak a stream so that data flowing from left to right trickles in one byte at a time (but data flowing from right to left proceeds at full speed):

left, right = memory_stream_pair()
async def trickle():
    # left is a StapledStream, and left.send_stream is a MemorySendStream
    # right is a StapledStream, and right.receive_stream is a MemoryReceiveStream
    while memory_stream_pump(left.send_stream, right.receive_stream, max_bytes=1):
        # Pause between each byte
        await trio.sleep(1)
# Normally this send_all_hook calls memory_stream_pump directly without
# passing in a max_bytes. We replace it with our custom version:
left.send_stream.send_all_hook = trickle

And here’s a simple test using our modified stream objects:

async def sender():
    await left.send_all(b"12345")
    await left.send_eof()

async def receiver():
    while True:
        data = await right.receive_some(10)
        if data == b"":
            return
        print(data)

async with trio.open_nursery() as nursery:
    nursery.start_soon(sender)
    nursery.start_soon(receiver)

By default, this will print b"12345" and then immediately exit; with our trickle stream it instead sleeps 1 second, then prints b"1", then sleeps 1 second, then prints b"2", etc.

Pro-tip: you can insert sleep calls (like in our example above) to manipulate the flow of data across tasks… and then use MockClock and its autojump_threshold functionality to keep your test suite running quickly.

If you want to stress test a protocol implementation, one nice trick is to use the random module (preferably with a fixed seed) to move random numbers of bytes at a time, and insert random sleeps in between them. You can also set up a custom receive_some_hook if you want to manipulate things on the receiving side, and not just the sending side.

trio.testing.lockstep_stream_one_way_pair()

Create a connected, pure Python, unidirectional stream where data flows in lockstep.

Returns:A tuple (SendStream, ReceiveStream).

This stream has absolutely no buffering. Each call to send_all() will block until all the given data has been returned by a call to receive_some().

This can be useful for testing flow control mechanisms in an extreme case, or for setting up “clogged” streams to use with check_one_way_stream() and friends.

In addition to fulfilling the SendStream and ReceiveStream interfaces, the returned objects also have a synchronous close method.

trio.testing.lockstep_stream_pair()

Create a connected, pure-Python, bidirectional stream where data flows in lockstep.

Returns:A tuple (StapledStream, StapledStream).

This is a convenience function that creates two one-way streams using lockstep_stream_one_way_pair(), and then uses StapledStream to combine them into a single bidirectional stream.

Testing custom stream implementations

Trio also provides some functions to help you test your custom stream implementations:

await trio.testing.check_one_way_stream(stream_maker, clogged_stream_maker)

Perform a number of generic tests on a custom one-way stream implementation.

Parameters:
  • stream_maker – An async (!) function which returns a connected (SendStream, ReceiveStream) pair.
  • clogged_stream_maker – Either None, or an async function similar to stream_maker, but with the extra property that the returned stream is in a state where send_all and wait_send_all_might_not_block will block until receive_some has been called. This allows for more thorough testing of some edge cases, especially around wait_send_all_might_not_block.
Raises:

AssertionError – if a test fails.

await trio.testing.check_two_way_stream(stream_maker, clogged_stream_maker)

Perform a number of generic tests on a custom two-way stream implementation.

This is similar to check_one_way_stream(), except that the maker functions are expected to return objects implementing the Stream interface.

This function tests a superset of what check_one_way_stream() checks – if you call this, then you don’t need to also call check_one_way_stream().

await trio.testing.check_half_closeable_stream(stream_maker, clogged_stream_maker)

Perform a number of generic tests on a custom half-closeable stream implementation.

This is similar to check_two_way_stream(), except that the maker functions are expected to return objects that implement the HalfCloseableStream interface.

This function tests a superset of what check_two_way_stream() checks – if you call this, then you don’t need to also call check_two_way_stream().
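For instance, you might sanity-check trio’s own in-memory streams like this. Since check_one_way_stream() expects an async stream_maker, we wrap the synchronous pair constructor; passing None for clogged_stream_maker skips the extra clogged-stream tests:

import trio
import trio.testing

async def stream_maker():
    # memory_stream_one_way_pair() is synchronous, so wrap it
    return trio.testing.memory_stream_one_way_pair()

async def main():
    await trio.testing.check_one_way_stream(stream_maker, None)

trio.run(main)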

Virtual networking for testing

In the previous section you learned how to use virtual in-memory streams to test protocols that are written against trio’s Stream abstraction. But what if you have more complicated networking code – the kind of code that makes connections to multiple hosts, or opens a listening socket, or sends UDP packets?

Trio doesn’t itself provide a virtual in-memory network implementation for testing – but the trio.socket module does provide the hooks you need to write your own! And if you’re interested in helping implement a reusable virtual network for testing, then please get in touch.

Note that these APIs are actually in trio.socket and trio.abc, but we document them here because they’re primarily intended for testing.

trio.socket.set_custom_hostname_resolver(hostname_resolver)

Set a custom hostname resolver.

By default, trio’s getaddrinfo() and getnameinfo() functions use the standard system resolver functions. This function allows you to customize that behavior. The main intended use case is for testing, but it might also be useful for using third-party resolvers like c-ares (though be warned that these rarely make perfect drop-in replacements for the system resolver). See trio.abc.HostnameResolver for more details.

Setting a custom hostname resolver affects all future calls to getaddrinfo() and getnameinfo() within the enclosing call to trio.run(). All other hostname resolution in trio is implemented in terms of these functions.

Generally you should call this function just once, right at the beginning of your program.

Parameters:hostname_resolver (trio.abc.HostnameResolver or None) – The new custom hostname resolver, or None to restore the default behavior.
Returns:The previous hostname resolver (which may be None).

class trio.abc.HostnameResolver

If you have a custom hostname resolver, then implementing HostnameResolver allows you to register this to be used by trio.

See trio.socket.set_custom_hostname_resolver().

abstractmethod await getaddrinfo(host, port, family=0, type=0, proto=0, flags=0)

A custom implementation of getaddrinfo().

Called by trio.socket.getaddrinfo().

If host is given as a numeric IP address, then getaddrinfo() may handle the request itself rather than calling this method.

Any required IDNA encoding is handled before calling this function; your implementation can assume that it will never see U-labels like "café.com", and only needs to handle A-labels like b"xn--caf-dma.com".

abstractmethod await getnameinfo(sockaddr, flags)

A custom implementation of getnameinfo().

Called by trio.socket.getnameinfo().
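Here’s a hedged sketch of a toy resolver that sends every lookup to localhost – handy for isolating tests from real DNS. The class and its behavior are purely illustrative:

import socket
import trio
import trio.abc

class LocalhostResolver(trio.abc.HostnameResolver):
    async def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
        # Illustrative only: pretend every name resolves to 127.0.0.1
        return [(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("127.0.0.1", port))]

    async def getnameinfo(self, sockaddr, flags):
        raise NotImplementedError

async def main():
    old = trio.socket.set_custom_hostname_resolver(LocalhostResolver())
    try:
        print(await trio.socket.getaddrinfo("anything.example", 80))
    finally:
        trio.socket.set_custom_hostname_resolver(old)

trio.run(main)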

trio.socket.set_custom_socket_factory(socket_factory)

Set a custom socket object factory.

This function allows you to replace trio’s normal socket class with a custom class. This is very useful for testing, and probably a bad idea in any other circumstance. See trio.abc.SocketFactory for more details.

Setting a custom socket factory affects all future calls to socket() within the enclosing call to trio.run().

Generally you should call this function just once, right at the beginning of your program.

Parameters:socket_factory (trio.abc.SocketFactory or None) – The new custom socket factory, or None to restore the default behavior.
Returns:The previous socket factory (which may be None).

class trio.abc.SocketFactory

If you write a custom class implementing the trio socket interface, then you can use a SocketFactory to get trio to use it.

See trio.socket.set_custom_socket_factory().

abstractmethod socket(family=None, type=None, proto=None)

Create and return a socket object.

Your socket object must inherit from trio.socket.SocketType, which is an empty class whose only purpose is to “mark” which classes should be considered valid trio sockets.

Called by trio.socket.socket().

Note that unlike trio.socket.socket(), this does not take a fileno= argument. If a fileno= is specified, then trio.socket.socket() returns a regular trio socket object instead of calling this method.

Testing checkpoints

with trio.testing.assert_checkpoints()

Use as a context manager to check that the code inside the with block executes at least one checkpoint.

Raises:AssertionError – if no checkpoint was executed.

Example

Check that trio.sleep() is a checkpoint, even if it doesn’t block:

with trio.testing.assert_checkpoints():
    await trio.sleep(0)

with trio.testing.assert_no_checkpoints()

Use as a context manager to check that the code inside the with block does not execute any checkpoints.

Raises:AssertionError – if a checkpoint was executed.

Example

Synchronous code never contains any checkpoints, but we can double-check that:

queue = trio.Queue(10)
with trio.testing.assert_no_checkpoints():
    queue.put_nowait(None)

Introspecting and extending Trio with trio.hazmat

Warning

You probably don’t want to use this module.

trio.hazmat is Trio’s “hazardous materials” layer: it contains APIs useful for introspecting and extending Trio. If you’re writing ordinary, everyday code, then you can ignore this module completely. But sometimes you need something a bit lower level. Here are some examples of situations where you should reach for trio.hazmat:

  • You want to implement a new synchronization primitive that Trio doesn’t (yet) provide, like a reader-writer lock.
  • You want to extract low-level metrics to monitor the health of your application.
  • You want to add support for a low-level operating system interface that Trio doesn’t (yet) expose, like watching a filesystem directory for changes.
  • You want to implement an interface for calling between Trio and another event loop within the same process.
  • You’re writing a debugger and want to visualize Trio’s task tree.
  • You need to interoperate with a C library whose API exposes raw file descriptors.

Using trio.hazmat isn’t really that hazardous; in fact you’re already using it – it’s how most of the functionality described in previous chapters is implemented. The APIs described here have strictly defined and carefully documented semantics, and are perfectly safe – if you read carefully and take proper precautions. Some of those strict semantics have nasty big pointy teeth. If you make a mistake, Trio may not be able to handle it gracefully; conventions and guarantees that are followed strictly in the rest of Trio do not always apply. Using this module makes it your responsibility to think through and handle the nasty cases to expose a friendly Trio-style API to your users.

Debugging and instrumentation

Trio tries hard to provide useful hooks for debugging and instrumentation. Some are documented above (the nursery introspection attributes, trio.Queue.statistics(), etc.). Here are some more.

Global statistics

trio.hazmat.current_statistics()

Returns an object containing run-loop-level debugging information.

Currently the following fields are defined:

  • tasks_living (int): The number of tasks that have been spawned and not yet exited.
  • tasks_runnable (int): The number of tasks that are currently queued on the run queue (as opposed to blocked waiting for something to happen).
  • seconds_to_next_deadline (float): The time until the next pending cancel scope deadline. May be negative if the deadline has expired but we haven’t yet processed cancellations. May be inf if there are no pending deadlines.
  • run_sync_soon_queue_size (int): The number of unprocessed callbacks queued via trio.hazmat.TrioToken.run_sync_soon().
  • io_statistics (object): Some statistics from trio’s I/O backend. This always has an attribute backend which is a string naming which operating-system-specific I/O backend is in use; the other attributes vary between backends.
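A quick sketch of peeking at these statistics from inside a running program:

import trio
import trio.hazmat

async def main():
    stats = trio.hazmat.current_statistics()
    print("living:", stats.tasks_living)
    print("runnable:", stats.tasks_runnable)
    print("io backend:", stats.io_statistics.backend)

trio.run(main)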

The current clock

trio.hazmat.current_clock()

Returns the current Clock.

Instrument API

The instrument API provides a standard way to add custom instrumentation to the run loop. Want to make a histogram of scheduling latencies, log a stack trace of any task that blocks the run loop for >50 ms, or measure what percentage of your process’s running time is spent waiting for I/O? This is the place.

The general idea is that at any given moment, trio.run() maintains a set of “instruments”, which are objects that implement the trio.abc.Instrument interface. When an interesting event happens, it loops over these instruments and notifies them by calling an appropriate method. The tutorial has a simple example of using this for tracing.

Since this hooks into trio at a rather low level, you do have to be careful. The callbacks are run synchronously, and in many cases if they error out then there isn’t any plausible way to propagate this exception (for instance, we might be deep in the guts of the exception propagation machinery…). Therefore our current strategy for handling exceptions raised by instruments is to (a) log an exception to the "trio.abc.Instrument" logger, which by default prints a stack trace to standard error and (b) disable the offending instrument.

You can register an initial list of instruments by passing them to trio.run(). add_instrument() and remove_instrument() let you add and remove instruments at runtime.

trio.hazmat.add_instrument(instrument)

Start instrumenting the current run loop with the given instrument.

Parameters:instrument (trio.abc.Instrument) – The instrument to activate.

If instrument is already active, does nothing.

trio.hazmat.remove_instrument(instrument)

Stop instrumenting the current run loop with the given instrument.

Parameters:instrument (trio.abc.Instrument) – The instrument to de-activate.
Raises:KeyError – if the instrument is not currently active. This could occur either because you never added it, or because you added it and then it raised an unhandled exception and was automatically deactivated.

And here’s the interface to implement if you want to build your own Instrument:

class trio.abc.Instrument

The interface for run loop instrumentation.

Instruments don’t have to inherit from this abstract base class, and all of these methods are optional. This class serves mostly as documentation.

after_io_wait(timeout)

Called after handling pending I/O.

Parameters:timeout (float) – The number of seconds we were willing to wait. This much time may or may not have elapsed, depending on whether any I/O was ready.
after_run()

Called just before trio.run() returns.

after_task_step(task)

Called when we return to the main run loop after a task has yielded.

Parameters:task (trio.hazmat.Task) – The task that just ran.
before_io_wait(timeout)

Called before blocking to wait for I/O readiness.

Parameters:timeout (float) – The number of seconds we are willing to wait.
before_run()

Called at the beginning of trio.run().

before_task_step(task)

Called immediately before we resume running the given task.

Parameters:task (trio.hazmat.Task) – The task that is about to run.
task_exited(task)

Called when the given task exits.

Parameters:task (trio.hazmat.Task) – The finished task.
task_scheduled(task)

Called when the given task becomes runnable.

It may still be some time before it actually runs, if there are other runnable tasks ahead of it.

Parameters:task (trio.hazmat.Task) – The task that became runnable.
task_spawned(task)

Called when the given task is created.

Parameters:task (trio.hazmat.Task) – The new task.

The tutorial has a fully-worked example of defining a custom instrument to log trio’s internal scheduling decisions.
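As a quick taste, here’s a minimal sketch of an instrument that logs task lifecycle events, implementing just two of the optional methods:

import trio
import trio.abc

class TaskLogger(trio.abc.Instrument):
    def task_spawned(self, task):
        print("spawned:", task)

    def task_exited(self, task):
        print("exited:", task)

async def main():
    await trio.sleep(0)

trio.run(main, instruments=[TaskLogger()])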

Low-level I/O primitives

Different environments expose different low-level APIs for performing async I/O. trio.hazmat exposes these APIs in a relatively direct way, so as to allow maximum power and flexibility for higher level code. However, this means that the exact API provided may vary depending on what system trio is running on.

Universally available API

All environments provide the following functions:

await trio.hazmat.wait_socket_readable(sock)

Block until the given socket.socket() object is readable.

The given object must be exactly of type socket.socket(), nothing else.

Raises:trio.ResourceBusyError – if another task is already waiting for the given socket to become readable.

await trio.hazmat.wait_socket_writable(sock)

Block until the given socket.socket() object is writable.

The given object must be exactly of type socket.socket(), nothing else.

Raises:trio.ResourceBusyError – if another task is already waiting for the given socket to become writable.

Unix-specific API

Unix-like systems provide the following functions:

await trio.hazmat.wait_readable(fd)

Block until the given file descriptor is readable.

Warning

This is “readable” according to the operating system’s definition of readable. In particular, it probably won’t tell you anything useful for on-disk files.

Parameters:fd – integer file descriptor, or else an object with a fileno() method
Raises:trio.ResourceBusyError – if another task is already waiting for the given fd to become readable.
await trio.hazmat.wait_writable(fd)

Block until the given file descriptor is writable.

Warning

This is “writable” according to the operating system’s definition of writable. In particular, it probably won’t tell you anything useful for on-disk files.

Parameters:fd – integer file descriptor, or else an object with a fileno() method
Raises:trio.ResourceBusyError – if another task is already waiting for the given fd to become writable.
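For example, here’s a sketch (Unix-only) that waits for a pipe to become readable:

import os
import trio
import trio.hazmat

async def main():
    r, w = os.pipe()  # raw file descriptors, Unix-only

    async def write_later():
        await trio.sleep(1)
        os.write(w, b"x")

    async with trio.open_nursery() as nursery:
        nursery.start_soon(write_later)
        await trio.hazmat.wait_readable(r)  # blocks until data arrives
        print(os.read(r, 1))

    os.close(r)
    os.close(w)

trio.run(main)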

Kqueue-specific API

TODO: these are implemented, but are currently more of a sketch than anything real. See #26.

trio.hazmat.current_kqueue()
await trio.hazmat.wait_kevent(ident, filter, abort_func)
with trio.hazmat.monitor_kevent(ident, filter) as queue

Windows-specific API

TODO: these are implemented, but are currently more of a sketch than anything real. See #26 and #52.

trio.hazmat.register_with_iocp(handle)
await trio.hazmat.wait_overlapped(handle, lpOverlapped)
trio.hazmat.current_iocp()
with trio.hazmat.monitor_completion_key() as queue

Unbounded queues

In the section Passing messages with Queue, we showed an example with two producers and one consumer using the same queue, where the queue size would grow without bound to produce unbounded latency and memory usage. trio.Queue avoids this by placing an upper bound on how big the queue can get before put starts blocking. But what if you’re in a situation where put can’t block?

There is another option: the queue consumer could get greedy. Each time it runs, it could eagerly consume all of the pending items before allowing another task to run. (In some other systems, this would happen automatically because their queue’s get method doesn’t invoke the scheduler unless it has to block. But in trio, get is always a checkpoint.) This works, but it’s a bit risky: basically instead of applying backpressure to specifically the producer tasks, we’re applying it to all the tasks in our system. The danger here is that if enough items have built up in the queue, then “stopping the world” to process them all may cause unacceptable latency spikes in unrelated tasks. Nonetheless, this is still the right choice in situations where it’s impossible to apply backpressure more precisely. So this is the strategy implemented by UnboundedQueue. The main time you should use this is when working with low-level APIs like monitor_kevent().

class trio.hazmat.UnboundedQueue

An unbounded queue suitable for certain unusual forms of inter-task communication.

This class is designed for use as a queue in cases where the producer for some reason cannot be subjected to back-pressure, i.e., put_nowait() has to always succeed. In order to prevent the queue backlog from actually growing without bound, the consumer API is modified to dequeue items in “batches”. If a consumer task processes each batch without yielding, then this helps achieve (but does not guarantee) an effective bound on the queue’s memory use, at the cost of potentially increasing system latencies in general. You should generally prefer to use a trio.Queue instead if you can.

Currently each batch completely empties the queue, but this may change in the future.

An UnboundedQueue object can be used as an asynchronous iterator, where each iteration returns a new batch of items. I.e., these two loops are equivalent:

async for batch in queue:
    ...

while True:
    obj = await queue.get_batch()
    ...

empty()

Returns True if the queue is empty, False otherwise.

There is some subtlety to interpreting this method’s return value: see issue #63.

await get_batch()

Get the next batch from the queue, blocking as necessary.

Returns:A list of dequeued items, in order. This list is always non-empty.
Return type:list

get_batch_nowait()

Attempt to get the next batch from the queue, without blocking.

Returns:A list of dequeued items, in order. On a successful call this list is always non-empty; if it would be empty we raise WouldBlock instead.
Return type:list
Raises:WouldBlock – if the queue is empty.

put_nowait(obj)

Put an object into the queue, without blocking.

This always succeeds, because the queue is unbounded. We don’t provide a blocking put method, because it would never need to block.

Parameters:obj (object) – The object to enqueue.
qsize()

Returns the number of items currently in the queue.

statistics()

Return an object containing debugging information.

Currently the following fields are defined:

  • qsize: The number of items currently in the queue.
  • tasks_waiting: The number of tasks blocked on this queue’s get_batch() method.
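A minimal sketch of the batch-oriented API:

import trio
import trio.hazmat

async def main():
    queue = trio.hazmat.UnboundedQueue()
    for i in range(3):
        queue.put_nowait(i)   # always succeeds, never blocks
    batch = await queue.get_batch()
    assert batch == [0, 1, 2]  # currently each batch drains the whole queue

trio.run(main)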

Global state: system tasks and run-local storage

class trio.hazmat.RunLocal(**kwargs)

Run-local storage.

RunLocal objects are very similar to trio.TaskLocal objects, except that attributes are shared across all the tasks within a single call to trio.run(). They’re also very similar to threading.local objects, except that RunLocal objects are automatically wiped clean when trio.run() returns.
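A small sketch, assuming the **kwargs passed to the constructor set default attribute values (mirroring trio.TaskLocal):

import trio
import trio.hazmat

local = trio.hazmat.RunLocal(counter=0)  # assumed: kwargs set defaults

async def bump():
    local.counter += 1

async def main():
    async with trio.open_nursery() as nursery:
        for _ in range(3):
            nursery.start_soon(bump)
    print(local.counter)  # 3 – shared by all tasks in this run

trio.run(main)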

trio.hazmat.spawn_system_task(async_fn, *args, name=None)

Spawn a “system” task.

System tasks have a few differences from regular tasks:

  • They don’t need an explicit nursery; instead they go into the internal “system nursery”.
  • If a system task raises an exception, then it’s converted into a TrioInternalError and all tasks are cancelled. If you write a system task, you should be careful to make sure it doesn’t crash.
  • System tasks are automatically cancelled when the main task exits.
  • By default, system tasks have KeyboardInterrupt protection enabled. If you want your task to be interruptible by control-C, then you need to use disable_ki_protection() explicitly.
Parameters:
  • async_fn – An async callable.
  • args – Positional arguments for async_fn. If you want to pass keyword arguments, use functools.partial().
  • name – The name for this task. Only used for debugging/introspection (e.g. repr(task_obj)). If this isn’t a string, spawn_system_task() will try to make it one. A common use case: if you wrap a function before spawning a new task, pass the original function as name= to make debugging easier.
Returns: the newly spawned task
Return type: Task
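
As an illustration, here’s a minimal sketch of spawning a background system task (the watchdog function is a hypothetical example; its behavior follows the list above):

import trio

async def watchdog():
    # No nursery needed, KeyboardInterrupt-protected by default, and
    # cancelled automatically when the main task exits:
    while True:
        print("still alive")
        await trio.sleep(1)

async def main():
    trio.hazmat.spawn_system_task(watchdog)
    await trio.sleep(3)  # the watchdog is cancelled when we return

trio.run(main)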

Trio tokens

class trio.hazmat.TrioToken

An opaque object representing a single call to trio.run().

It has no public constructor; instead, see current_trio_token().

This object has two uses:

  1. It lets you re-enter the Trio run loop from external threads or signal handlers. This is the low-level primitive that trio.run_sync_in_worker_thread() uses to receive results from worker threads, that trio.catch_signals() uses to receive notifications about signals, and so forth.
  2. Each call to trio.run() has exactly one associated TrioToken object, so you can use it to identify a particular call.
run_sync_soon(sync_fn, *args, idempotent=False)

Schedule a call to sync_fn(*args) to occur in the context of a trio task.

This is safe to call from the main thread, from other threads, and from signal handlers. This is the fundamental primitive used to re-enter the Trio run loop from outside of it.

The call will happen “soon”, but there’s no guarantee about exactly when, and no mechanism provided for finding out when it’s happened. If you need this, you’ll have to build your own.

The call is effectively run as part of a system task (see spawn_system_task()). In particular this means that:

  • KeyboardInterrupt protection is enabled by default; if you want sync_fn to be interruptible by control-C, then you need to use disable_ki_protection() explicitly.
  • If sync_fn raises an exception, then it’s converted into a TrioInternalError and all tasks are cancelled. You should be careful that sync_fn doesn’t crash.

All calls with idempotent=False are processed in strict first-in first-out order.

If idempotent=True, then sync_fn and args must be hashable, and trio will make a best-effort attempt to discard any call submission which is equal to an already-pending call. Trio will attempt to process these in first-in first-out order, but makes no guarantees. (Currently processing is FIFO on CPython 3.6 and PyPy, but not CPython 3.5.)

Any ordering guarantees apply separately to idempotent=False and idempotent=True calls; there’s no rule for how calls in the different categories are ordered with respect to each other.

Raises:trio.RunFinishedError – if the associated call to trio.run() has already exited. (Any call that doesn’t raise this error is guaranteed to be fully processed before trio.run() exits.)
trio.hazmat.current_trio_token()

Retrieve the TrioToken for the current call to trio.run().
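
Putting these together, here’s a hedged sketch of re-entering the run loop from a foreign thread, using only the calls documented above plus trio.Event:

import threading

import trio

async def main():
    token = trio.hazmat.current_trio_token()
    done = trio.Event()

    def in_foreign_thread():
        # Safe to call from another thread; done.set() will run "soon"
        # in the context of a Trio task:
        token.run_sync_soon(done.set)

    threading.Thread(target=in_foreign_thread).start()
    await done.wait()

trio.run(main)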

Safer KeyboardInterrupt handling

Trio’s handling of control-C is designed to balance usability and safety. On the one hand, there are sensitive regions (like the core scheduling loop) where it’s simply impossible to handle arbitrary KeyboardInterrupt exceptions while maintaining our core correctness invariants. On the other, if the user accidentally writes an infinite loop, we do want to be able to break out of that. Our solution is to install a default signal handler which checks whether it’s safe to raise KeyboardInterrupt at the place where the signal is received. If so, then we do; otherwise, we schedule a KeyboardInterrupt to be delivered to the main task at the next available opportunity (similar to how Cancelled is delivered).

So that’s great, but – how do we know whether we’re in one of the sensitive parts of the program or not?

This is determined on a function-by-function basis. By default, a function is protected if its caller is, and not if its caller isn’t; this is helpful because it means you only need to override the defaults at places where you transition from protected code to unprotected code or vice-versa.

These transitions are accomplished using two function decorators:

@trio.hazmat.disable_ki_protection

Decorator that marks the given regular function, generator function, async function, or async generator function as unprotected against KeyboardInterrupt, i.e., the code inside this function can be rudely interrupted by KeyboardInterrupt at any moment.

If you have multiple decorators on the same function, then this should be at the bottom of the stack (closest to the actual function).

An example of where you’d use this is in implementing something like a run_in_trio_thread feature, which uses TrioToken.run_sync_soon() to get into the Trio thread. run_sync_soon() callbacks are run with KeyboardInterrupt protection enabled, and run_in_trio_thread takes advantage of this to safely set up the machinery for sending a response back to the original thread, and then uses disable_ki_protection() when entering the user-provided function.

@trio.hazmat.enable_ki_protection

Decorator that marks the given regular function, generator function, async function, or async generator function as protected against KeyboardInterrupt, i.e., the code inside this function won’t be rudely interrupted by KeyboardInterrupt. (Though if it contains any checkpoints, then it can still receive KeyboardInterrupt at those. This is considered a polite interruption.)

Warning

Be very careful to only use this decorator on functions that you know will either exit in bounded time, or else pass through a checkpoint regularly. (Of course all of your functions should have this property, but if you mess it up here then you won’t even be able to use control-C to escape!)

If you have multiple decorators on the same function, then this should be at the bottom of the stack (closest to the actual function).

An example of where you’d use this is on the __exit__ implementation for something like a Lock, where a poorly-timed KeyboardInterrupt could leave the lock in an inconsistent state and cause a deadlock.

trio.hazmat.currently_ki_protected()

Check whether the calling code has KeyboardInterrupt protection enabled.

It’s surprisingly easy to think that one’s KeyboardInterrupt protection is enabled when it isn’t, or vice-versa. This function tells you what trio thinks of the matter, which makes it useful for asserts and unit tests.

Returns:True if protection is enabled, and False otherwise.
Return type:bool
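
Here’s a hedged sketch showing the decorator and the introspection function together (assuming, as described above, that ordinary user code starts out unprotected):

import trio

@trio.hazmat.enable_ki_protection
def finish_critical_section():
    # A poorly-timed control-C can't interrupt us in here:
    assert trio.hazmat.currently_ki_protected()

async def main():
    # Ordinary user code is unprotected by default:
    assert not trio.hazmat.currently_ki_protected()
    finish_critical_section()

trio.run(main)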

Result objects

Trio provides some simple classes for representing the result of a Python function call, so that it can be passed around. The basic rule is:

result = Result.capture(f, *args)
x = result.unwrap()

is the same as:

x = f(*args)

even if f raises an error. And there’s also Result.acapture(), which is like await f(*args).

There’s nothing really dangerous about this system – it’s actually very general and quite handy! But mostly it’s used for things like implementing trio.run_sync_in_worker_thread(), or for getting values to pass to reschedule(), so we put it in trio.hazmat to avoid cluttering up the main API.

Since Result objects are simple immutable data structures that don’t otherwise interact with the trio machinery, it’s safe to create and access Result objects from any thread you like.

class trio.hazmat.Result

An abstract class representing the result of a Python computation.

This class has two concrete subclasses: Value representing a value, and Error representing an exception.

In addition to the methods described below, comparison operators on Value and Error objects (==, <, etc.) check that the other object is also a Value or Error object respectively, and then compare the contained objects.

Result objects are hashable if the contained objects are hashable.

staticmethod await acapture(async_fn, *args)

Run await async_fn(*args) and capture the result.

Returns:Either a Value or Error as appropriate.
abstractmethod await asend(agen)

Send or throw the contained value or exception into the given async generator object.

Parameters:agen – An async generator object supporting .asend() and .athrow() methods.
staticmethod capture(sync_fn, *args)

Run sync_fn(*args) and capture the result.

Returns:Either a Value or Error as appropriate.
abstractmethod send(gen)

Send or throw the contained value or exception into the given generator object.

Parameters:gen – A generator object supporting .send() and .throw() methods.
abstractmethod unwrap()

Return or raise the contained value or exception.

These two lines of code are equivalent:

x = fn(*args)
x = Result.capture(fn, *args).unwrap()
class trio.hazmat.Value(value)

Concrete Result subclass representing a regular value.

class trio.hazmat.Error(error)

Concrete Result subclass representing a raised exception.
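
To make the basic rule above concrete, here’s a small self-contained example (no run loop needed, since Result objects are safe to use from any thread):

import trio

def divide(a, b):
    return a / b

ok = trio.hazmat.Result.capture(divide, 6, 2)
assert isinstance(ok, trio.hazmat.Value)
assert ok.unwrap() == 3.0

bad = trio.hazmat.Result.capture(divide, 1, 0)
assert isinstance(bad, trio.hazmat.Error)
try:
    bad.unwrap()  # re-raises the captured ZeroDivisionError
except ZeroDivisionError:
    print("the exception survived the round-trip")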

Sleeping and waking

Wait queue abstraction

class trio.hazmat.ParkingLot

A fair wait queue with cancellation and requeueing.

This class encapsulates the tricky parts of implementing a wait queue. It’s useful for implementing higher-level synchronization primitives like queues and locks.

In addition to the methods below, you can use len(parking_lot) to get the number of parked tasks, and if parking_lot: ... to check whether there are any parked tasks.

await park()

Park the current task until woken by a call to unpark() or unpark_all().

repark(new_lot, *, count=1)

Move parked tasks from one ParkingLot object to another.

This dequeues count tasks from one lot, and requeues them on another, preserving order. For example:

import trio
import trio.testing

async def parker(lot):
    print("sleeping")
    await lot.park()
    print("woken")

async def main():
    lot1 = trio.hazmat.ParkingLot()
    lot2 = trio.hazmat.ParkingLot()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(parker, lot1)
        await trio.testing.wait_all_tasks_blocked()
        assert len(lot1) == 1
        assert len(lot2) == 0
        lot1.repark(lot2)
        assert len(lot1) == 0
        assert len(lot2) == 1
        # This wakes up the task that was originally parked in lot1
        lot2.unpark()

If there are fewer than count tasks parked, then reparks as many tasks as are available and then returns successfully.

Parameters:
  • new_lot (ParkingLot) – the parking lot to move tasks to.
  • count (int) – the number of tasks to move.
repark_all(new_lot)

Move all parked tasks from one ParkingLot object to another.

See repark() for details.

statistics()

Return an object containing debugging information.

Currently the following fields are defined:

  • tasks_waiting: The number of tasks blocked on this lot’s park() method.
unpark(*, count=1)

Unpark one or more tasks.

This wakes up count tasks that are blocked in park(). If there are fewer than count tasks parked, then wakes as many tasks as are available and then returns successfully.

Parameters:count (int) – the number of tasks to unpark.
unpark_all()

Unpark all parked tasks.
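
To show how ParkingLot serves as the waiting half of a synchronization primitive, here’s a minimal sketch of an Event-like class (not trio’s actual Event implementation, which handles more details):

import trio

class OneShotEvent:
    def __init__(self):
        self._lot = trio.hazmat.ParkingLot()
        self._flag = False

    async def wait(self):
        if self._flag:
            # Already set: still execute a checkpoint, per trio's conventions.
            await trio.hazmat.checkpoint()
        else:
            await self._lot.park()

    def set(self):
        self._flag = True
        self._lot.unpark_all()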

Low-level checkpoint functions

await trio.hazmat.checkpoint()

A pure checkpoint.

This checks for cancellation and allows other tasks to be scheduled, without otherwise blocking.

Note that the scheduler has the option of ignoring this and continuing to run the current task if it decides this is appropriate (e.g. for increased efficiency).

Equivalent to await trio.sleep(0) (which is implemented by calling checkpoint()).
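
One common use is breaking up a long CPU-bound loop so that it stays responsive to timeouts and other tasks. A hedged sketch (do_expensive_work is a hypothetical stand-in):

import trio

def do_expensive_work(item):
    # Stand-in for a real CPU-bound computation:
    return sum(i * i for i in range(10000))

async def crunch(items):
    for item in items:
        do_expensive_work(item)
        # Let other tasks run, and notice any pending cancellation:
        await trio.hazmat.checkpoint()

trio.run(crunch, [1, 2, 3])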

The next two functions are used together to make up a checkpoint:

await trio.hazmat.checkpoint_if_cancelled()

Issue a checkpoint if the calling context has been cancelled.

Equivalent to (but potentially more efficient than):

if trio.current_effective_deadline() == -inf:
    await trio.hazmat.checkpoint()

This is either a no-op, or else it allows other tasks to be scheduled and then raises trio.Cancelled.

Typically used together with cancel_shielded_checkpoint().

await trio.hazmat.cancel_shielded_checkpoint()

Introduce a schedule point, but not a cancel point.

This is not a checkpoint, but it is half of a checkpoint, and when combined with checkpoint_if_cancelled() it can make a full checkpoint.

Equivalent to (but potentially more efficient than):

with trio.open_cancel_scope(shield=True):
    await trio.hazmat.checkpoint()

These are commonly used in cases where you have an operation that might-or-might-not block, and you want to implement trio’s standard checkpoint semantics. Example:

async def operation_that_maybe_blocks():
    await checkpoint_if_cancelled()
    try:
        ret = attempt_operation()
    except BlockingIOError:
        # need to block and then retry, which we do below
        pass
    except:
        # some other error, finish the checkpoint then let it propagate
        await cancel_shielded_checkpoint()
        raise
    else:
        # operation succeeded, finish the checkpoint then return
        await cancel_shielded_checkpoint()
        return ret
    while True:
        await wait_for_operation_to_be_ready()
        try:
            return attempt_operation()
        except BlockingIOError:
            pass

This logic is a bit convoluted, but accomplishes all of the following:

  • Every execution path passes through a checkpoint (assuming that wait_for_operation_to_be_ready is an unconditional checkpoint)
  • Our cancellation semantics say that Cancelled should only be raised if the operation didn’t happen. Using cancel_shielded_checkpoint() on the early-exit branches accomplishes this.
  • On the path where we do end up blocking, we don’t pass through any schedule points before that, which avoids some unnecessary work.
  • Avoids implicitly chaining the BlockingIOError with any errors raised by attempt_operation or wait_for_operation_to_be_ready, by keeping the while True: loop outside of the except BlockingIOError: block.

These functions can also be useful in other situations. For example, when trio.run_sync_in_worker_thread() schedules some work to run in a worker thread, it blocks until the work is finished (so it’s a schedule point), but by default it doesn’t allow cancellation. So to make sure that the call always acts as a checkpoint, it calls checkpoint_if_cancelled() before starting the thread.

Low-level blocking

await trio.hazmat.wait_task_rescheduled(abort_func)

Put the current task to sleep, with cancellation support.

This is the lowest-level API for blocking in trio. Every time a Task blocks, it does so by calling this function (usually indirectly via some higher-level API).

This is a tricky interface with no guard rails. If you can use ParkingLot or the built-in I/O wait functions instead, then you should.

Generally the way it works is that before calling this function, you make arrangements for “someone” to call reschedule() on the current task at some later point.

Then you call wait_task_rescheduled(), passing in abort_func, an “abort callback”.

(Terminology: in trio, “aborting” is the process of attempting to interrupt a blocked task to deliver a cancellation.)

There are two possibilities for what happens next:

  1. “Someone” calls reschedule() on the current task, and wait_task_rescheduled() returns or raises whatever value or error was passed to reschedule().

  2. The call’s context transitions to a cancelled state (e.g. due to a timeout expiring). When this happens, the abort_func is called. Its interface looks like:

    def abort_func(raise_cancel):
        ...
        return trio.hazmat.Abort.SUCCEEDED  # or FAILED
    

    It should attempt to clean up any state associated with this call, and in particular, arrange that reschedule() will not be called later. If (and only if!) it is successful, then it should return Abort.SUCCEEDED, in which case the task will automatically be rescheduled with an appropriate Cancelled error.

    Otherwise, it should return Abort.FAILED. This means that the task can’t be cancelled at this time, and still has to make sure that “someone” eventually calls reschedule().

    At that point there are again two possibilities. You can simply ignore the cancellation altogether: wait for the operation to complete and then reschedule and continue as normal. (For example, this is what trio.run_sync_in_worker_thread() does if cancellation is disabled.) The other possibility is that the abort_func does succeed in cancelling the operation, but for some reason isn’t able to report that right away. (Example: on Windows, it’s possible to request that an async (“overlapped”) I/O operation be cancelled, but this request is also asynchronous – you don’t find out until later whether the operation was actually cancelled or not.) To report a delayed cancellation, you should reschedule the task yourself, and call the raise_cancel callback passed to abort_func to raise a Cancelled (or possibly KeyboardInterrupt) exception into this task. Either of the approaches sketched below can work:

    # Option 1:
    # Catch the exception from raise_cancel and inject it into the task.
    # (This is what trio does automatically for you if you return
    # Abort.SUCCEEDED.)
    trio.hazmat.reschedule(task, Result.capture(raise_cancel))
    
    # Option 2:
    # wait to be woken by "someone", and then decide whether to raise
    # the error from inside the task.
    outer_raise_cancel = None
    def abort(inner_raise_cancel):
        nonlocal outer_raise_cancel
        outer_raise_cancel = inner_raise_cancel
        TRY_TO_CANCEL_OPERATION()
        return trio.hazmat.Abort.FAILED
    await wait_task_rescheduled(abort)
    if OPERATION_WAS_SUCCESSFULLY_CANCELLED:
        # raises the error
        outer_raise_cancel()
    

    In any case, it’s guaranteed that abort_func will be called at most once per call to wait_task_rescheduled().

Warning

If your abort_func raises an error, or returns any value other than Abort.SUCCEEDED or Abort.FAILED, then trio will crash violently. Be careful! Similarly, it is entirely possible to deadlock a trio program by failing to reschedule a blocked task, or cause havoc by calling reschedule() too many times. Remember what we said up above about how you should use a higher-level API if at all possible?

class trio.hazmat.Abort

enum.Enum used as the return value from abort functions.

See wait_task_rescheduled() for details.

SUCCEEDED
FAILED
trio.hazmat.reschedule(task, next_send=Value(None))

Reschedule the given task with the given Result.

See wait_task_rescheduled() for the gory details.

There must be exactly one call to reschedule() for every call to wait_task_rescheduled(). (And when counting, keep in mind that returning Abort.SUCCEEDED from an abort callback is equivalent to calling reschedule() once.)

Parameters:
  • task (Task) – the task to be rescheduled. Must be blocked in a call to wait_task_rescheduled().
  • next_send (Result) – the value (or error) to return (or raise) from wait_task_rescheduled().

Here’s an example lock class implemented using wait_task_rescheduled() directly. This implementation has a number of flaws, including lack of fairness, O(n) cancellation, missing error checking, failure to insert a checkpoint on the non-blocking path, etc. If you really want to implement your own lock, then you should study the implementation of trio.Lock and use ParkingLot, which handles some of these issues for you. But this does serve to illustrate the basic structure of the wait_task_rescheduled() API:

import collections

import trio

class NotVeryGoodLock:
    def __init__(self):
        self._blocked_tasks = collections.deque()
        self._held = False

    async def acquire(self):
        while self._held:
            task = trio.current_task()
            self._blocked_tasks.append(task)
            def abort_fn(_):
                self._blocked_tasks.remove(task)
                return trio.hazmat.Abort.SUCCEEDED
            await trio.hazmat.wait_task_rescheduled(abort_fn)
        self._held = True

    def release(self):
        self._held = False
        if self._blocked_tasks:
            woken_task = self._blocked_tasks.popleft()
            trio.hazmat.reschedule(woken_task)

Task API

trio.hazmat.current_task()

Return the Task object representing the current task.

Returns:the Task that called current_task().
Return type:Task
class trio.hazmat.Task

A Task object represents a concurrent “thread” of execution. It has no public constructor; Trio internally creates a Task object for each call to nursery.start(...) or nursery.start_soon(...).

Its public members are mostly useful for introspection and debugging:

name

String containing this Task’s name. Usually the name of the function this Task is running, but can be overridden by passing name= to start or start_soon.

coro

This task’s coroutine object. Example usage: extracting a stack trace:

import traceback

def walk_coro_stack(coro):
    while coro is not None:
        if hasattr(coro, "cr_frame"):
            # A real coroutine
            yield coro.cr_frame, coro.cr_frame.f_lineno
            coro = coro.cr_await
        else:
            # A generator decorated with @types.coroutine
            yield coro.gi_frame, coro.gi_frame.f_lineno
            coro = coro.gi_yieldfrom

def print_stack_for_task(task):
    ss = traceback.StackSummary.extract(walk_coro_stack(task.coro))
    print("".join(ss.format()))
parent_nursery

The nursery this task is inside (or None if this is the “init” task).

Example use case: drawing a visualization of the task tree in a debugger.

child_nurseries

The nurseries this task contains.

This is a list, with outer nurseries before inner nurseries.
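
For example, a quick introspection session might look like this (a hedged sketch using only the attributes documented above):

import trio

async def main():
    task = trio.hazmat.current_task()
    print(task.name)             # usually the name of the function the task runs
    print(task.parent_nursery)   # the nursery supervising this task
    print(task.child_nurseries)  # nurseries opened inside this task (here: [])

trio.run(main)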

Design and internals

Here we’ll discuss Trio’s overall design and architecture: how it fits together and why we made the decisions we did. If all you want to do is use Trio, then you don’t need to read this – though you might find it interesting. The main target audience here is (a) folks who want to read the code and potentially contribute, (b) anyone working on similar libraries who want to understand what we’re up to, (c) anyone interested in I/O library design generally.

There are many valid approaches to writing an async I/O library. This is ours.

High-level design principles

Trio’s two overriding goals are usability and correctness: we want to make it easy to get things right.

Of course there are lots of other things that matter too, like speed, maintainability, etc. We want those too, as much as we can get. But sometimes these things come in conflict, and when that happens, these are our priorities.

In some sense the entire rest of this document is a description of how these play out, but to give a simple example: Trio’s KeyboardInterrupt handling machinery is a bit tricky and hard to test, so it scores poorly on simplicity and maintainability. But we think the usability+correctness gains outweigh this.

There are some subtleties here. Notice that it’s specifically “easy to get things right”. There are situations (e.g. writing one-off scripts) where the most “usable” tool is the one that will happily ignore errors and keep going no matter what, or that doesn’t bother with resource cleanup. (Cf. the success of PHP.) This is a totally valid use case and valid definition of usability, but it’s not the one we use: we think it’s easier to build reliable and correct systems if exceptions propagate until handled and if the system catches you when you make potentially dangerous resource handling errors, so that’s what we optimize for.

It’s also worth saying something about speed, since it often looms large in comparisons between I/O libraries. This is a rather subtle and complex topic.

In general, speed is certainly important – but the fact that people sometimes use Python instead of C is a pretty good indicator that usability often trumps speed in practice. We want to make trio fast, but it’s not an accident that it’s left off our list of overriding goals at the top: if necessary we are willing to accept some slowdowns in the service of usability and reliability.

To break things down in more detail:

First of all, there are the cases where speed directly impacts correctness, like when you hit an accidental O(N**2) algorithm and your program effectively locks up. Trio is very careful to use algorithms and data structures that have good worst-case behavior (even if this might mean sacrificing a few percentage points of speed in the average case).

Similarly, when there’s a conflict, we care more about 99th percentile latencies than we do about raw throughput, because insufficient throughput – if it’s consistent! – can often be budgeted for and handled with horizontal scaling, but once you lose latency it’s gone forever, and latency spikes can easily cross over to become a correctness issue (e.g., an RPC server that responds slowly enough to trigger timeouts is effectively non-functional). Again, of course, this doesn’t mean we don’t care about throughput – but sometimes engineering requires making trade-offs, especially for early-stage projects that haven’t had time to optimize for all use cases yet.

And finally: we care about speed on real-world applications quite a bit, but speed on microbenchmarks is just about our lowest priority. We aren’t interested in competing to build “the fastest echo server in the West”. I mean, it’s nice if it happens or whatever, and microbenchmarks are an invaluable tool for understanding a system’s behavior. But if you play that game to win then it’s very easy to get yourself into a situation with seriously misaligned incentives, where you have to start compromising on features and correctness in order to get a speedup that’s totally irrelevant to real-world applications. In most cases (we suspect) it’s the application code that’s the bottleneck, and you’ll get more of a win out of running the whole app under PyPy than out of any heroic optimizations to the I/O layer. (And this is why Trio does place a priority on PyPy compatibility.)

As a matter of tactics, we also note that at this stage in Trio’s lifecycle, it’d probably be a mistake to worry about speed too much. It doesn’t make sense to spend lots of effort optimizing an API whose semantics are still in flux.

User-level API principles

Basic principles

Trio is very much a continuation of the ideas explored in this blog post, and in particular the principles identified there that make curio easier to use correctly than asyncio. So trio also adopts these rules, in particular:

  • The only form of concurrency is the task.
  • Tasks are guaranteed to run to completion.
  • Task spawning is always explicit. No callbacks, no implicit concurrency, no futures/deferreds/promises/other APIs that involve callbacks. All APIs are “causal” except for those that are explicitly used for task spawning.
  • Exceptions are used for error handling; try/finally and with blocks for handling cleanup.

Cancel points and schedule points

The first major place that trio departs from curio is in its decision to make a much larger fraction of the API use sync functions rather than async functions, and to provide strong conventions about cancel points and schedule points. (At this point, there are a lot of ways that trio and curio have diverged. But this was really the origin – the tipping point where I realized that exploring these ideas would require a new library, and couldn’t be done inside curio.) The full reasoning here takes some unpacking.

First, some definitions: a cancel point is a point where your code checks if it has been cancelled – e.g., due to a timeout having expired – and potentially raises a Cancelled error. A schedule point is a point where the current task can potentially be suspended, and another task allowed to run.

In curio, the convention is that all operations that interact with the run loop in any way are syntactically async, and it’s undefined which of these operations are cancel/schedule points; users are instructed to assume that any of them might be cancel/schedule points, but with a few exceptions there’s no guarantee that any of them are unless they actually block. (I.e., whether a given call acts as a cancel/schedule point is allowed to vary across curio versions and also depending on runtime factors like network load.)

But when using an async library, there are good reasons why you need to be aware of cancel and schedule points. They introduce a set of complex and partially conflicting constraints on your code:

You need to make sure that every task passes through a cancel point regularly, because otherwise timeouts become ineffective and your code becomes subject to DoS attacks and other problems. So for correctness, it’s important to make sure you have enough cancel points.

But… every cancel point also increases the chance of subtle bugs in your program, because it’s a place where you have to be prepared to handle a Cancelled exception and clean up properly. And while we try to make this as easy as possible, these kinds of clean-up paths are notorious for getting missed in testing and harboring subtle bugs. So the more cancel points you have, the harder it is to make sure your code is correct.

Similarly, you need to make sure that every task passes through a schedule point regularly, because otherwise this task could end up hogging the event loop and preventing other code from running, causing a latency spike. So for correctness, it’s important to make sure you have enough schedule points.

But… you have to be careful here too, because every schedule point is a point where arbitrary other code could run, and alter your program’s state out from under you, introducing classic concurrency bugs. So as you add more schedule points, it becomes exponentially harder to reason about how your code is interleaved and be sure that it’s correct.

So an important question for an async I/O library is: how do we help the user manage these trade-offs?

Trio’s answer is informed by two further observations:

First, any time a task blocks (e.g., because it does an await sock.recv() but there’s no data available to receive), that has to be a cancel point (because if the I/O never arrives, we need to be able to time out), and it has to be a schedule point (because the whole idea of asynchronous programming is that when one task is waiting we can switch to another task to get something useful done).

And second, a function which sometimes counts as a cancel/schedule point, and sometimes doesn’t, is the worst of both worlds: you have put in the effort to make sure your code handles cancellation or interleaving correctly, but you can’t count on it to help meet latency requirements.

With all that in mind, trio takes the following approach:

Rule 1: to reduce the number of concepts to keep track of, we collapse cancel points and schedule points together. Every point that is a cancel point is also a schedule point and vice versa. These are distinct concepts both theoretically and in the actual implementation, but we hide that distinction from the user so that there’s only one concept they need to keep track of.

Rule 2: Cancel+schedule points are determined statically. A trio primitive is either always a cancel+schedule point, or never a cancel+schedule point, regardless of runtime conditions. This is because we want it to be possible to determine whether some code has “enough” cancel/schedule points by reading the source code.

In fact, to make this even simpler, we make it so you don’t even have to look at the function arguments: each function is either a cancel+schedule point on every call or on no calls.

Observation: since blocking is always a cancel+schedule point, rule 2 implies that any function that sometimes blocks is always a cancel+schedule point.

So that gives us a number of cancel+schedule points: all the functions that can block. Are there any others? Trio’s answer is: no. It’s easy to add new points explicitly (throw in a sleep(0) or whatever) but hard to get rid of them when you don’t want them. (And this is a real issue – “too many potential cancel points” is definitely a tension I’ve felt while trying to build things like task supervisors in curio.) And we expect that most trio programs will execute potentially-blocking operations “often enough” to produce reasonable behavior. So, rule 3: the only cancel+schedule points are the potentially-blocking operations.

And now that we know where our cancel+schedule points are, there’s the question of how to effectively communicate this information to the user. We want some way to mark out a category of functions that might block or trigger a task switch, so that they’re clearly distinguished from functions that don’t do this. Wouldn’t it be nice if there were some Python feature that naturally divided functions into two categories, and maybe put some sort of special syntactic marking on the functions that can do weird things like block and task switch…? What a coincidence, that’s exactly how async functions work! Rule 4: in trio, only the potentially blocking functions are async. So e.g. Event.wait() is async, but Event.set() is sync.

Summing up: out of what’s actually a pretty vast space of design possibilities, we declare by fiat that when it comes to trio primitives, all of these categories are identical:

  • async functions
  • functions that can, under at least some circumstances, block
  • functions where the caller needs to be prepared to handle potential Cancelled exceptions
  • functions that are guaranteed to notice any pending cancellation
  • functions where you need to be prepared for a potential task switch
  • functions that are guaranteed to take care of switching tasks if appropriate

This requires some non-trivial work internally – it actually takes a fair amount of care to make those cancel/schedule categories line up, and there are some shenanigans required to let sync and async APIs both interact with the run loop on an equal footing. But this is all invisible to the user, and we feel that it pays off in terms of usability and correctness.

There is one exception to these rules, for async context managers. Context managers are composed of two operations – enter and exit – and sometimes only one of these is potentially blocking. (Examples: async with lock: can block when entering but never when exiting; async with open_nursery() as ...: can block when exiting but never when entering.) But, Python doesn’t have “half-asynchronous” context managers: either both operations are async-flavored, or neither is. In Trio we take a pragmatic approach: for this kind of async context manager, we enforce the above rules only on the potentially blocking operation, and the other operation is allowed to be syntactically async but semantically synchronous. And async context managers should always document which of their operations are schedule+cancel points.

Exceptions always propagate

Another rule that trio follows is that exceptions must always propagate. This is like the zen line about “Errors should never pass silently”, except that in every other concurrency library for Python (threads, asyncio, curio, …), it’s fairly common to end up with an undeliverable exception, which just gets printed to stderr and then discarded. While we understand the pragmatic constraints that motivated these libraries to adopt this approach, we feel that there are far too many situations where no human will ever look at stderr and notice the problem, and insist that trio APIs find a way to propagate exceptions “up the stack” – whatever that might mean.

This is often a challenging rule to follow – for example, the call soon code has to jump through some hoops to make it happen – but its most dramatic influence can be seen in trio’s task-spawning interface, where it motivates the use of “nurseries”:

async def parent():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(child)

(See Tasks let you do multiple things at once for full details.)

If you squint you can see the conceptual influence of Erlang’s “task linking” and “task tree” ideas here, though the details are different.

This design also turns out to enforce a remarkable, unexpected invariant.

In the blog post I called out a nice feature of curio’s spawning API: since spawning is the only way to break causality, and in curio spawn is async, sync functions in curio are guaranteed to be causal. One limitation, though, is that this invariant is not very predictive: in curio there are lots of async functions that could spawn off children and violate causality; most of them don’t, but there’s no clear marker for the ones that do.

Our API doesn’t quite give that guarantee, but actually a better one. In trio:

  • Sync functions can’t create nurseries, because nurseries require an async with
  • Any async function can create a nursery and start new tasks… but creating a nursery allows task starting without breaking causality, because the children have to exit before the function is allowed to return. So we can preserve causality without having to give up concurrency!
  • The only way to violate causality (which is an important feature, just one that needs to be handled carefully) is to explicitly create a nursery object in one task and then pass it into another task. And this provides a very clear and precise signal about where the funny stuff is happening – just watch for the nursery object getting passed around.

Introspection, debugging, testing

Tools for introspection and debugging are critical to achieving usability and correctness in practice, so they should be first-class considerations in trio.

Similarly, the availability of powerful testing tools has a huge impact on usability and correctness; we consider testing helpers to be very much in scope for the trio project.

Specific style guidelines

  • As noted above, functions that don’t block should be sync-colored, and functions that might block should be async-colored and unconditionally act as cancel+schedule points.

  • Any function that takes a callable to run should have a signature like:

    def call_the_thing(fn, *args, kwonly1, kwonly2, ...):
        ...
    

    where fn(*args) is the thing to be called, and kwonly1, kwonly2, … are keyword-only arguments that belong to call_the_thing. This applies even if call_the_thing doesn’t take any arguments of its own, i.e. in this case its signature looks like:

    def call_the_thing(fn, *args):
        ...
    

    This allows users to skip faffing about with functools.partial() in most cases, while still providing an unambiguous and extensible way to pass arguments to the callable; see the sketch after this list. (Hat-tip to asyncio, who we stole this convention from.)

  • Whenever it makes sense, trio classes should have a method called statistics() which returns an immutable object with named fields containing internal statistics about the object that are useful for debugging or introspection (examples).

  • Functions or methods whose purpose is to wait for a condition to become true should be called wait_<condition>. This avoids ambiguities like “does await readable() check readability (returning a bool) or wait for readability?”.

    Sometimes this leads to the slightly funny looking await wait_.... Sorry. As far as I can tell all the alternatives are worse, and you get used to the convention pretty quick.

  • If it’s desirable to have both blocking and non-blocking versions of a function, then they look like:

    async def OPERATION(...):
        ...
    
    def OPERATION_nowait(...):
        ...
    

    and the nowait version raises trio.WouldBlock if it would block.

  • The word monitor is used for APIs that involve a trio.hazmat.UnboundedQueue receiving some kind of events. (Examples: nursery .monitor attribute, some of the low-level I/O functions in trio.hazmat.)

  • …we should, but currently don’t, have a solid convention to distinguish between functions that take an async callable and those that take a sync callable. See issue #68.
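
To illustrate the fn, *args convention from the list above, here’s a hedged sketch using nursery.start_soon(), which follows this signature style:

import functools

import trio

async def worker(a, b, *, retries=0):
    print(a, b, retries)

async def main():
    async with trio.open_nursery() as nursery:
        # Positional arguments for the callable pass straight through:
        nursery.start_soon(worker, 1, 2)
        # Keyword arguments still need functools.partial():
        nursery.start_soon(functools.partial(worker, 1, 2, retries=3))

trio.run(main)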

A brief tour of trio’s internals

If you want to understand how trio is put together internally, then the first thing to know is that there’s a very strict internal layering: the trio._core package is a fully self-contained implementation of the core scheduling/cancellation/IO handling logic, and then the other trio.* modules are implemented in terms of the API it exposes. (If you want to see what this API looks like, then import trio; print(trio._core.__all__)). Everything exported from trio._core is also exported as part of the trio, trio.hazmat, or trio.testing namespaces. (See their respective __init__.py files for details; there’s a test to enforce this.)

Rationale: currently, trio is a new project in a novel part of the design space, so we don’t make any stability guarantees. But the goal is to reach the point where we can declare the API stable. It’s unlikely that we’ll be able to quickly explore all possible corners of the design space and cover all possible types of I/O. So instead, our strategy is to make sure that it’s possible for independent packages to add new features on top of trio. Enforcing the trio vs trio._core split is a way of eating our own dogfood: basic functionality like trio.Queue and trio.socket is actually implemented solely in terms of public APIs. And the hope is that by doing this, we increase the chances that someone who comes up with a better kind of queue or wants to add some new functionality like, say, file system change watching, will be able to do that on top of our public APIs without having to modify trio internals.

Inside trio._core

There are three notable sub-modules that are largely independent of the rest of trio, and could (possibly should?) be extracted into their own independent packages:

  • _result.py: Defines Result.
  • _multierror.py: Implements MultiError and associated infrastructure.
  • _ki.py: Implements the core infrastructure for safe handling of KeyboardInterrupt.

The most important submodule, where everything is integrated, is _run.py. (This is also by far the largest submodule; it’d be nice to factor bits of it out where possible, but it’s tricky because the core functionality genuinely is pretty intertwined.) Notably, this is where cancel scopes, nurseries, and Task are defined; it’s also where the scheduler state and trio.run() live.

The one thing that isn’t in _run.py is I/O handling. This is delegated to an IOManager class, of which there are currently three implementations:

  • EpollIOManager in _io_epoll.py (used on Linux, Illumos)
  • KqueueIOManager in _io_kqueue.py (used on MacOS, *BSD)
  • WindowsIOManager in _io_windows.py (used on Windows)

The epoll and kqueue backends take advantage of the epoll and kqueue wrappers in the stdlib select module. The windows backend uses CFFI to access the Win32 API directly (see trio/_core/_windows_cffi.py). In general, we prefer to go directly to the raw OS functionality rather than use selectors, for several reasons:

  • Controlling our own fate: I/O handling is pretty core to what trio is about, and selectors is (as of 2017-03-01) somewhat buggy (e.g. issue 29587, issue 29255). Which isn’t a big deal on its own, but since selectors is part of the standard library we can’t fix it and ship an updated version; we’re stuck with whatever we get. We want more control over our users’ experience than that.
  • Impedance mismatch: the selectors API isn’t particularly well-fitted to how we want to use it. For example, kqueue natively treats an interest in readability of some fd as a separate thing from an interest in that same fd’s writability, which neatly matches trio’s model. selectors.KqueueSelector goes to some effort internally to lump together all interests in a single fd, and to use it we’d then have to jump through more hoops to reverse this. Of course, the native epoll API is fd-centric in the same way as the selectors API so we do still have to write code to jump through these hoops, but the point is that the selectors abstractions aren’t providing a lot of extra value.
  • (Most important) Access to raw platform capabilities: selectors is highly inadequate on Windows, and even on Unix-like systems it hides a lot of power (e.g. kqueue can do a lot more than just check fd readability/writability!).

The IOManager layer provides a fairly raw exposure of the capabilities of each system, with public API functions that vary between different backends. (This is somewhat inspired by how os works.) These public APIs are then exported as part of trio.hazmat, and higher-level APIs like trio.socket abstract over these system-specific APIs to provide a uniform experience.

Currently the choice of backend is made statically at import time, and there is no provision for “pluggable” backends. The intuition here is that we’d rather focus our energy on making one set of solid, official backends that provide a high-quality experience out-of-the-box on all supported systems.

Release history

Trio 0.3.0 (2017-12-28)

Features

  • Simplified nurseries: In Trio, the rule used to be that “parenting is a full time job”, meaning that after a task opened a nursery and spawned some children into it, it had to immediately block in __aexit__ to supervise the new children, or else exception propagation wouldn’t work. Also there was some elaborate machinery to let you replace this supervision logic with your own custom supervision logic. Thanks to new advances in task-rearing technology, parenting is no longer a full time job! Now the supervision happens automatically in the background, and essentially the body of an async with trio.open_nursery() block acts just like a task running inside the nursery. This is important: it makes it possible for libraries to abstract over nursery creation. For example, if you have a Websocket library that needs to run a background task to handle Websocket pings, you can now do that with async with open_websocket(...) as ws: ..., and that can run a task in the background without your users having to worry about parenting it. And don’t worry, you can still make custom supervisors; it turned out all that spiffy machinery was actually redundant and didn’t provide much value. (#136)
  • Trio socket methods like bind and connect no longer require “pre-resolved” numeric addresses; you can now pass regular hostnames and Trio will implicitly resolve them for you. (#377)

Bugfixes

  • Fixed some corner cases in Trio socket method implicit name resolution to better match stdlib behavior. Example: sock.bind(("", port)) now binds to the wildcard address instead of raising an error. (#277)

Deprecations and Removals

  • Removed everything that was deprecated in 0.2.0; see the 0.2.0 release notes below for details.
  • As was foretold in the v0.2.0 release notes, the bind method on Trio sockets is now async. Please update your calls or – better yet – switch to our shiny new high-level networking API, like serve_tcp(). (#241)
  • The resolve_local_address and resolve_remote_address methods on Trio sockets have been deprecated; these are unnecessary now that you can just pass your hostnames directly to the socket methods you want to use. (#377)

Trio 0.2.0 (2017-12-06)

Trio 0.2.0 contains changes from 14 contributors, and brings major new features and bug fixes, as well as a number of deprecations and a very small number of backwards incompatible changes. We anticipate that these should be easy to adapt to, but make sure to read about them below, and if you’re using Trio then remember to read and subscribe to issue #1.

Highlights

Breaking changes and deprecations

Trio is a young and ambitious project, but it also aims to become a stable, production-quality foundation for async I/O in Python. Therefore, our approach for now is to provide deprecation warnings wherever possible, but on a fairly aggressive cycle as we push towards stability. If you use Trio you should read and subscribe to issue #1. We’d also welcome feedback on how this approach is working, whether our deprecation warnings could be more helpful, or anything else.

The tl;dr is: stop using socket.bind if you can, and then fix everything your test suite warns you about.

Upcoming breaking changes without warnings (i.e., stuff that works in 0.2.0, but won’t work in 0.3.0):

  • In the next release, the bind method on Trio socket objects will become async (#241). Unfortunately, there’s no good way to provide a warning here. We recommend switching to the new high-level networking APIs like serve_tcp(), which will insulate you from this change.

Breaking changes (i.e., stuff that could theoretically break a program that worked on 0.1.0):

  • trio.socket no longer attempts to normalize or modernize socket options across different platforms. The high-level networking API now handles that, freeing trio.socket to focus on giving you raw, unadulterated BSD sockets.
  • When a socket sendall call was cancelled, it used to attach some metadata to the exception reporting how much data was actually sent. It no longer does this, because in common configurations like an SSLStream wrapped around a SocketStream it becomes ambiguous which “level” the partial metadata applies to, leading to confusion and bugs. There is no longer any way to tell how much data was sent after a sendall is cancelled.
  • The trio.socket.getprotobyname() function is now async, like it should have been all along. I doubt anyone will ever use it, but that’s no reason not to get the details right.
  • The trio.socket functions getservbyport, getservbyname, and getfqdn have been removed, because they were obscure, buggy, and obsolete. Use getaddrinfo() instead.

Upcoming breaking changes with warnings (i.e., stuff that in 0.2.0 will work but will print loud complaints, and that won’t work in 0.3.0):

Unfortunately, a limitation in PyPy3 5.8 breaks our deprecation handling for some renames. (Attempting to use the old names will give an unhelpful error instead of a helpful warning.) This does not affect CPython, or PyPy3 5.9+.

Other changes

  • run_sync_in_worker_thread() now has a robust mechanism for applying capacity limits to the number of concurrent threads (#10, #57, #156)

  • New support for tests to cleanly hook hostname lookup and socket operations: see Virtual networking for testing. In addition, trio.socket.SocketType is now an empty abstract base class, with the actual socket class made private. This shouldn’t affect anyone, since the only thing you could directly use it for in the first place was isinstance checks, and those still work (#170)

  • New class StrictFIFOLock

  • New exception ResourceBusyError

  • The trio.hazmat.ParkingLot class (which is used to implement many of Trio’s synchronization primitives) was rewritten to be simpler and faster (#272, #287)

  • It’s generally true that if you’re using Trio you have to use Trio functions, if you’re using asyncio you have to use asyncio functions, and so forth. (See the discussion of the “async sandwich” in the Trio tutorial for more details.) So for example, this isn’t going to work:

    async def main():
        # asyncio here
        await asyncio.sleep(1)
    
    # trio here
    trio.run(main)
    

    Trio now reliably detects if you accidentally do something like this, and gives a helpful error message.

  • Trio now also has special error messages for several other common errors, like doing trio.run(some_func()) (should be trio.run(some_func)).

  • trio.socket now handles non-ascii domain names using the modern IDNA 2008 standard instead of the obsolete IDNA 2003 standard (#11)

  • When an Instrument raises an unexpected error, we now route it through the logging module instead of printing it directly to stderr. Normally this produces exactly the same effect, but this way it’s more configurable. (#306)

  • Fixed a minor race condition in IOCP thread shutdown on Windows (#81)

  • Control-C handling on Windows now uses signal.set_wakeup_fd() and should be more reliable (#42)

  • trio.run() takes a new keyword argument restrict_keyboard_interrupt_to_checkpoints

  • New attributes allow more detailed introspection of the task tree: nursery.child_tasks, Task.child_nurseries, nursery.parent_task, Task.parent_nursery

  • trio.testing.wait_all_tasks_blocked() now takes a tiebreaker= argument. The main use is to allow MockClock’s auto-jump functionality to avoid interfering with direct use of wait_all_tasks_blocked() in the same test.

  • MultiError.catch() now correctly preserves __context__, despite Python’s best attempts to stop us (#165)

  • It is now possible to take weakrefs to Lock and many other classes (#331)

  • Fix sock.accept() for IPv6 sockets (#164)

  • PyCharm (and hopefully other IDEs) can now offer better completions for the trio and trio.hazmat modules (#314)

  • Trio now uses yapf to standardize formatting across the source tree, so we never have to think about whitespace again.

  • Many documentation improvements

Trio 0.1.0 (2017-03-10)

  • Initial release.

Code of Conduct

This code of conduct applies to the Trio project, and all associated projects in the python-trio organization.

When Something Happens

If you see a Code of Conduct violation, follow these steps:

  1. Let the person know that what they did is not appropriate and ask them to stop and/or edit their message(s) or commits.
  2. That person should immediately stop the behavior and correct the issue.
  3. If this doesn’t happen, or if you’re uncomfortable speaking up, contact the maintainers.
  4. As soon as possible, a maintainer will look into the issue, and take further action (see below), starting with a warning, then temporary block, then long-term repo or organization ban.

When reporting, please include any relevant details, links, screenshots, context, or other information that may be used to better understand and resolve the situation.

The maintainer team will prioritize the well-being and comfort of the recipients of the violation over the comfort of the violator. See some examples below.

Our Pledge

In the interest of fostering an open and welcoming environment, we as contributors and maintainers of this project pledge to making participation in our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, gender identity and expression, level of experience, technical preferences, nationality, personal appearance, race, religion, or sexual identity and orientation.

Our Standards

Examples of behavior that contributes to creating a positive environment include:

  • Using welcoming and inclusive language.
  • Being respectful of differing viewpoints and experiences.
  • Gracefully accepting constructive feedback.
  • Focusing on what is best for the community.
  • Showing empathy and kindness towards other community members.
  • Encouraging and raising up your peers in the project so you can all bask in hacks and glory.

Examples of unacceptable behavior by participants include:

  • The use of sexualized language or imagery and unwelcome sexual attention or advances, including when simulated online. The only exception to sexual topics is channels/spaces specifically for topics of sexual identity.
  • Casual mention of slavery or indentured servitude and/or false comparisons of one’s occupation or situation to slavery. Please consider using or asking about alternate terminology when referring to such metaphors in technology.
  • Making light of/making mocking comments about trigger warnings and content warnings.
  • Trolling, insulting/derogatory comments, and personal or political attacks.
  • Public or private harassment, deliberate intimidation, or threats.
  • Publishing others’ private information, such as a physical or electronic address, without explicit permission. This includes any sort of “outing” of any aspect of someone’s identity without their consent.
  • Publishing private screenshots or quotes of interactions in the context of this project without all quoted users’ explicit consent.
  • Publishing of private communication that doesn’t have to do with reporting harassment.
  • Any of the above even when presented as “ironic” or “joking”.
  • Any attempt to present “reverse-ism” versions of the above as violations. Examples of reverse-isms are “reverse racism”, “reverse sexism”, “heterophobia”, and “cisphobia”.
  • Unsolicited explanations under the assumption that someone doesn’t already know it. Ask before you teach! Don’t assume what people’s knowledge gaps are.
  • Feigning or exaggerating surprise when someone admits to not knowing something.
  • Well-actuallies
  • Other conduct which could reasonably be considered inappropriate in a professional or community setting.

Scope

This Code of Conduct applies both within spaces involving this project and in other spaces involving community members. This includes the repository, its Pull Requests and Issue tracker, its Twitter community, private email communications in the context of the project, and any events where members of the project are participating, as well as adjacent communities and venues affecting the project’s members.

Depending on the violation, the maintainers may decide that a violation of this Code of Conduct that happened outside the scope of the community makes an individual unwelcome, and take appropriate action to maintain the comfort and safety of its members.

Other Community Standards

As a project on GitHub, this project is additionally covered by the GitHub Community Guidelines.

Enforcement of those guidelines, when violations overlap with the above, is the responsibility of the respective entities, and enforcement may happen in any or all of the services/communities.

Maintainer Enforcement Process

Once the maintainers get involved, they will follow a documented series of steps and do their best to preserve the well-being of project members. This section covers actual concrete steps.

Contacting Maintainers

As a small and young project, we don’t yet have a Code of Conduct enforcement team. Hopefully that will be addressed as we grow, but for now, any issues should be addressed to Nathaniel J. Smith, via email or any other medium that you feel comfortable with. Using words like “Trio code of conduct” in your subject will help make sure your message is noticed quickly.

Further Enforcement

If you’ve already followed the initial enforcement steps, these are the steps maintainers will take for further enforcement, as needed:

  1. Repeat the request to stop.
  2. If the person doubles down, they will have offending messages removed or edited by a maintainer and be given an official warning. The PR or Issue may be locked.
  3. If the behavior continues or is repeated later, the person will be blocked from participating for 24 hours.
  4. If the behavior continues or is repeated after the temporary block, a long-term (6-12 month) ban will be used.
  5. If after this the behavior still continues, a permanent ban may be enforced.

On top of this, maintainers may remove any offending messages, images, contributions, etc., as they deem necessary.

Maintainers reserve full rights to skip any of these steps, at their discretion, if the violation is considered a serious and/or immediate threat to the health and well-being of members of the community. Such violations include any threats, serious physical or verbal attacks, and other behavior that would be completely unacceptable in any social setting and that puts our members at risk.

Members expelled from events or venues with any sort of paid attendance will not be refunded.

Who Watches the Watchers?

Maintainers and other leaders who do not follow or enforce the Code of Conduct in good faith may face temporary or permanent repercussions as determined by other members of the project’s leadership. These may include anything from removal from the maintainer team to a permanent ban from the community.

Additionally, since this project is hosted on GitHub, GitHub’s Code of Conduct may be applied against maintainers of this project, outside of this project’s own procedures.

Enforcement Examples

The Best Case

The vast majority of situations work out like this. This interaction is common, and generally positive.

Alex: “Yeah I used X and it was really crazy!”

Patt (not a maintainer): “Hey, could you not use that word? What about ‘ridiculous’ instead?”

Alex: “oh sorry, sure.” -> edits old comment to say “it was really confusing!”

The Maintainer Case

Sometimes, though, you need to get maintainers involved. Maintainers will do their best to resolve conflicts, but people who were harmed by something will take priority.

Patt: “Honestly, sometimes I just really hate using $library and anyone who uses it probably sucks at their job.”

Alex: “Whoa there, could you dial it back a bit? There’s a CoC thing about attacking folks’ tech use like that.”

Patt: “I’m not attacking anyone, what’s your problem?”

Alex: “@maintainers hey uh. Can someone look at this issue? Patt is getting a bit aggro. I tried to nudge them about it, but nope.”

KeeperOfCommitBits: (on issue) “Hey Patt, maintainer here. Could you tone it down? This sort of attack is really not okay in this space.”

Patt: “Leave me alone I haven’t said anything bad wtf is wrong with you.”

KeeperOfCommitBits: (deletes user’s comment), “@patt I mean it. Please refer to the CoC over at (URL to this CoC) if you have questions, but you can consider this an actual warning. I’d appreciate it if you reworded your messages in this thread, since they made folks there uncomfortable. Let’s try and be kind, yeah?”

Patt: “@keeperofbits Okay sorry. I’m just frustrated and I’m kinda burnt out and I guess I got carried away. I’ll DM Alex a note apologizing and edit my messages. Sorry for the trouble.”

KeeperOfCommitBits: “@patt Thanks for that. I hear you on the stress. Burnout sucks :/. Have a good one!”

The Nope Case

PepeTheFrog🐸: “Hi, I am a literal actual nazi and I think white supremacists are quite fashionable.”

Patt: “NOOOOPE. OH NOPE NOPE.”

Alex: “JFC NO. NOPE. @keeperofbits NOPE NOPE LOOK HERE”

KeeperOfCommitBits: “👀 Nope. NOPE NOPE NOPE. 🔥”

PepeTheFrog🐸 has been banned from all organization or user repositories belonging to KeeperOfCommitBits.

Attribution

This Code of Conduct was generated using WeAllJS Code of Conduct Generator, which is based on the WeAllJS Code of Conduct, which is itself based on Contributor Covenant, version 1.4, available at http://contributor-covenant.org/version/1/4, and the LGBTQ in Technology Slack Code of Conduct.