Trio’s core functionality

Entering Trio

If you want to use Trio, then the first thing you have to do is call trio.run():

trio.run(async_fn: Callable[..., Awaitable[RetT]], *args: object, clock: Clock | None = None, instruments: Sequence[Instrument] = (), restrict_keyboard_interrupt_to_checkpoints: bool = False, strict_exception_groups: bool = True) → RetT

Run a Trio-flavored async function, and return the result.

Calling:

run(async_fn, *args)

is the equivalent of:

await async_fn(*args)

except that run() can (and must) be called from a synchronous context.

This is Trio’s main entry point. Almost every other function in Trio requires that you be inside a call to run().

Parameters:
  • async_fn – An async function.

  • args – Positional arguments to be passed to async_fn. If you need to pass keyword arguments, then use functools.partial().

  • clock – None to use the default system-specific monotonic clock; otherwise, an object implementing the trio.abc.Clock interface, for example a trio.testing.MockClock instance.

  • instruments (list of trio.abc.Instrument objects) – Any instrumentation you want to apply to this run. This can also be modified during the run; see Instrument API.

  • restrict_keyboard_interrupt_to_checkpoints (bool) –

    What happens if the user hits control-C while run() is running? If this argument is False (the default), then you get the standard Python behavior: a KeyboardInterrupt exception will immediately interrupt whatever task is running (or if no task is running, then Trio will wake up a task to be interrupted). Alternatively, if you set this argument to True, then KeyboardInterrupt delivery will be delayed: it will only be raised at checkpoints, like a Cancelled exception.

    The default behavior is nice because it means that even if you accidentally write an infinite loop that never executes any checkpoints, then you can still break out of it using control-C. The alternative behavior is nice if you’re paranoid about a KeyboardInterrupt at just the wrong place leaving your program in an inconsistent state, because it means that you only have to worry about KeyboardInterrupt at the exact same places where you already have to worry about Cancelled.

    This setting has no effect if your program has registered a custom SIGINT handler, or if run() is called from anywhere but the main thread (this is a Python limitation), or if you use open_signal_receiver() to catch SIGINT.

  • strict_exception_groups (bool) – Unless set to False, nurseries will always wrap even a single raised exception in an exception group. This can be overridden on the level of individual nurseries. Setting it to False will be deprecated and ultimately removed in a future version of Trio.

Returns:

Whatever async_fn returns.

Raises:
  • TrioInternalError – if an unexpected error is encountered inside Trio’s internal machinery. This is a bug and you should let us know.

  • Anything else – if async_fn raises an exception, then run() propagates it.
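
Putting the pieces above together, a minimal sketch looks like this (the greet function is hypothetical; note the use of functools.partial() for keyword arguments, as described under args above):

import functools
import trio

async def greet(name, *, punctuation="!"):
    await trio.sleep(1)
    print(f"hello, {name}{punctuation}")

# Positional arguments can be passed directly after the function:
trio.run(greet, "world")

# Keyword arguments have to go through functools.partial():
trio.run(functools.partial(greet, "world", punctuation="?"))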

General principles

Checkpoints

When writing code using Trio, it’s very important to understand the concept of a checkpoint. Many of Trio’s functions act as checkpoints.

A checkpoint is two things:

  1. It’s a point where Trio checks for cancellation. For example, if the code that called your function set a timeout, and that timeout has expired, then the next time your function executes a checkpoint Trio will raise a Cancelled exception. See Cancellation and timeouts below for more details.

  2. It’s a point where the Trio scheduler checks its scheduling policy to see if it’s a good time to switch to another task, and potentially does so. (Currently, this check is very simple: the scheduler always switches at every checkpoint. But this might change in the future.)

When writing Trio code, you need to keep track of where your checkpoints are. Why? First, because checkpoints require extra scrutiny: whenever you execute a checkpoint, you need to be prepared to handle a Cancelled error, or for another task to run and rearrange some state out from under you. And second, because you also need to make sure that you have enough checkpoints: if your code doesn’t pass through a checkpoint on a regular basis, then it will be slow to notice and respond to cancellation and – much worse – since Trio is a cooperative multi-tasking system where the only place the scheduler can switch tasks is at checkpoints, it’ll also prevent the scheduler from fairly allocating time between different tasks and adversely affect the response latency of all the other code running in the same process. (Informally we say that a task that does this is “hogging the run loop”.)

So when you’re doing code review on a project that uses Trio, one of the things you’ll want to think about is whether there are enough checkpoints, and whether each one is handled correctly. Of course this means you need a way to recognize checkpoints. How do you do that? The underlying principle is that any operation that blocks has to be a checkpoint. This makes sense: if an operation blocks, then it might block for a long time, and you’ll want to be able to cancel it if a timeout expires; and in any case, while this task is blocked we want another task to be scheduled to run so our code can make full use of the CPU.

But if we want to write correct code in practice, then this principle is a little too sloppy and imprecise to be useful. How do we know which functions might block? What if a function blocks sometimes, but not others, depending on the arguments passed / network speed / phase of the moon? How do we figure out where the checkpoints are when we’re stressed and sleep deprived but still want to get this code review right, and would prefer to reserve our mental energy for thinking about the actual logic instead of worrying about checkpoints?

Don’t worry – Trio’s got your back. Since checkpoints are important and ubiquitous, we make it as simple as possible to keep track of them. Here are the rules:

  • Regular (synchronous) functions never contain any checkpoints.

  • If you call an async function provided by Trio (await <something in trio>), and it doesn’t raise an exception, then it always acts as a checkpoint. (If it does raise an exception, it might act as a checkpoint or might not.)

    • This includes async iterators: If you write async for ... in <a trio object>, then there will be at least one checkpoint in each iteration of the loop, and it will still checkpoint if the iterable is empty.

    • Partial exception for async context managers: Both the entry and exit of an async with block are defined as async functions; but for a particular type of async context manager, it’s often the case that only one of them is able to block, which means only that one will act as a checkpoint. This is documented on a case-by-case basis.

  • Third-party async functions / iterators / context managers can act as checkpoints; if you see await <something> or one of its friends, then that might be a checkpoint. So to be safe, you should prepare for scheduling or cancellation happening there.

The reason we distinguish between Trio functions and other functions is that we can’t make any guarantees about third party code. Checkpoint-ness is a transitive property: if function A acts as a checkpoint, and you write a function that calls function A, then your function also acts as a checkpoint. If your function never calls a checkpoint, then it isn’t one either. So there’s nothing stopping someone from writing a function like:

# technically legal, but bad style:
async def why_is_this_async():
    return 7

that never calls any of Trio’s async functions. This is an async function, but it’s not a checkpoint. But why make a function async if it never calls any async functions? It’s possible, but it’s a bad idea. If you have a function that’s not calling any async functions, then you should make it synchronous. The people who use your function will thank you, because it makes it obvious that your function is not a checkpoint, and their code reviews will go faster.

(Remember how in the tutorial we emphasized the importance of the “async sandwich”, and the way it means that await ends up being a marker that shows when you’re calling a function that calls a function that … eventually calls one of Trio’s built-in async functions? The transitivity of async-ness is a technical requirement that Python imposes, but since it exactly matches the transitivity of checkpoint-ness, we’re able to exploit it to help you keep track of checkpoints. Pretty sneaky, eh?)

A slightly trickier case is a function like:

async def sleep_or_not(should_sleep):
    if should_sleep:
        await trio.sleep(1)
    else:
        pass

Here the function acts as a checkpoint if you call it with should_sleep set to a true value, but not otherwise. This is why we emphasize that Trio’s own async functions are unconditional checkpoints: they always check for cancellation and check for scheduling, regardless of what arguments they’re passed. If you find an async function in Trio that doesn’t follow this rule, then it’s a bug and you should let us know.

Inside Trio, we’re very picky about this, because Trio is the foundation of the whole system so we think it’s worth the extra effort to make things extra predictable. It’s up to you how picky you want to be in your code. To give you a more realistic example of what this kind of issue looks like in real life, consider this function:

async def recv_exactly(sock, nbytes):
    data = bytearray()
    while nbytes > 0:
        # recv() reads up to 'nbytes' bytes each time
        chunk = await sock.recv(nbytes)
        if not chunk:
            raise RuntimeError("socket unexpected closed")
        nbytes -= len(chunk)
        data += chunk
    return data

If called with an nbytes that’s greater than zero, then it will call sock.recv at least once, and recv is an async Trio function, and thus an unconditional checkpoint. So in this case, recv_exactly acts as a checkpoint. But if we do await recv_exactly(sock, 0), then it will immediately return an empty buffer without executing a checkpoint. If this were a function in Trio itself, then this wouldn’t be acceptable, but you may decide you don’t want to worry about this kind of minor edge case in your own code.

If you do want to be careful, or if you have some CPU-bound code that doesn’t have enough checkpoints in it, then it’s useful to know that await trio.sleep(0) is an idiomatic way to execute a checkpoint without doing anything else, and that trio.testing.assert_checkpoints() can be used to test that an arbitrary block of code contains a checkpoint.
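
For instance, here’s a rough sketch of both techniques (the crunch function and its input are made up for illustration):

import trio
import trio.testing

async def crunch(numbers):
    total = 0
    for i, n in enumerate(numbers):
        total += n * n
        # CPU-bound loop: explicitly yield to the scheduler (and check for
        # cancellation) every once in a while.
        if i % 1000 == 0:
            await trio.sleep(0)
    return total

async def test_crunch_checkpoints():
    # assert_checkpoints() complains if the block it wraps never executes
    # a checkpoint.
    with trio.testing.assert_checkpoints():
        await crunch(range(10_000))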

Thread safety

The vast majority of Trio’s API is not thread safe: it can only be used from inside a call to trio.run(). This manual doesn’t bother documenting this on individual calls; unless specifically noted otherwise, you should assume that it isn’t safe to call any Trio functions from anywhere except the Trio thread. (But see below if you really do need to work with threads.)

Time and clocks

Every call to run() has an associated clock.

By default, Trio uses an unspecified monotonic clock, but this can be changed by passing a custom clock object to run() (e.g. for testing).

You should not assume that Trio’s internal clock matches any other clock you have access to, including the clocks of simultaneous calls to trio.run() happening in other processes or threads!

The default clock is currently implemented as time.perf_counter() plus a large random offset. The idea here is to catch code that accidentally uses time.perf_counter() early, which should help keep our options open for changing the clock implementation later, and (more importantly) make sure you can be confident that custom clocks like trio.testing.MockClock will work with third-party libraries you don’t control.

trio.current_time() → float

Returns the current time according to Trio’s internal clock.

Returns:

The current time.

Return type:

float

Raises:

RuntimeError – if not inside a call to trio.run().

await trio.sleep(seconds: float) → None

Pause execution of the current task for the given number of seconds.

Parameters:

seconds (float) – The number of seconds to sleep. May be zero to insert a checkpoint without actually blocking.

Raises:

ValueError – if seconds is negative or NaN.

await trio.sleep_until(deadline: float) → None

Pause execution of the current task until the given time.

The difference between sleep() and sleep_until() is that the former takes a relative time and the latter takes an absolute time according to Trio’s internal clock (as returned by current_time()).

Parameters:

deadline (float) – The time at which we should wake up again. May be in the past, in which case this function executes a checkpoint but does not block.

Raises:

ValueError – if deadline is NaN.

await trio.sleep_forever() → None

Pause execution of the current task forever (or until cancelled).

Equivalent to calling await sleep(math.inf).
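
To illustrate the relationship between these functions, here’s a sketch of a periodic task that uses an absolute deadline instead of repeatedly sleeping for a relative interval:

import trio

async def tick_three_times():
    next_tick = trio.current_time() + 1
    for _ in range(3):
        # Wake up at an absolute time on Trio's clock:
        await trio.sleep_until(next_tick)
        print("tick at", trio.current_time())
        next_tick += 1

trio.run(tick_three_times)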

If you’re a mad scientist or otherwise feel the need to take direct control over the PASSAGE OF TIME ITSELF, then you can implement a custom Clock class:

class trio.abc.Clock

The interface for custom run loop clocks.

abstractmethod current_time() → float

Return the current time, according to this clock.

This is used to implement functions like trio.current_time() and trio.move_on_after().

Returns:

The current time.

Return type:

float

abstractmethod deadline_to_sleep_time(deadline: float) → float

Compute the real time until the given deadline.

This is called before we enter a system-specific wait function like select.select(), to get the timeout to pass.

For a clock using wall-time, this should be something like:

return deadline - self.current_time()

but of course it may be different if you’re implementing some kind of virtual clock.

Parameters:

deadline (float) – The absolute time of the next deadline, according to this clock.

Returns:

The number of real seconds to sleep until the given deadline. May be math.inf.

Return type:

float

abstractmethod start_clock() → None

Do any setup this clock might need.

Called at the beginning of the run.
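
For example, here’s a rough sketch of a custom clock that runs at twice real-time speed. (This is illustrative only – for testing you’d normally reach for trio.testing.MockClock instead.)

import time
import trio

class DoubleSpeedClock(trio.abc.Clock):
    # A hypothetical clock where one real second counts as two clock seconds.

    def start_clock(self):
        self._t0 = time.perf_counter()

    def current_time(self):
        return 2 * (time.perf_counter() - self._t0)

    def deadline_to_sleep_time(self, deadline):
        # Clock time passes twice as fast as real time, so we only need to
        # sleep half as many real seconds to reach the deadline.
        return max(0, (deadline - self.current_time()) / 2)

async def main():
    await trio.sleep(1)  # one clock second, about half a real second

trio.run(main, clock=DoubleSpeedClock())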

Cancellation and timeouts

Trio has a rich, composable system for cancelling work, either explicitly or when a timeout expires.

A simple timeout example

In the simplest case, you can apply a timeout to a block of code:

with trio.move_on_after(30):
    result = await do_http_get("https://...")
    print("result is", result)
print("with block finished")

We refer to move_on_after() as creating a “cancel scope”, which contains all the code that runs inside the with block. If the HTTP request takes more than 30 seconds to run, then it will be cancelled: we’ll abort the request and we won’t see result is ... printed on the console; instead we’ll go straight to printing the with block finished message.

Note

Note that this is a single 30 second timeout for the entire body of the with statement. This is different from what you might have seen with other Python libraries, where timeouts often refer to something more complicated. We think this way is easier to reason about.

How does this work? There’s no magic here: Trio is built using ordinary Python functionality, so we can’t just abandon the code inside the with block. Instead, we take advantage of Python’s standard way of aborting a large and complex piece of code: we raise an exception.

Here’s the idea: whenever you call a cancellable function like await trio.sleep(...) or await sock.recv(...) – see Checkpoints – then the first thing that function does is to check if there’s a surrounding cancel scope whose timeout has expired, or otherwise been cancelled. If so, then instead of performing the requested operation, the function fails immediately with a Cancelled exception. In this example, this probably happens somewhere deep inside the bowels of do_http_get. The exception then propagates out like any normal exception (you could even catch it if you wanted, but that’s generally a bad idea), until it reaches the with move_on_after(...):. And at this point, the Cancelled exception has done its job – it’s successfully unwound the whole cancelled scope – so move_on_after() catches it, and execution continues as normal after the with block. And this all works correctly even if you have nested cancel scopes, because every Cancelled object carries an invisible marker that makes sure that the cancel scope that triggered it is the only one that will catch it.

Handling cancellation

Pretty much any code you write using Trio needs to have some strategy to handle Cancelled exceptions – even if you didn’t set a timeout, your caller might (and probably will).

You can catch Cancelled, but you shouldn’t! Or more precisely, if you do catch it, then you should do some cleanup and then re-raise it or otherwise let it continue propagating (unless you encounter an error, in which case it’s OK to let that propagate instead). To help remind you of this fact, Cancelled inherits from BaseException, like KeyboardInterrupt and SystemExit do, so that it won’t be caught by catch-all except Exception: blocks.
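
For example, here’s a sketch of that pattern (the resource object and its methods are hypothetical):

import trio

async def fetch_with_cleanup(resource):
    try:
        return await resource.fetch()
    except trio.Cancelled:
        # It's fine to do some quick, synchronous bookkeeping here, but the
        # exception must be allowed to keep propagating.
        resource.mark_aborted()
        raise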

It’s also important in any long-running code to make sure that you regularly check for cancellation, because otherwise timeouts won’t work! This happens implicitly every time you call a cancellable operation; see below for details. If you have a task that has to do a lot of work without any I/O, then you can use await sleep(0) to insert an explicit cancel+schedule point.

Here’s a rule of thumb for designing good Trio-style (“trionic”?) APIs: if you’re writing a reusable function, then you shouldn’t take a timeout= parameter, and instead let your caller worry about it. This has several advantages. First, it leaves the caller’s options open for deciding how they prefer to handle timeouts – for example, they might find it easier to work with absolute deadlines instead of relative timeouts. If they’re the ones calling into the cancellation machinery, then they get to pick, and you don’t have to worry about it. Second, and more importantly, this makes it easier for others to reuse your code. If you write a http_get function, and then I come along later and write a log_in_to_twitter function that needs to internally make several http_get calls, I don’t want to have to figure out how to configure the individual timeouts on each of those calls – and with Trio’s timeout system, it’s totally unnecessary.

Of course, this rule doesn’t apply to APIs that need to impose internal timeouts. For example, if you write a start_http_server function, then you probably should give your caller some way to configure timeouts on individual requests.
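
To make the rule of thumb concrete, here’s a sketch (http_get and the URL are hypothetical, and the second fragment assumes it runs inside an async function):

# Instead of:  async def http_get(url, timeout=30): ...
# write the reusable function with no timeout logic at all:
async def http_get(url):
    ...

# ...and let each caller decide how (or whether) to bound it:
with trio.fail_after(10):
    page = await http_get("https://example.com")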

Cancellation semantics

You can freely nest cancellation blocks, and each Cancelled exception “knows” which block it belongs to. So long as you don’t stop it, the exception will keep propagating until it reaches the block that triggered it, at which point that block catches it and propagation stops automatically.

Here’s an example:

print("starting...")
with trio.move_on_after(5):
    with trio.move_on_after(10):
        await trio.sleep(20)
        print("sleep finished without error")
    print("move_on_after(10) finished without error")
print("move_on_after(5) finished without error")

In this code, the outer scope will expire after 5 seconds, causing the sleep() call to return early with a Cancelled exception. Then this exception will propagate through the with move_on_after(10) line until it’s caught by the with move_on_after(5) context manager. So this code will print:

starting...
move_on_after(5) finished without error

The end result is that Trio has successfully cancelled exactly the work that was happening within the scope that was cancelled.

Looking at this, you might wonder how you can tell whether the inner block timed out – perhaps you want to do something different, like try a fallback procedure or report a failure to your caller. To make this easier, move_on_after()’s __enter__ function returns an object representing this cancel scope, which we can use to check whether this scope caught a Cancelled exception:

with trio.move_on_after(5) as cancel_scope:
    await trio.sleep(10)
print(cancel_scope.cancelled_caught)  # prints "True"

The cancel_scope object also allows you to check or adjust this scope’s deadline, explicitly trigger a cancellation without waiting for the deadline, check if the scope has already been cancelled, and so forth – see CancelScope below for the full details.

Cancellations in Trio are “level triggered”, meaning that once a block has been cancelled, all cancellable operations in that block will keep raising Cancelled. This helps avoid some pitfalls around resource clean-up. For example, imagine that we have a function that connects to a remote server and sends some messages, and then cleans up on the way out:

with trio.move_on_after(TIMEOUT):
    conn = make_connection()
    try:
        await conn.send_hello_msg()
    finally:
        await conn.send_goodbye_msg()

Now suppose that the remote server stops responding, so our call to await conn.send_hello_msg() hangs forever. Fortunately, we were clever enough to put a timeout around this code, so eventually the timeout will expire and send_hello_msg will raise Cancelled. But then, in the finally block, we make another blocking call, which will also hang forever! At this point, if we were using asyncio or another library with “edge-triggered” cancellation, we’d be in trouble: since our timeout already fired, it wouldn’t fire again, and our application would lock up forever. But in Trio, this doesn’t happen: the await conn.send_goodbye_msg() call is still inside the cancelled block, so it will also raise Cancelled.

Of course, if you really want to make another blocking call in your cleanup handler, Trio will let you; it’s only trying to prevent you from accidentally shooting yourself in the foot. Intentional foot-shooting is no problem (or at least – it’s not Trio’s problem). To do this, create a new scope, and set its shield attribute to True:

with trio.move_on_after(TIMEOUT):
    conn = make_connection()
    try:
        await conn.send_hello_msg()
    finally:
        with trio.move_on_after(CLEANUP_TIMEOUT) as cleanup_scope:
            cleanup_scope.shield = True
            await conn.send_goodbye_msg()

So long as you’re inside a scope with shield = True set, then you’ll be protected from outside cancellations. Note though that this only applies to outside cancellations: if CLEANUP_TIMEOUT expires then await conn.send_goodbye_msg() will still be cancelled, and if the await conn.send_goodbye_msg() call uses any timeouts internally, then those will continue to work normally as well. This is a pretty advanced feature that most people probably won’t use, but it’s there for the rare cases where you need it.

Cancellation and primitive operations

We’ve talked a lot about what happens when an operation is cancelled, and how you need to be prepared for this whenever calling a cancellable operation… but we haven’t gone into the details about which operations are cancellable, and how exactly they behave when they’re cancelled.

Here’s the rule: if it’s in the trio namespace, and you use await to call it, then it’s cancellable (see Checkpoints above). Cancellable means:

  • If you try to call it when inside a cancelled scope, then it will raise Cancelled.

  • If it blocks, and one of the scopes around it becomes cancelled while it’s blocked, then it will return early and raise Cancelled.

  • Raising Cancelled means that the operation did not happen. If a Trio socket’s send method raises Cancelled, then no data was sent. If a Trio socket’s recv method raises Cancelled then no data was lost – it’s still sitting in the socket receive buffer waiting for you to call recv again. And so forth.

There are a few idiosyncratic cases where external constraints make it impossible to fully implement these semantics. These are always documented. There is also one systematic exception:

  • Async cleanup operations – like __aexit__ methods or async close methods – are cancellable just like anything else except that if they are cancelled, they still perform a minimum level of cleanup before raising Cancelled.

For example, closing a TLS-wrapped socket normally involves sending a notification to the remote peer, so that they can be cryptographically assured that you really meant to close the socket, and your connection wasn’t just broken by a man-in-the-middle attacker. But handling this robustly is a bit tricky. Remember our example above where the blocking send_goodbye_msg caused problems? That’s exactly how closing a TLS socket works: if the remote peer has disappeared, then our code may never be able to actually send our shutdown notification, and it would be nice if it didn’t block forever trying. Therefore, the method for closing a TLS-wrapped socket will try to send that notification – and if it gets cancelled, then it will give up on sending the message, but will still close the underlying socket before raising Cancelled, so at least you don’t leak that resource.

Cancellation API details

move_on_after() and all the other cancellation facilities provided by Trio are ultimately implemented in terms of CancelScope objects.

class trio.CancelScope(*, deadline: float = inf, shield: bool = False)

A cancellation scope: the link between a unit of cancellable work and Trio’s cancellation system.

A CancelScope becomes associated with some cancellable work when it is used as a context manager surrounding that work:

cancel_scope = trio.CancelScope()
...
with cancel_scope:
    await long_running_operation()

Inside the with block, a cancellation of cancel_scope (via a call to its cancel() method or via the expiry of its deadline) will immediately interrupt the long_running_operation() by raising Cancelled at its next checkpoint.

The context manager __enter__ returns the CancelScope object itself, so you can also write with trio.CancelScope() as cancel_scope:.

If a cancel scope becomes cancelled before entering its with block, the Cancelled exception will be raised at the first checkpoint inside the with block. This allows a CancelScope to be created in one task and passed to another, so that the first task can later cancel some work inside the second.
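
For example, here’s a sketch where one task creates a scope and a second task cancels it (wait_for_shutdown_signal and do_lots_of_work are hypothetical; nurseries are covered later in this manual):

async def watcher(cancel_scope):
    await wait_for_shutdown_signal()
    cancel_scope.cancel()  # interrupt whatever is running under this scope

async def main():
    work_scope = trio.CancelScope()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(watcher, work_scope)
        with work_scope:
            await do_lots_of_work()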

Cancel scopes are not reusable or reentrant; that is, each cancel scope can be used for at most one with block. (You’ll get a RuntimeError if you violate this rule.)

The CancelScope constructor takes initial values for the cancel scope’s deadline and shield attributes; these may be freely modified after construction, whether or not the scope has been entered yet, and changes take immediate effect.

deadline

Read-write, float. An absolute time on the current run’s clock at which this scope will automatically become cancelled. You can adjust the deadline by modifying this attribute, e.g.:

# I need a little more time!
cancel_scope.deadline += 30

Note that for efficiency, the core run loop only checks for expired deadlines every once in a while. This means that in certain cases there may be a short delay between when the clock says the deadline should have expired, and when checkpoints start raising Cancelled. This is a very obscure corner case that you’re unlikely to notice, but we document it for completeness. (If this does cause problems for you, of course, then we want to know!)

Defaults to math.inf, which means “no deadline”, though this can be overridden by the deadline= argument to the CancelScope constructor.

shield

Read-write, bool, default False. So long as this is set to True, then the code inside this scope will not receive Cancelled exceptions from scopes that are outside this scope. They can still receive Cancelled exceptions from (1) this scope, or (2) scopes inside this scope. You can modify this attribute:

with trio.CancelScope() as cancel_scope:
    cancel_scope.shield = True
    # This cannot be interrupted by any means short of
    # killing the process:
    await sleep(10)

    cancel_scope.shield = False
    # Now this can be cancelled normally:
    await sleep(10)

Defaults to False, though this can be overridden by the shield= argument to the CancelScope constructor.

cancel()

Cancels this scope immediately.

This method is idempotent, i.e., if the scope was already cancelled then this method silently does nothing.

cancelled_caught

Readonly bool. Records whether this scope caught a Cancelled exception. This requires two things: (1) the with block exited with a Cancelled exception, and (2) this scope is the one that was responsible for triggering this Cancelled exception.

cancel_called

Readonly bool. Records whether cancellation has been requested for this scope, either by an explicit call to cancel() or by the deadline expiring.

This attribute being True does not necessarily mean that the code within the scope has been, or will be, affected by the cancellation. For example, if cancel() was called after the last checkpoint in the with block, when it’s too late to deliver a Cancelled exception, then this attribute will still be True.

This attribute is mostly useful for debugging and introspection. If you want to know whether or not a chunk of code was actually cancelled, then cancelled_caught is usually more appropriate.

Often there is no need to create a CancelScope object yourself: Trio’s task-related Nursery objects already include a cancel_scope attribute. We will cover nurseries later in the manual.

Trio also provides several convenience functions for the common situation of just wanting to impose a timeout on some code:

with trio.move_on_after(seconds: float) → CancelScope as cancel_scope

Use as a context manager to create a cancel scope whose deadline is set to now + seconds.

Parameters:

seconds (float) – The timeout.

Raises:

ValueError – if seconds is less than zero or NaN.

with trio.move_on_at(deadline: float) → CancelScope as cancel_scope

Use as a context manager to create a cancel scope with the given absolute deadline.

Parameters:

deadline (float) – The deadline.

Raises:

ValueError – if deadline is NaN.

with trio.fail_after(seconds: float) → AbstractContextManager[CancelScope] as cancel_scope

Creates a cancel scope with the given timeout, and raises an error if it is actually cancelled.

This function and move_on_after() are similar in that both create a cancel scope with a given timeout, and if the timeout expires then both will cause Cancelled to be raised within the scope. The difference is that when the Cancelled exception reaches move_on_after(), it’s caught and discarded. When it reaches fail_after(), then it’s caught and TooSlowError is raised in its place.

Parameters:

seconds (float) – The timeout.

Raises:
  • TooSlowError – if the code in the scope takes longer than seconds to finish.

  • ValueError – if seconds is less than zero or NaN.

with trio.fail_at(deadline: float) → AbstractContextManager[CancelScope] as cancel_scope

Creates a cancel scope with the given deadline, and raises an error if it is actually cancelled.

This function and move_on_at() are similar in that both create a cancel scope with a given absolute deadline, and if the deadline expires then both will cause Cancelled to be raised within the scope. The difference is that when the Cancelled exception reaches move_on_at(), it’s caught and discarded. When it reaches fail_at(), then it’s caught and TooSlowError is raised in its place.

Parameters:

deadline (float) – The deadline.

Raises:
  • TooSlowError – if the deadline passes before the code in the scope finishes.

  • ValueError – if deadline is NaN.

Cheat sheet:

  • If you want to impose a timeout on a function, but you don’t care whether it timed out or not:

    with trio.move_on_after(TIMEOUT):
        await do_whatever()
    # carry on!
    
  • If you want to impose a timeout on a function, and then do some recovery if it timed out:

    with trio.move_on_after(TIMEOUT) as cancel_scope:
        await do_whatever()
    if cancel_scope.cancelled_caught:
        # The operation timed out, try something else
        try_to_recover()
    
  • If you want to impose a timeout on a function, and then if it times out then just give up and raise an error for your caller to deal with:

    with trio.fail_after(TIMEOUT):
        await do_whatever()
    

It’s also possible to check what the current effective deadline is, which is sometimes useful:

trio.current_effective_deadline() → float

Returns the current effective deadline for the current task.

This function examines all the cancellation scopes that are currently in effect (taking into account shielding), and returns the deadline that will expire first.

One example of where this might be useful is if your code is trying to decide whether to begin an expensive operation like an RPC call, but wants to skip it if it knows that it can’t possibly complete in the available time. Another example would be if you’re using a protocol like gRPC that propagates timeout information to the remote peer; this function gives a way to fetch that information so you can send it along.

If this is called in a context where a cancellation is currently active (i.e., a blocking call will immediately raise Cancelled), then the returned deadline is -inf. If it is called in a context where no scopes have a deadline set, it returns inf.

Returns:

the effective deadline, as an absolute time.

Return type:

float
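
For example, here’s a sketch of the RPC case (rpc_call and its timeout_hint parameter are hypothetical):

import trio

async def call_if_time_allows(request):
    budget = trio.current_effective_deadline() - trio.current_time()
    if budget < 1.0:
        # Not enough time left to make starting the expensive call worthwhile.
        raise RuntimeError("caller's deadline is too close")
    # Propagate the remaining budget to the remote peer, gRPC-style.
    return await rpc_call(request, timeout_hint=budget)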

Tasks let you do multiple things at once

One of Trio’s core design principles is: no implicit concurrency. Every function executes in a straightforward, top-to-bottom manner, finishing each operation before moving on to the next – like Guido intended.

But, of course, the entire point of an async library is to let you do multiple things at once. The one and only way to do that in Trio is through the task spawning interface. So if you want your program to walk and chew gum, this is the section for you.

Nurseries and spawning

Most libraries for concurrent programming let you start new child tasks (or threads, or whatever) willy-nilly, whenever and wherever you feel like it. Trio is a bit different: you can’t start a child task unless you’re prepared to be a responsible parent. The way you demonstrate your responsibility is by creating a nursery:

async with trio.open_nursery() as nursery:
    ...

And once you have a reference to a nursery object, you can start children in that nursery:

async def child():
    ...

async def parent():
    async with trio.open_nursery() as nursery:
        # Make two concurrent calls to child()
        nursery.start_soon(child)
        nursery.start_soon(child)

This means that tasks form a tree: when you call run(), then this creates an initial task, and all your other tasks will be children, grandchildren, etc. of the initial task.

Essentially, the body of the async with block acts like an initial task that’s running inside the nursery, and then each call to nursery.start_soon adds another task that runs in parallel. Three crucial things to keep in mind:

  • If any task inside the nursery finishes with an unhandled exception, then the nursery immediately cancels all the tasks inside the nursery.

  • Since all of the tasks are running concurrently inside the async with block, the block does not exit until all tasks have completed. If you’ve used other concurrency frameworks, then you can think of it as, the de-indentation at the end of the async with automatically “joins” (waits for) all of the tasks in the nursery.

  • Once all the tasks have finished, then:

    • The nursery is marked as “closed”, meaning that no new tasks can be started inside it.

    • Any unhandled exceptions are re-raised inside the parent task. If there are multiple exceptions, then they’re collected up into a single BaseExceptionGroup or ExceptionGroup exception.

Since all tasks are descendants of the initial task, one consequence of this is that run() can’t finish until all tasks have finished.

Note

A return statement will not cancel the nursery if it still has tasks running:

async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(trio.sleep, 5)
        return

trio.run(main)

This code will wait 5 seconds (for the child task to finish), and then return.

Child tasks and cancellation

In Trio, child tasks inherit the parent nursery’s cancel scopes. So in this example, both the child tasks will be cancelled when the timeout expires:

with trio.move_on_after(TIMEOUT):
    async with trio.open_nursery() as nursery:
        nursery.start_soon(child1)
        nursery.start_soon(child2)

Note that what matters here is the scopes that were active when open_nursery() was called, not the scopes active when start_soon is called. So for example, the timeout block below does nothing at all:

async with trio.open_nursery() as nursery:
    with trio.move_on_after(TIMEOUT):  # don't do this!
        nursery.start_soon(child)

Why is this so? Well, start_soon() returns as soon as it has scheduled the new task to start running. The flow of execution in the parent then continues on to exit the with trio.move_on_after(TIMEOUT): block, at which point Trio forgets about the timeout entirely. In order for the timeout to apply to the child task, Trio must be able to tell that its associated cancel scope will stay open for at least as long as the child task is executing. And Trio can only know that for sure if the cancel scope block is outside the nursery block.

You might wonder why Trio can’t just remember “this task should be cancelled in TIMEOUT seconds”, even after the with trio.move_on_after(TIMEOUT): block is gone. The reason has to do with how cancellation is implemented. Recall that cancellation is represented by a Cancelled exception, which eventually needs to be caught by the cancel scope that caused it. (Otherwise, the exception would take down your whole program!) In order to be able to cancel the child tasks, the cancel scope has to be able to “see” the Cancelled exceptions that they raise – and those exceptions come out of the async with open_nursery() block, not out of the call to start_soon().

If you want a timeout to apply to one task but not another, then you need to put the cancel scope in that individual task’s function – child(), in this example.
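
For example (do_the_thing and other_child are hypothetical):

async def child():
    # The timeout lives inside the task it's meant to limit:
    with trio.move_on_after(TIMEOUT):
        await do_the_thing()

async with trio.open_nursery() as nursery:
    nursery.start_soon(child)        # only child's work is bounded by TIMEOUT
    nursery.start_soon(other_child)  # unaffected by TIMEOUT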

Errors in multiple child tasks

Normally, in Python, only one thing happens at a time, which means that only one thing can go wrong at a time. Trio has no such limitation. Consider code like:

async def broken1():
    d = {}
    return d["missing"]

async def broken2():
    seq = range(10)
    return seq[20]

async def parent():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(broken1)
        nursery.start_soon(broken2)

broken1 raises KeyError. broken2 raises IndexError. Obviously parent should raise some error, but what? The answer is that both exceptions are grouped in an ExceptionGroup. ExceptionGroup and its parent class BaseExceptionGroup are used to encapsulate multiple exceptions being raised at once.

To catch individual exceptions encapsulated in an exception group, the except* clause was introduced in Python 3.11 (PEP 654). Here’s how it works:

try:
    async with trio.open_nursery() as nursery:
        nursery.start_soon(broken1)
        nursery.start_soon(broken2)
except* KeyError as excgroup:
    for exc in excgroup.exceptions:
        ...  # handle each KeyError
except* IndexError as excgroup:
    for exc in excgroup.exceptions:
        ...  # handle each IndexError

If you want to reraise exceptions, or raise new ones, you can do so, but be aware that exceptions raised in except* sections will be raised together in a new exception group.

But what if you can’t use except* just yet? Well, for that there is the handy exceptiongroup library which lets you approximate this behavior with exception handler callbacks:

from exceptiongroup import catch

def handle_keyerrors(excgroup):
    for exc in excgroup.exceptions:
        ...  # handle each KeyError

def handle_indexerrors(excgroup):
    for exc in excgroup.exceptions:
        ...  # handle each IndexError

with catch({
    KeyError: handle_keyerrors,
    IndexError: handle_indexerrors
}):
    async with trio.open_nursery() as nursery:
        nursery.start_soon(broken1)
        nursery.start_soon(broken2)

The semantics of the handler functions match those of except* blocks, with one exception: setting local variables. If you need to set local variables, you need to declare them inside the handler function(s) with the nonlocal keyword:

def handle_keyerrors(excgroup):
    nonlocal myflag
    myflag = True

myflag = False
with catch({KeyError: handle_keyerrors}):
    async with trio.open_nursery() as nursery:
        nursery.start_soon(broken1)

“Strict” versus “loose” ExceptionGroup semantics

Ideally, in some abstract sense we’d want everything that can raise an ExceptionGroup to always raise an ExceptionGroup (rather than, say, a single ValueError). Otherwise, it would be easy to accidentally write something like except ValueError: (not except*), which works if a single exception is raised but fails to catch anything in the case of multiple simultaneous exceptions (even if one of them is a ValueError). However, this is not how Trio worked in the past: as a concession to practicality when the except* syntax hadn’t been dreamed up yet, the old trio.MultiError was raised only when at least two exceptions occurred simultaneously. Adding a layer of ExceptionGroup around every nursery, while theoretically appealing, would probably break a lot of existing code in practice.

Therefore, we’ve chosen to gate the newer, “stricter” behavior behind a parameter called strict_exception_groups. This is accepted as a parameter to open_nursery(), to set the behavior for that nursery, and to trio.run(), to set the default behavior for any nursery in your program that doesn’t override it.

  • With strict_exception_groups=True, the exception(s) coming out of a nursery will always be wrapped in an ExceptionGroup, so you’ll know that if you’re handling single errors correctly, multiple simultaneous errors will work as well.

  • With strict_exception_groups=False, a nursery in which only one task has failed will raise that task’s exception without an additional layer of ExceptionGroup wrapping, so you’ll get maximum compatibility with code that was written to support older versions of Trio.

The default is set to strict_exception_groups=True, in line with the default behavior of TaskGroup in asyncio and anyio. We’ve also found that non-strict mode makes it too easy to neglect the possibility of several exceptions being raised concurrently, causing nasty latent bugs when errors occur under load.
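
For example, a sketch of setting it at both levels (main and some_task are hypothetical):

# Set the default for every nursery in this run:
trio.run(main, strict_exception_groups=True)

# ...or override it for one particular nursery:
async with trio.open_nursery(strict_exception_groups=False) as nursery:
    nursery.start_soon(some_task)  # a lone failure here is raised unwrapped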

Spawning tasks without becoming a parent

Sometimes it doesn’t make sense for the task that starts a child to take on responsibility for watching it. For example, a server task may want to start a new task for each connection, but it can’t listen for connections and supervise children at the same time.

The solution here is simple once you see it: there’s no requirement that a nursery object stay in the task that created it! We can write code like this:

async def new_connection_listener(handler, nursery):
    while True:
        conn = await get_new_connection()
        nursery.start_soon(handler, conn)

async def server(handler):
    async with trio.open_nursery() as nursery:
        nursery.start_soon(new_connection_listener, handler, nursery)

Notice that server opens a nursery and passes it to new_connection_listener, and then new_connection_listener is able to start new tasks as “siblings” of itself. Of course, in this case, we could just as well have written:

async def server(handler):
    async with trio.open_nursery() as nursery:
        while True:
            conn = await get_new_connection()
            nursery.start_soon(handler, conn)

...but sometimes things aren’t so simple, and this trick comes in handy.

One thing to remember, though: cancel scopes are inherited from the nursery, not from the task that calls start_soon. So in this example, the timeout does not apply to child (or to anything else):

async def do_spawn(nursery):
    with trio.move_on_after(TIMEOUT):  # don't do this, it has no effect
        nursery.start_soon(child)

async with trio.open_nursery() as nursery:
    nursery.start_soon(do_spawn, nursery)

Custom supervisors

The default cleanup logic is often sufficient for simple cases, but what if you want a more sophisticated supervisor? For example, maybe you have Erlang envy and want features like automatic restart of crashed tasks. Trio itself doesn’t provide these kinds of features, but you can build them on top; Trio’s goal is to enforce basic hygiene and then get out of your way. (Specifically: Trio won’t let you build a supervisor that exits and leaves orphaned tasks behind, and if you have an unhandled exception due to bugs or laziness then Trio will make sure they propagate.) And then you can wrap your fancy supervisor up in a library and put it on PyPI, because supervisors are tricky and there’s no reason everyone should have to write their own.

For example, here’s a function that takes a list of functions, runs them all concurrently, and returns the result from the one that finishes first:

async def race(*async_fns):
    if not async_fns:
        raise ValueError("must pass at least one argument")

    winner = None

    async def jockey(async_fn, cancel_scope):
        nonlocal winner
        winner = await async_fn()
        cancel_scope.cancel()

    async with trio.open_nursery() as nursery:
        for async_fn in async_fns:
            nursery.start_soon(jockey, async_fn, nursery.cancel_scope)

    return winner

This works by starting a set of tasks, each of which tries to run its function. As soon as the first function completes, its task sets the nonlocal variable winner from the outer scope to the function’s result, and cancels the other tasks using the passed-in cancel scope. Once all the tasks have exited (the winner normally, the rest by cancellation), the nursery block ends and winner is returned.

Here if one or more of the racing functions raises an unhandled exception then Trio’s normal handling kicks in: it cancels the others and then propagates the exception. If you want different behavior, you can get that by adding a try block to the jockey function to catch exceptions and handle them however you like.

Task-local storage

Suppose you’re writing a server that responds to network requests, and you log some information about each request as you process it. If the server is busy and there are multiple requests being handled at the same time, then you might end up with logs like this:

Request handler started
Request handler started
Request handler finished
Request handler finished

In this log, it’s hard to know which lines came from which request. (Did the request that started first also finish first, or not?) One way to solve this is to assign each request a unique identifier, and then include this identifier in each log message:

request 1: Request handler started
request 2: Request handler started
request 2: Request handler finished
request 1: Request handler finished

This way we can see that request 1 was slow: it started before request 2 but finished afterwards. (You can also get much fancier, but this is enough for an example.)

Now, here’s the problem: how does the logging code know what the request identifier is? One approach would be to explicitly pass it around to every function that might want to emit logs… but that’s basically every function, because you never know when you might need to add a log.debug(...) call to some utility function buried deep in the call stack, and when you’re in the middle of debugging a nasty problem, the last thing you want is to have to stop first and refactor everything to pass through the request identifier! Sometimes this is the right solution, but other times it would be much more convenient if we could store the identifier in a global variable, so that the logging function could look it up whenever it needed it. Except… a global variable can only have one value at a time, so if we have multiple handlers running at once then this isn’t going to work. What we need is something that’s like a global variable, but that can have different values depending on which request handler is accessing it.

To solve this problem, Python has a module in the standard library: contextvars.

Here’s a toy example demonstrating how to use contextvars:

import random
import trio
import contextvars

request_info = contextvars.ContextVar("request_info")


# Example logging function that tags each line with the request identifier.
def log(msg):
    # Read from task-local storage:
    request_tag = request_info.get()

    print(f"request {request_tag}: {msg}")


# An example "request handler" that does some work itself and also
# spawns some helper tasks to do some concurrent work.
async def handle_request(tag):
    # Write to task-local storage:
    request_info.set(tag)

    log("Request handler started")
    await trio.sleep(random.random())
    async with trio.open_nursery() as nursery:
        nursery.start_soon(concurrent_helper, "a")
        nursery.start_soon(concurrent_helper, "b")
    await trio.sleep(random.random())
    log("Request received finished")


async def concurrent_helper(job):
    log(f"Helper task {job} started")
    await trio.sleep(random.random())
    log(f"Helper task {job} finished")


# Spawn several "request handlers" simultaneously, to simulate a
# busy server handling multiple requests at the same time.
async def main():
    async with trio.open_nursery() as nursery:
        for i in range(3):
            nursery.start_soon(handle_request, i)


trio.run(main)

Example output (yours may differ slightly):

request 1: Request handler started
request 2: Request handler started
request 0: Request handler started
request 2: Helper task a started
request 2: Helper task b started
request 1: Helper task a started
request 1: Helper task b started
request 0: Helper task b started
request 0: Helper task a started
request 2: Helper task b finished
request 2: Helper task a finished
request 2: Request handler finished
request 0: Helper task a finished
request 1: Helper task a finished
request 1: Helper task b finished
request 1: Request handler finished
request 0: Helper task b finished
request 0: Request handler finished

For more information, read the contextvars docs.

Synchronizing and communicating between tasks

Trio provides a standard set of synchronization and inter-task communication primitives. These objects’ APIs are generally modeled on the analogous classes in the standard library, but with some differences.

Blocking and non-blocking methods

The standard library synchronization primitives have a variety of mechanisms for specifying timeouts and blocking behavior, and of signaling whether an operation returned due to success versus a timeout.

In Trio, we standardize on the following conventions (see the sketch after this list):

  • We don’t provide timeout arguments. If you want a timeout, then use a cancel scope.

  • For operations that have a non-blocking variant, the blocking and non-blocking variants are different methods with names like X and X_nowait, respectively. (This is similar to queue.Queue, but unlike most of the classes in threading.) We like this approach because it allows us to make the blocking version async and the non-blocking version sync.

  • When a non-blocking method cannot succeed (the channel is empty, the lock is already held, etc.), then it raises trio.WouldBlock. There’s no equivalent to the queue.Empty versus queue.Full distinction – we just have the one exception that we use consistently.
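
Here’s a small sketch illustrating these conventions with a memory channel (memory channels are described further below):

import trio

async def main():
    send_channel, receive_channel = trio.open_memory_channel(0)

    # Non-blocking variant: nothing is waiting to receive, so with a
    # zero-sized buffer this raises trio.WouldBlock instead of blocking.
    try:
        send_channel.send_nowait("hi")
    except trio.WouldBlock:
        print("nobody is ready to receive right now")

    # Blocking variant: there's no timeout argument; use a cancel scope instead.
    with trio.move_on_after(1):
        await send_channel.send("hi")  # still nobody receiving, so this is cancelled

trio.run(main)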

Fairness

These classes are all guaranteed to be “fair”, meaning that when it comes time to choose who will be next to acquire a lock, get an item from a queue, etc., then it always goes to the task which has been waiting longest. It’s not entirely clear whether this is the best choice, but for now that’s how it works.

As an example of what this means, here’s a small program in which two tasks compete for a lock. Notice that the task which releases the lock always immediately attempts to re-acquire it, before the other task has a chance to run. (And remember that we’re doing cooperative multi-tasking here, so it’s actually deterministic that the task releasing the lock will call acquire() before the other task wakes up; in Trio releasing a lock is not a checkpoint.) With an unfair lock, this would result in the same task holding the lock forever and the other task being starved out. But if you run this, you’ll see that the two tasks politely take turns:

# fairness-demo.py

import trio

async def loopy_child(number, lock):
    while True:
        async with lock:
            print(f"Child {number} has the lock!")
            await trio.sleep(0.5)

async def main():
    async with trio.open_nursery() as nursery:
        lock = trio.Lock()
        nursery.start_soon(loopy_child, 1, lock)
        nursery.start_soon(loopy_child, 2, lock)

trio.run(main)

Broadcasting an event with Event

class trio.Event

A waitable boolean value useful for inter-task synchronization, inspired by threading.Event.

An event object has an internal boolean flag, representing whether the event has happened yet. The flag is initially False, and the wait() method waits until the flag is True. If the flag is already True, then wait() returns immediately. (If the event has already happened, there’s nothing to wait for.) The set() method sets the flag to True, and wakes up any waiters.

This behavior is useful because it helps avoid race conditions and lost wakeups: it doesn’t matter whether set() gets called just before or after wait(). If you want a lower-level wakeup primitive that doesn’t have this protection, consider Condition or trio.lowlevel.ParkingLot.
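
For example, here’s a small sketch (nurseries are described in the task-spawning section above):

import trio

async def waiter(event):
    print("waiting for the event...")
    await event.wait()
    print("... the event happened!")

async def main():
    event = trio.Event()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(waiter, event)
        await trio.sleep(1)
        event.set()  # wakes the waiter; any later wait() calls return immediately

trio.run(main)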

Note

Unlike threading.Event, trio.Event has no clear method. In Trio, once an Event has happened, it cannot un-happen. If you need to represent a series of events, consider creating a new Event object for each one (they’re cheap!), or other synchronization methods like channels or trio.lowlevel.ParkingLot.

is_set() → bool

Return the current value of the internal flag.

set() → None

Set the internal flag value to True, and wake any waiting tasks.

statistics() → EventStatistics

Return an object containing debugging information.

Currently the following fields are defined:

  • tasks_waiting: The number of tasks blocked on this event’s wait() method.

await wait() → None

Block until the internal flag value becomes True.

If it’s already True, then this method returns immediately.

class trio.EventStatistics(tasks_waiting: int)

An object containing debugging information.

Currently the following fields are defined:

  • tasks_waiting: The number of tasks blocked on this event’s trio.Event.wait() method.

Using channels to pass values between tasks

Channels allow you to safely and conveniently send objects between different tasks. They’re particularly useful for implementing producer/consumer patterns.

The core channel API is defined by the abstract base classes trio.abc.SendChannel and trio.abc.ReceiveChannel. You can use these to implement your own custom channels that do things like pass objects between processes or over the network. But in many cases, you just want to pass objects between different tasks inside a single process, and for that you can use trio.open_memory_channel():

trio.open_memory_channel(max_buffer_size)

Open a channel for passing objects between tasks within a process.

Memory channels are lightweight, cheap to allocate, and entirely in-memory. They don’t involve any operating-system resources, or any kind of serialization. They just pass Python objects directly between tasks (with a possible stop in an internal buffer along the way).

Channel objects can be closed by calling aclose or using async with. They are not automatically closed when garbage collected. Closing memory channels isn’t mandatory, but it is generally a good idea, because it helps avoid situations where tasks get stuck waiting on a channel when there’s no-one on the other side. See Clean shutdown with channels for details.

Memory channel operations are all atomic with respect to cancellation: either receive will successfully return an object, or it will raise Cancelled while leaving the channel unchanged.

Parameters:

max_buffer_size (int or math.inf) – The maximum number of items that can be buffered in the channel before send() blocks. Choosing a sensible value here is important to ensure that backpressure is communicated promptly and avoid unnecessary latency; see Buffering in channels for more details. If in doubt, use 0.

Returns:

A pair (send_channel, receive_channel). If you have trouble remembering which order these go in, remember: data flows from left → right.

In addition to the standard channel methods, all memory channel objects provide a statistics() method, which returns an object with the following fields:

  • current_buffer_used: The number of items currently stored in the channel buffer.

  • max_buffer_size: The maximum number of items allowed in the buffer, as passed to open_memory_channel().

  • open_send_channels: The number of open MemorySendChannel endpoints pointing to this channel. Initially 1, but can be increased by MemorySendChannel.clone().

  • open_receive_channels: Likewise, but for open MemoryReceiveChannel endpoints.

  • tasks_waiting_send: The number of tasks blocked in send on this channel (summing over all clones).

  • tasks_waiting_receive: The number of tasks blocked in receive on this channel (summing over all clones).

Note

If you’ve used the threading or asyncio modules, you may be familiar with queue.Queue or asyncio.Queue. In Trio, open_memory_channel() is what you use when you’re looking for a queue. The main difference is that Trio splits the classic queue interface up into two objects. The advantage of this is that it makes it possible to put the two ends in different processes without rewriting your code, and that we can close the two sides separately.

MemorySendChannel and MemoryReceiveChannel also expose several more features beyond the core channel interface:

class trio.MemorySendChannel(*args: object, **kwargs: object)
clone() MemorySendChannel[SendType]

Clone this send channel object.

This returns a new MemorySendChannel object, which acts as a duplicate of the original: sending on the new object does exactly the same thing as sending on the old object. (If you’re familiar with os.dup, then this is a similar idea.)

However, closing one of the objects does not close the other, and receivers don’t get EndOfChannel until all clones have been closed.

This is useful for communication patterns that involve multiple producers all sending objects to the same destination. If you give each producer its own clone of the MemorySendChannel, and then make sure to close each MemorySendChannel when it’s finished, receivers will automatically get notified when all producers are finished. See Managing multiple producers and/or multiple consumers for examples.

Raises:

trio.ClosedResourceError – if you already closed this MemorySendChannel object.

close() None

Close this send channel object synchronously.

All channel objects have an asynchronous aclose method. Memory channels can also be closed synchronously. This has the same effect on the channel and on other tasks using it, but close() is not a Trio checkpoint, which makes it simpler to clean up in cancelled tasks.

Using with send_channel: will close the channel object on leaving the with block.

await send(value: SendType) None

See SendChannel.send.

Memory channels allow multiple tasks to call send at the same time.

send_nowait(value: SendType) None

Like send, but if the channel’s buffer is full, raises WouldBlock instead of blocking.

class trio.MemoryReceiveChannel(*args: object, **kwargs: object)
clone() MemoryReceiveChannel[ReceiveType]

Clone this receive channel object.

This returns a new MemoryReceiveChannel object, which acts as a duplicate of the original: receiving on the new object does exactly the same thing as receiving on the old object.

However, closing one of the objects does not close the other, and the underlying channel is not closed until all clones are closed. (If you’re familiar with os.dup, then this is a similar idea.)

This is useful for communication patterns that involve multiple consumers all receiving objects from the same underlying channel. See Managing multiple producers and/or multiple consumers for examples.

Warning

The clones all share the same underlying channel. Whenever a clone receive()s a value, it is removed from the channel and the other clones do not receive that value. If you want to send multiple copies of the same stream of values to multiple destinations, like itertools.tee(), then you need to find some other solution; this method does not do that.

Raises:

trio.ClosedResourceError – if you already closed this MemoryReceiveChannel object.

close() None

Close this receive channel object synchronously.

All channel objects have an asynchronous aclose method. Memory channels can also be closed synchronously. This has the same effect on the channel and on other tasks using it, but close() is not a Trio checkpoint, which makes it simpler to clean up in cancelled tasks.

Using with receive_channel: will close the channel object on leaving the with block.

await receive() ReceiveType

See ReceiveChannel.receive.

Memory channels allow multiple tasks to call receive at the same time. The first task will get the first item sent, the second task will get the second item sent, and so on.

receive_nowait() ReceiveType

Like receive, but if there’s nothing ready to receive, raises WouldBlock instead of blocking.

A simple channel example

Here’s a simple example of how to use memory channels:

import trio


async def main():
    async with trio.open_nursery() as nursery:
        # Open a channel:
        send_channel, receive_channel = trio.open_memory_channel(0)
        # Start a producer and a consumer, passing one end of the channel to
        # each of them:
        nursery.start_soon(producer, send_channel)
        nursery.start_soon(consumer, receive_channel)


async def producer(send_channel):
    # Producer sends 3 messages
    for i in range(3):
        # The producer sends using 'await send_channel.send(...)'
        await send_channel.send(f"message {i}")


async def consumer(receive_channel):
    # The consumer uses an 'async for' loop to receive the values:
    async for value in receive_channel:
        print(f"got value {value!r}")


trio.run(main)

If you run this, it prints:

got value "message 0"
got value "message 1"
got value "message 2"

And then it hangs forever. (Use control-C to quit.)

Clean shutdown with channels

Of course we don’t generally like it when programs hang. What happened? The problem is that the producer sent 3 messages and then exited, but the consumer has no way to tell that the producer is gone: for all it knows, another message might be coming along any moment. So it hangs forever waiting for the 4th message.

Here’s a new version that fixes this: it produces the same output as the previous version, and then exits cleanly. The only change is the addition of async with blocks inside the producer and consumer:

import trio


async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        nursery.start_soon(producer, send_channel)
        nursery.start_soon(consumer, receive_channel)


async def producer(send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send(f"message {i}")


async def consumer(receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print(f"got value {value!r}")


trio.run(main)

The really important thing here is the producer’s async with block. When the producer exits, this closes the send_channel, and that tells the consumer that no more messages are coming, so it can cleanly exit its async for loop. Then the program shuts down because both tasks have exited.

We also added an async with to the consumer. This isn’t as important, but it can help us catch mistakes or other problems. For example, suppose that the consumer exited early for some reason – maybe because of a bug. Then the producer would be sending messages into the void, and might get stuck indefinitely. But, if the consumer closes its receive_channel, then the producer will get a BrokenResourceError to alert it that it should stop sending messages because no-one is listening.

If you want to see the effect of the consumer exiting early, try adding a break statement to the async for loop – you should see a BrokenResourceError from the producer.

Managing multiple producers and/or multiple consumers

You can also have multiple producers, and multiple consumers, all sharing the same channel. However, this makes shutdown a little more complicated.

For example, consider this naive extension of our previous example, now with two producers and two consumers:

# This example usually crashes!

import trio
import random


async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        # Start two producers
        nursery.start_soon(producer, "A", send_channel)
        nursery.start_soon(producer, "B", send_channel)
        # And two consumers
        nursery.start_soon(consumer, "X", receive_channel)
        nursery.start_soon(consumer, "Y", receive_channel)


async def producer(name, send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send(f"{i} from producer {name}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())


async def consumer(name, receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print(f"consumer {name} got value {value!r}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())


trio.run(main)

The two producers, A and B, send 3 messages apiece. These are then randomly distributed between the two consumers, X and Y. So we’re hoping to see some output like:

consumer Y got value '0 from producer B'
consumer X got value '0 from producer A'
consumer Y got value '1 from producer A'
consumer Y got value '1 from producer B'
consumer X got value '2 from producer B'
consumer X got value '2 from producer A'

However, on most runs, that’s not what happens – the first part of the output is OK, and then when we get to the end the program crashes with ClosedResourceError. If you run the program a few times, you’ll see that sometimes the traceback shows send crashing, and other times it shows receive crashing, and you might even find that on some runs it doesn’t crash at all.

Here’s what’s happening: suppose that producer A finishes first. It exits, and its async with block closes the send_channel. But wait! Producer B was still using that send_channel… so the next time B calls send, it gets a ClosedResourceError.

Sometimes, though, if we’re lucky, the two producers finish at the same time (or close enough) that both make their last send before either of them closes the send_channel.

But, even if that happens, we’re not out of the woods yet! After the producers exit, the two consumers race to be the first to notice that the send_channel has closed. Suppose that X wins the race. It exits its async for loop, then exits the async with block… and closes the receive_channel, while Y is still using it. Again, this causes a crash.

We could avoid this by using some complicated bookkeeping to make sure that only the last producer and the last consumer close their channel endpoints… but that would be tiresome and fragile. Fortunately, there’s a better way! Here’s a fixed version of our program above:

import trio
import random


async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        async with send_channel, receive_channel:
            # Start two producers, giving each its own private clone
            nursery.start_soon(producer, "A", send_channel.clone())
            nursery.start_soon(producer, "B", send_channel.clone())
            # And two consumers, giving each its own private clone
            nursery.start_soon(consumer, "X", receive_channel.clone())
            nursery.start_soon(consumer, "Y", receive_channel.clone())


async def producer(name, send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send(f"{i} from producer {name}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())


async def consumer(name, receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print(f"consumer {name} got value {value!r}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())


trio.run(main)

This example demonstrates the MemorySendChannel.clone and MemoryReceiveChannel.clone methods. These create copies of our endpoints that act just like the originals – except that each copy can be closed independently, and the underlying channel is only closed after all the clones have been closed. This completely solves our shutdown problem: if you run this program, you’ll see it print its six lines of output and then exit cleanly.

Notice a small trick we use: the code in main creates clone objects to pass into all the child tasks, and then closes the original objects using async with. Another option is to pass clones into all-but-one of the child tasks, and then pass the original object into the last task, like:

# Also works, but is more finicky:
send_channel, receive_channel = trio.open_memory_channel(0)
nursery.start_soon(producer, "A", send_channel.clone())
nursery.start_soon(producer, "B", send_channel)
nursery.start_soon(consumer, "X", receive_channel.clone())
nursery.start_soon(consumer, "Y", receive_channel)

But this is more error-prone, especially if you use a loop to spawn the producers/consumers.

Just make sure that you don’t write:

# Broken, will cause program to hang:
send_channel, receive_channel = trio.open_memory_channel(0)
nursery.start_soon(producer, "A", send_channel.clone())
nursery.start_soon(producer, "B", send_channel.clone())
nursery.start_soon(consumer, "X", receive_channel.clone())
nursery.start_soon(consumer, "Y", receive_channel.clone())

Here we pass clones into the tasks, but never close the original objects. That means we have 3 send channel objects (the original + two clones), but we only close 2 of them, so the consumers will hang around forever waiting for that last one to be closed.

Buffering in channels

When you call open_memory_channel(), you have to specify how many values can be buffered internally in the channel. If the buffer is full, then any task that calls send() will stop and wait for another task to call receive(). This is useful because it produces backpressure: if the channel producers are running faster than the consumers, then it forces the producers to slow down.

You can disable buffering entirely, by doing open_memory_channel(0). In that case any task that calls send() will wait until another task calls receive(), and vice versa. This is similar to how channels work in the classic Communicating Sequential Processes model, and is a reasonable default if you aren’t sure what size buffer to use. (That’s why we used it in the examples above.)

At the other extreme, you can make the buffer unbounded by using open_memory_channel(math.inf). In this case, send() always returns immediately. Normally, this is a bad idea. To see why, consider a program where the producer runs more quickly than the consumer:

# Simulate a producer that generates values 10x faster than the
# consumer can handle them.

import trio
import math


async def producer(send_channel):
    count = 0
    while True:
        # Pretend that we have to do some work to create this message, and it
        # takes 0.1 seconds:
        await trio.sleep(0.1)
        await send_channel.send(count)
        print("Sent message:", count)
        count += 1


async def consumer(receive_channel):
    async for value in receive_channel:
        print("Received message:", value)
        # Pretend that we have to do some work to handle this message, and it
        # takes 1 second
        await trio.sleep(1)


async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send_channel)
        nursery.start_soon(consumer, receive_channel)


trio.run(main)

If you run this program, you’ll see output like:

Sent message: 0
Received message: 0
Sent message: 1
Sent message: 2
Sent message: 3
Sent message: 4
Sent message: 5
Sent message: 6
Sent message: 7
Sent message: 8
Sent message: 9
Received message: 1
Sent message: 10
Sent message: 11
Sent message: 12
...

On average, the producer sends ten messages per second, but the consumer only calls receive once per second. That means that each second, the channel’s internal buffer has to grow to hold an extra nine items. After a minute, the buffer will have ~540 items in it; after an hour, that grows to ~32,400. Eventually, the program will run out of memory. And well before we run out of memory, our latency on handling individual messages will become abysmal. For example, at the one minute mark, the producer is sending message ~600, but the consumer is still processing message ~60. Message 600 will have to sit in the channel for ~9 minutes before the consumer catches up and processes it.

Now try replacing open_memory_channel(math.inf) with open_memory_channel(0), and run it again. We get output like:

Sent message: 0
Received message: 0
Received message: 1
Sent message: 1
Received message: 2
Sent message: 2
Sent message: 3
Received message: 3
...

Now the send calls wait for the receive calls to finish, which forces the producer to slow down to match the consumer’s speed. (It might look strange that some values are reported as “Received” before they’re reported as “Sent”; this happens because the actual send/receive happen at the same time, so which line gets printed first is random.)

Now, let’s try setting a small but nonzero buffer size, like open_memory_channel(3). What do you think will happen?

I get:

Sent message: 0
Received message: 0
Sent message: 1
Sent message: 2
Sent message: 3
Received message: 1
Sent message: 4
Received message: 2
Sent message: 5
...

So you can see that the producer runs ahead by 3 messages, and then stops to wait: when the consumer reads message 1, the producer sends message 4; when the consumer reads message 2, the producer sends message 5; and so on. Once it reaches this steady state, the version with a buffer of 3 acts just like our previous version with a buffer of 0, except that it uses a bit more memory and each message sits in the buffer for a bit longer before being processed (i.e., the message latency is higher).

Of course real producers and consumers are usually more complicated than this, and in some situations, a modest amount of buffering might improve throughput. But too much buffering wastes memory and increases latency, so if you want to tune your application you should experiment to see what value works best for you.

Why do we even support unbounded buffers, then? Good question! Despite everything we saw above, there are times when you actually do need an unbounded buffer. For example, consider a web crawler that uses a channel to keep track of all the URLs it still wants to crawl. Each crawler runs a loop where it takes a URL from the channel, fetches it, checks the HTML for outgoing links, and then adds the new URLs to the channel. This creates a circular flow, where each consumer is also a producer. In this case, if the channel buffer gets full, then the crawlers block when they try to add new URLs to the channel; and if all the crawlers are blocked, then none of them are taking URLs out of the channel, so they’re stuck forever in a deadlock. Using an unbounded channel avoids this, because it means that send() never blocks.
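
Here’s a minimal sketch of that pattern (the FAKE_LINKS graph and fetch_links() are made-up stand-ins for real HTTP fetching and HTML parsing; a real crawler would be more involved):

import math

import trio

# Hypothetical in-memory link graph, standing in for the real web.
FAKE_LINKS = {
    "/": ["/a", "/b"],
    "/a": ["/b", "/c"],
    "/b": ["/c"],
    "/c": [],
}


async def fetch_links(url):
    await trio.sleep(0.1)  # pretend this is a network round-trip
    return FAKE_LINKS.get(url, [])


async def crawler(name, urls_out, urls_in, seen, pending, nursery):
    async for url in urls_in:
        print(f"crawler {name} fetching {url}")
        for link in await fetch_links(url):
            if link not in seen:
                seen.add(link)
                pending.add(link)
                # The buffer is unbounded, so this send never blocks -- which
                # is what prevents the "all crawlers blocked in send" deadlock.
                await urls_out.send(link)
        pending.discard(url)
        if not pending:
            # Nothing left anywhere: shut down all the crawlers.
            nursery.cancel_scope.cancel()


async def main():
    urls_out, urls_in = trio.open_memory_channel(math.inf)
    seen, pending = {"/"}, {"/"}
    async with trio.open_nursery() as nursery:
        nursery.start_soon(crawler, "X", urls_out, urls_in, seen, pending, nursery)
        nursery.start_soon(crawler, "Y", urls_out, urls_in, seen, pending, nursery)
        await urls_out.send("/")


trio.run(main)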

Lower-level synchronization primitives

Personally, I find that events and channels are usually enough to implement most things I care about, and that they lead to easier-to-read code than the lower-level primitives discussed in this section. But if you need them, they’re here. (If you find yourself reaching for these because you’re trying to implement a new higher-level synchronization primitive, then you might also want to check out the facilities in trio.lowlevel for a more direct exposure of Trio’s underlying synchronization logic. All of the classes discussed in this section are implemented on top of the public APIs in trio.lowlevel; they don’t have any special access to Trio’s internals.)

class trio.CapacityLimiter(total_tokens: int | float)

An object for controlling access to a resource with limited capacity.

Sometimes you need to put a limit on how many tasks can do something at the same time. For example, you might want to use some threads to run multiple blocking I/O operations in parallel… but if you use too many threads at once, then your system can become overloaded and it’ll actually make things slower. One popular solution is to impose a policy like “run up to 40 threads at the same time, but no more”. But how do you implement a policy like this?

That’s what CapacityLimiter is for. You can think of a CapacityLimiter object as a sack that starts out holding some fixed number of tokens:

limit = trio.CapacityLimiter(40)

Then tasks can come along and borrow a token out of the sack:

# Borrow a token:
async with limit:
    # We are holding a token!
    await perform_expensive_operation()
# Exiting the 'async with' block puts the token back into the sack

And crucially, if you try to borrow a token but the sack is empty, then you have to wait for another task to finish what it’s doing and put its token back first before you can take it and continue.

Another way to think of it: a CapacityLimiter is like a sofa with a fixed number of seats, and if they’re all taken then you have to wait for someone to get up before you can sit down.

By default, trio.to_thread.run_sync() uses a CapacityLimiter to limit the number of threads running at once; see trio.to_thread.current_default_thread_limiter for details.

If you’re familiar with semaphores, then you can think of this as a restricted semaphore that’s specialized for one common use case, with additional error checking. For a more traditional semaphore, see Semaphore.

Note

Don’t confuse this with the “leaky bucket” or “token bucket” algorithms used to limit bandwidth usage on networks. The basic idea of using tokens to track a resource limit is similar, but this is a very simple sack where tokens aren’t automatically created or destroyed over time; they’re just borrowed and then put back.
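
For example, here’s a minimal runnable sketch (the limit of 3, the number of workers, and the sleep are illustrative): ten tasks all want to run, but at most three hold a token at any moment:

import trio

limit = trio.CapacityLimiter(3)


async def worker(n):
    async with limit:
        # At most three workers can be inside this block at the same time.
        print(f"worker {n} got a token ({limit.borrowed_tokens} in use)")
        await trio.sleep(1)


async def main():
    async with trio.open_nursery() as nursery:
        for n in range(10):
            nursery.start_soon(worker, n)


trio.run(main)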

await acquire() None

Borrow a token from the sack, blocking if necessary.

Raises:

RuntimeError – if the current task already holds one of this sack’s tokens.

acquire_nowait() None

Borrow a token from the sack, without blocking.

Raises:
  • WouldBlock – if no tokens are available.

  • RuntimeError – if the current task already holds one of this sack’s tokens.

await acquire_on_behalf_of(borrower: Task | object) None

Borrow a token from the sack on behalf of borrower, blocking if necessary.

Parameters:

borrower – A trio.lowlevel.Task or arbitrary opaque object used to record who is borrowing this token; see acquire_on_behalf_of_nowait() for details.

Raises:

RuntimeError – if borrower task already holds one of this sack’s tokens.

acquire_on_behalf_of_nowait(borrower: Task | object) None

Borrow a token from the sack on behalf of borrower, without blocking.

Parameters:

borrower – A trio.lowlevel.Task or arbitrary opaque object used to record who is borrowing this token. This is used by trio.to_thread.run_sync() to allow threads to “hold tokens”, with the intention in the future of using it to allow deadlock detection and other useful things

Raises:
  • WouldBlock – if no tokens are available.

  • RuntimeError – if borrower already holds one of this sack’s tokens.

property available_tokens: int | float

The amount of capacity that’s available to use.

property borrowed_tokens: int

The amount of capacity that’s currently in use.

release() None

Put a token back into the sack.

Raises:

RuntimeError – if the current task has not acquired one of this sack’s tokens.

release_on_behalf_of(borrower: Task | object) None

Put a token back into the sack on behalf of borrower.

Raises:

RuntimeError – if the given borrower has not acquired one of this sack’s tokens.

statistics() CapacityLimiterStatistics

Return an object containing debugging information.

Currently the following fields are defined:

  • borrowed_tokens: The number of tokens currently borrowed from the sack.

  • total_tokens: The total number of tokens in the sack. Usually this will be larger than borrowed_tokens, but it’s possible for it to be smaller if total_tokens was recently decreased.

  • borrowers: A list of all tasks or other entities that currently hold a token.

  • tasks_waiting: The number of tasks blocked on this CapacityLimiter’s acquire() or acquire_on_behalf_of() methods.

property total_tokens: int | float

The total capacity available.

You can change total_tokens by assigning to this attribute. If you make it larger, then the appropriate number of waiting tasks will be woken immediately to take the new tokens. If you decrease total_tokens below the number of tasks that are currently using the resource, then all current tasks will be allowed to finish as normal, but no new tasks will be allowed in until the total number of tasks drops below the new total_tokens.

class trio.Semaphore(initial_value: int, *, max_value: int | None = None)

A semaphore.

A semaphore holds an integer value, which can be incremented by calling release() and decremented by calling acquire() – but the value is never allowed to drop below zero. If the value is zero, then acquire() will block until someone calls release().

If you’re looking for a Semaphore to limit the number of tasks that can access some resource simultaneously, then consider using a CapacityLimiter instead.

This object’s interface is similar to, but different from, that of threading.Semaphore.

A Semaphore object can be used as an async context manager; it blocks on entry but not on exit.

Parameters:
  • initial_value (int) – A non-negative integer giving the semaphore’s initial value.

  • max_value (int or None) – If given, makes this a “bounded” semaphore that raises an error if the value is about to exceed the given max_value.
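
For example, a minimal sketch (the initial value, max_value, and the one-second sleep are illustrative):

import trio


async def main():
    # A bounded semaphore: release() would raise if the value exceeded max_value.
    sem = trio.Semaphore(0, max_value=1)

    async def waiter():
        await sem.acquire()  # blocks until someone calls release()
        print("got the semaphore, value is now", sem.value)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(waiter)
        await trio.sleep(1)  # pretend to do something first
        sem.release()  # wakes the waiter; no ownership is required


trio.run(main)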

await acquire() None

Decrement the semaphore value, blocking if necessary to avoid letting it drop below zero.

acquire_nowait() None

Attempt to decrement the semaphore value, without blocking.

Raises:

WouldBlock – if the value is zero.

property max_value: int | None

The maximum allowed value. May be None to indicate no limit.

release() None

Increment the semaphore value, possibly waking a task blocked in acquire().

Raises:

ValueError – if incrementing the value would cause it to exceed max_value.

statistics() ParkingLotStatistics

Return an object containing debugging information.

Currently the following fields are defined:

  • tasks_waiting: The number of tasks blocked on this semaphore’s acquire() method.

property value: int

The current value of the semaphore.

class trio.Lock

A classic mutex.

This is a non-reentrant, single-owner lock. Unlike threading.Lock, only the owner of the lock is allowed to release it.

A Lock object can be used as an async context manager; it blocks on entry but not on exit.
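
For example, a minimal sketch (the task names and the sleep are illustrative): the two workers take turns holding the lock:

import trio

lock = trio.Lock()


async def worker(name):
    async with lock:  # blocks on entry until the lock is free
        print(f"{name} has the lock")
        await trio.sleep(1)
    print(f"{name} released the lock")


async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(worker, "first task")
        nursery.start_soon(worker, "second task")


trio.run(main)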

await acquire() None

Acquire the lock, blocking if necessary.

acquire_nowait() None

Attempt to acquire the lock, without blocking.

Raises:

WouldBlock – if the lock is held.

locked() bool

Check whether the lock is currently held.

Returns:

True if the lock is held, False otherwise.

Return type:

bool

release() None

Release the lock.

Raises:

RuntimeError – if the calling task does not hold the lock.

statistics() LockStatistics

Return an object containing debugging information.

Currently the following fields are defined:

  • locked: boolean indicating whether the lock is held.

  • owner: the trio.lowlevel.Task currently holding the lock, or None if the lock is not held.

  • tasks_waiting: The number of tasks blocked on this lock’s acquire() method.

class trio.StrictFIFOLock

A variant of Lock where tasks are guaranteed to acquire the lock in strict first-come-first-served order.

An example of when this is useful is if you’re implementing something like trio.SSLStream or an HTTP/2 server using h2, where you have multiple concurrent tasks that are interacting with a shared state machine, and at unpredictable moments the state machine requests that a chunk of data be sent over the network. (For example, when using h2 simply reading incoming data can occasionally create outgoing data to send.) The challenge is to make sure that these chunks are sent in the correct order, without being garbled.

One option would be to use a regular Lock, and wrap it around every interaction with the state machine:

# This approach is sometimes workable but often sub-optimal; see below
async with lock:
    state_machine.do_something()
    if state_machine.has_data_to_send():
        await conn.sendall(state_machine.get_data_to_send())

But this can be problematic. If you’re using h2 then usually reading incoming data doesn’t create the need to send any data, so we don’t want to force every task that tries to read from the network to sit and wait a potentially long time for sendall to finish. And in some situations this could even potentially cause a deadlock, if the remote peer is waiting for you to read some data before it accepts the data you’re sending.

StrictFIFOLock provides an alternative. We can rewrite our example like:

# Note: no awaits between when we start using the state machine and
# when we block to take the lock!
state_machine.do_something()
if state_machine.has_data_to_send():
    # Notice that we fetch the data to send out of the state machine
    # *before* sleeping, so that other tasks won't see it.
    chunk = state_machine.get_data_to_send()
    async with strict_fifo_lock:
        await conn.sendall(chunk)

First we do all our interaction with the state machine in a single scheduling quantum (notice there are no awaits in there), so it’s automatically atomic with respect to other tasks. And then if and only if we have data to send, we get in line to send it – and StrictFIFOLock guarantees that each task will send its data in the same order that the state machine generated it.

Currently, StrictFIFOLock is identical to Lock, but (a) this may not always be true in the future, especially if Trio ever implements more sophisticated scheduling policies, and (b) the above code is relying on a pretty subtle property of its lock. Using a StrictFIFOLock acts as an executable reminder that you’re relying on this property.

class trio.Condition(lock: Lock | None = None)

A classic condition variable, similar to threading.Condition.

A Condition object can be used as an async context manager to acquire the underlying lock; it blocks on entry but not on exit.

Parameters:

lock (Lock) – the lock object to use. If given, must be a trio.Lock. If None, a new Lock will be allocated and used.

await acquire() None

Acquire the underlying lock, blocking if necessary.

acquire_nowait() None

Attempt to acquire the underlying lock, without blocking.

Raises:

WouldBlock – if the lock is currently held.

locked() bool

Check whether the underlying lock is currently held.

Returns:

True if the lock is held, False otherwise.

Return type:

bool

notify(n: int = 1) None

Wake one or more tasks that are blocked in wait().

Parameters:

n (int) – The number of tasks to wake.

Raises:

RuntimeError – if the calling task does not hold the lock.

notify_all() None

Wake all tasks that are currently blocked in wait().

Raises:

RuntimeError – if the calling task does not hold the lock.

release() None

Release the underlying lock.

statistics() ConditionStatistics

Return an object containing debugging information.

Currently the following fields are defined:

  • tasks_waiting: The number of tasks blocked on this condition’s wait() method.

  • lock_statistics: The result of calling the underlying Lock’s statistics() method.

await wait() None

Wait for another task to call notify() or notify_all().

When calling this method, you must hold the lock. It releases the lock while waiting, and then re-acquires it before waking up.

There is a subtlety with how this method interacts with cancellation: when cancelled it will block to re-acquire the lock before raising Cancelled. This may cause cancellation to be less prompt than expected. The advantage is that it makes code like this work:

async with condition:
    await condition.wait()

If we didn’t re-acquire the lock before waking up, and wait() were cancelled here, then we’d crash in condition.__aexit__ when we tried to release the lock we no longer held.

Raises:

RuntimeError – if the calling task does not hold the lock.
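
For example, here’s a minimal sketch of the classic wait/notify pattern (the shared ready flag and the timing are illustrative); note the while loop that re-checks the predicate after waking:

import trio

cond = trio.Condition()
ready = False


async def waiter():
    async with cond:  # acquire the underlying lock
        while not ready:  # always re-check the predicate after waking up
            await cond.wait()
    print("waiter: saw ready=True")


async def notifier():
    global ready
    await trio.sleep(1)  # pretend to do some work first
    async with cond:
        ready = True
        cond.notify_all()  # wake everyone blocked in wait()


async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(waiter)
        nursery.start_soon(notifier)


trio.run(main)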

These primitives return statistics objects that can be inspected.

class trio.CapacityLimiterStatistics(borrowed_tokens: int, total_tokens: int | float, borrowers: list[Task | object], tasks_waiting: int)

An object containing debugging information.

Currently the following fields are defined:

  • borrowed_tokens (int): The number of tokens currently borrowed from the sack.

  • total_tokens (int or math.inf): The total number of tokens in the sack.

  • borrowers (list): A list of all tasks or other entities that currently hold a token.

  • tasks_waiting (int): The number of tasks blocked on this CapacityLimiter’s acquire() or acquire_on_behalf_of() methods.

class trio.LockStatistics(locked: bool, owner: Task | None, tasks_waiting: int)

An object containing debugging information for a Lock.

Currently the following fields are defined:

  • locked (boolean): indicating whether the lock is held.

  • owner: the trio.lowlevel.Task currently holding the lock, or None if the lock is not held.

  • tasks_waiting (int): The number of tasks blocked on this lock’s trio.Lock.acquire() method.

class trio.ConditionStatistics(tasks_waiting: int, lock_statistics: LockStatistics)

An object containing debugging information for a Condition.

Currently the following fields are defined:

  • tasks_waiting (int): The number of tasks blocked on this condition’s wait() method.

  • lock_statistics: The result of calling the underlying Lock’s statistics() method.

Notes on async generators

Python 3.6 added support for async generators, which can use await, async for, and async with in between their yield statements. As you might expect, you use async for to iterate over them. PEP 525 has many more details if you want them.

For example, the following is a roundabout way to print the numbers 0 through 9 with a 1-second delay before each one:

import trio

async def range_slowly(*args):
    """Like range(), but adds a 1-second sleep before each value."""
    for value in range(*args):
        await trio.sleep(1)
        yield value

async def use_it():
    async for value in range_slowly(10):
        print(value)

trio.run(use_it)

Trio supports async generators, with some caveats described in this section.

Finalization

If you iterate over an async generator in its entirety, like the example above does, then the execution of the async generator will occur completely in the context of the code that’s iterating over it, and there aren’t too many surprises.

If you abandon a partially-completed async generator, though, such as by breaking out of the iteration, things aren’t so simple. The async generator iterator object is still alive, waiting for you to resume iterating it so it can produce more values. At some point, Python will realize that you’ve dropped all references to the iterator, and will call on Trio to throw in a GeneratorExit exception so that any remaining cleanup code inside the generator has a chance to run: finally blocks, __aexit__ handlers, and so on.

So far, so good. Unfortunately, Python provides no guarantees about when this happens. It could be as soon as you break out of the async for loop, or an arbitrary amount of time later. It could even be after the entire Trio run has finished! Just about the only guarantee is that it won’t happen in the task that was using the generator. That task will continue on with whatever else it’s doing, and the async generator cleanup will happen “sometime later, somewhere else”: potentially with different context variables, not subject to timeouts, and/or after any nurseries you’re using have been closed.

If you don’t like that ambiguity, and you want to ensure that a generator’s finally blocks and __aexit__ handlers execute as soon as you’re done using it, then you’ll need to wrap your use of the generator in something like async_generator.aclosing():

# Instead of this:
async for value in my_generator():
    if value == 42:
        break

# Do this (aclosing lives in contextlib on Python 3.10+, or in the
# async_generator package on older versions):
async with aclosing(my_generator()) as aiter:
    async for value in aiter:
        if value == 42:
            break

This is cumbersome, but Python unfortunately doesn’t provide any other reliable options. If you use aclosing(), then your generator’s cleanup code executes in the same context as the rest of its iterations, so timeouts, exceptions, and context variables work like you’d expect.

If you don’t use aclosing(), then Trio will do its best anyway, but you’ll have to contend with the following semantics:

  • The cleanup of the generator occurs in a cancelled context, i.e., all blocking calls executed during cleanup will raise Cancelled. This is to compensate for the fact that any timeouts surrounding the original use of the generator have been long since forgotten.

  • The cleanup runs without access to any context variables that may have been present when the generator was originally being used.

  • If the generator raises an exception during cleanup, then it’s printed to the trio.async_generator_errors logger and otherwise ignored.

  • If an async generator is still alive at the end of the whole call to trio.run(), then it will be cleaned up after all tasks have exited and before trio.run() returns. Since the “system nursery” has already been closed at this point, Trio isn’t able to support any new calls to trio.lowlevel.spawn_system_task().

If you plan to run your code on PyPy to take advantage of its better performance, you should be aware that PyPy is far more likely than CPython to perform async generator cleanup at a time well after the last use of the generator. (This is a consequence of the fact that PyPy does not use reference counting to manage memory.) To help catch issues like this, Trio will issue a ResourceWarning (ignored by default, but enabled when running under python -X dev for example) for each async generator that needs to be handled through the fallback finalization path.

Cancel scopes and nurseries

Warning

You may not write a yield statement that suspends an async generator inside a CancelScope or Nursery that was entered within the generator.

That is, this is OK:

async def some_agen():
    with trio.move_on_after(1):
        await long_operation()
    yield "first"
    async with trio.open_nursery() as nursery:
        nursery.start_soon(task1)
        nursery.start_soon(task2)
    yield "second"
    ...

But this is not:

async def some_agen():
    with trio.move_on_after(1):
        yield "first"
    async with trio.open_nursery() as nursery:
        yield "second"
    ...

Async generators decorated with @asynccontextmanager to serve as the template for an async context manager are not subject to this constraint, because @asynccontextmanager uses them in a limited way that doesn’t create problems.

Violating the rule described in this section will sometimes get you a useful error message, but Trio is not able to detect all such cases, so sometimes you’ll get an unhelpful TrioInternalError. (And sometimes it will seem to work, which is probably the worst outcome of all, since then you might not notice the issue until you perform some minor refactoring of the generator or the code that’s iterating it, or just get unlucky. There is a proposed Python enhancement that would at least make it fail consistently.)

The reason for the restriction on cancel scopes has to do with the difficulty of noticing when a generator gets suspended and resumed. The cancel scopes inside the generator shouldn’t affect code running outside the generator, but Trio isn’t involved in the process of exiting and reentering the generator, so it would be hard pressed to keep its cancellation plumbing in the correct state. Nurseries use a cancel scope internally, so they have all the problems of cancel scopes plus a number of problems of their own: for example, when the generator is suspended, what should the background tasks do? There’s no good way to suspend them, but if they keep running and throw an exception, where can that exception be reraised?

If you have an async generator that wants to yield from within a nursery or cancel scope, your best bet is to refactor it to be a separate task that communicates over memory channels. The trio_util package offers a decorator that does this for you transparently.
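
For example, here’s a minimal sketch of that refactor (the produce_values name and its trivial body are illustrative): the generator’s body becomes an ordinary task, which is free to use cancel scopes and nurseries, and the values flow over a memory channel instead of being yielded:

import trio


async def produce_values(send_channel):
    # The old generator body goes here; as an ordinary task it can freely
    # use cancel scopes and nurseries around its "yields".
    async with send_channel:
        for i in range(3):
            with trio.move_on_after(1):
                await trio.sleep(0.1)  # stand-in for real work
            await send_channel.send(i)


async def main():
    send_channel, receive_channel = trio.open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(produce_values, send_channel)
        async with receive_channel:
            async for value in receive_channel:
                print("got", value)


trio.run(main)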

For more discussion, see Trio issues 264 (especially this comment) and 638.

Threads (if you must)

In a perfect world, all third-party libraries and low-level APIs would be natively async and integrated into Trio, and all would be happiness and rainbows.

That world, alas, does not (yet) exist. Until it does, you may find yourself needing to interact with non-Trio APIs that do rude things like “blocking”.

In acknowledgment of this reality, Trio provides two useful utilities for working with real, operating-system level, threading-module-style threads. First, if you’re in Trio but need to push some blocking I/O into a thread, there’s trio.to_thread.run_sync. And if you’re in a thread and need to communicate back with Trio, you can use trio.from_thread.run() and trio.from_thread.run_sync().

Trio’s philosophy about managing worker threads

If you’ve used other I/O frameworks, you may have encountered the concept of a “thread pool”, which is most commonly implemented as a fixed size collection of threads that hang around waiting for jobs to be assigned to them. These solve two different problems: First, re-using the same threads over and over is more efficient than starting and stopping a new thread for every job you need done; basically, the pool acts as a kind of cache for idle threads. And second, having a fixed size avoids getting into a situation where 100,000 jobs are submitted simultaneously, and then 100,000 threads are spawned and the system gets overloaded and crashes. Instead, the N threads start executing the first N jobs, while the other (100,000 - N) jobs sit in a queue and wait their turn. Which is generally what you want, and this is how trio.to_thread.run_sync() works by default.

The downside of this kind of thread pool is that sometimes, you need more sophisticated logic for controlling how many threads are run at once. For example, you might want a policy like “at most 20 threads total, but no more than 3 of those can be running jobs associated with the same user account”, or you might want a pool whose size is dynamically adjusted over time in response to system conditions.

It’s even possible for a fixed-size policy to cause unexpected deadlocks. Imagine a situation where we have two different types of blocking jobs that you want to run in the thread pool, type A and type B. Type A is pretty simple: it just runs and completes pretty quickly. But type B is more complicated: it has to stop in the middle and wait for some other work to finish, and that other work includes running a type A job. Now, suppose you submit N jobs of type B to the pool. They all start running, and then eventually end up submitting one or more jobs of type A. But since every thread in our pool is already busy, the type A jobs don’t actually start running – they just sit in a queue waiting for the type B jobs to finish. But the type B jobs will never finish, because they’re waiting for the type A jobs. Our system has deadlocked. The ideal solution to this problem is to avoid having type B jobs in the first place – generally it’s better to keep complex synchronization logic in the main Trio thread. But if you can’t do that, then you need a custom thread allocation policy that tracks separate limits for different types of jobs, and make it impossible for type B jobs to fill up all the slots that type A jobs need to run.

So, we can see that it’s important to be able to change the policy controlling the allocation of threads to jobs. But in many frameworks, this requires implementing a new thread pool from scratch, which is highly non-trivial; and if different types of jobs need different policies, then you may have to create multiple pools, which is inefficient because now you effectively have two different thread caches that aren’t sharing resources.

Trio’s solution to this problem is to split worker thread management into two layers. The lower layer is responsible for taking blocking I/O jobs and arranging for them to run immediately on some worker thread. It takes care of solving the tricky concurrency problems involved in managing threads and is responsible for optimizations like re-using threads, but has no admission control policy: if you give it 100,000 jobs, it will spawn 100,000 threads. The upper layer is responsible for providing the policy to make sure that this doesn’t happen – but since it only has to worry about policy, it can be much simpler. In fact, all there is to it is the limiter= argument passed to trio.to_thread.run_sync(). This defaults to a global CapacityLimiter object, which gives us the classic fixed-size thread pool behavior. (See trio.to_thread.current_default_thread_limiter().) But if you want to use “separate pools” for type A jobs and type B jobs, then it’s just a matter of creating two separate CapacityLimiter objects and passing them in when running these jobs. Or here’s an example of defining a custom policy that respects the global thread limit, while making sure that no individual user can use more than 3 threads at a time:

import weakref

import trio


class CombinedLimiter:
    def __init__(self, first, second):
        self._first = first
        self._second = second

    async def acquire_on_behalf_of(self, borrower):
        # Acquire both, being careful to clean up properly on error
        await self._first.acquire_on_behalf_of(borrower)
        try:
            await self._second.acquire_on_behalf_of(borrower)
        except:
            self._first.release_on_behalf_of(borrower)
            raise

    def release_on_behalf_of(self, borrower):
        # Release both, being careful to clean up properly on error
        try:
            self._second.release_on_behalf_of(borrower)
        finally:
            self._first.release_on_behalf_of(borrower)


# Use a weak value dictionary, so that we don't waste memory holding
# limiter objects for users who don't have any worker threads running.
USER_LIMITERS = weakref.WeakValueDictionary()
MAX_THREADS_PER_USER = 3


def get_user_limiter(user_id):
    try:
        return USER_LIMITERS[user_id]
    except KeyError:
        per_user_limiter = trio.CapacityLimiter(MAX_THREADS_PER_USER)
        global_limiter = trio.to_thread.current_default_thread_limiter()
        # IMPORTANT: acquire the per_user_limiter before the global_limiter.
        # If we get 100 jobs for a user at the same time, we want
        # to only allow 3 of them at a time to even compete for the
        # global thread slots.
        combined_limiter = CombinedLimiter(per_user_limiter, global_limiter)
        USER_LIMITERS[user_id] = combined_limiter
        return combined_limiter


async def run_sync_in_thread_for_user(user_id, sync_fn, *args):
    combined_limiter = get_user_limiter(user_id)
    return await trio.to_thread.run_sync(sync_fn, *args, limiter=combined_limiter)

Putting blocking I/O into worker threads

await trio.to_thread.run_sync(sync_fn: Callable[..., RetT], *args: object, thread_name: str | None = None, abandon_on_cancel: bool | None = None, cancellable: bool | None = None, limiter: CapacityLimiter | None = None) RetT

Convert a blocking operation into an async operation using a thread.

These two lines are equivalent:

sync_fn(*args)
await trio.to_thread.run_sync(sync_fn, *args)

except that if sync_fn takes a long time, then the first line will block the Trio loop while it runs, while the second line allows other Trio tasks to continue working while sync_fn runs. This is accomplished by pushing the call to sync_fn(*args) off into a worker thread.
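
For example, here’s a minimal sketch (the ticker task and the one-second time.sleep() are illustrative): the blocking sleep runs in a worker thread, so the ticker keeps running in the meantime:

import time

import trio


async def ticker():
    for _ in range(4):
        print("tick")
        await trio.sleep(0.25)


async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(ticker)
        # Calling time.sleep(1) directly would freeze the whole run for a
        # second; running it in a worker thread lets the ticker keep going.
        await trio.to_thread.run_sync(time.sleep, 1)


trio.run(main)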

From inside the worker thread, you can get back into Trio using the functions in trio.from_thread.

Parameters:
  • sync_fn – An arbitrary synchronous callable.

  • *args – Positional arguments to pass to sync_fn. If you need keyword arguments, use functools.partial().

  • abandon_on_cancel (bool) – Whether to abandon this thread upon cancellation of this operation. See discussion below.

  • thread_name (str) – Optional string to set the name of the thread. Will always set threading.Thread.name, but only set the os name if pthread.h is available (i.e. most POSIX installations). pthread names are limited to 15 characters, and can be read from /proc/<PID>/task/<SPID>/comm or with ps -eT, among others. Defaults to {sync_fn.__name__|None} from {trio.lowlevel.current_task().name}.

  • limiter (None, or CapacityLimiter-like object) –

    An object used to limit the number of simultaneous threads. Most commonly this will be a CapacityLimiter, but it could be anything providing compatible acquire_on_behalf_of() and release_on_behalf_of() methods. This function will call acquire_on_behalf_of before starting the thread, and release_on_behalf_of after the thread has finished.

    If None (the default), uses the default CapacityLimiter, as returned by current_default_thread_limiter().

Cancellation handling: Cancellation is a tricky issue here, because neither Python nor the operating systems it runs on provide any general mechanism for cancelling an arbitrary synchronous function running in a thread. This function will always check for cancellation on entry, before starting the thread. But once the thread is running, there are two ways it can handle being cancelled:

  • If abandon_on_cancel=False, the function ignores the cancellation and keeps going, just like if we had called sync_fn synchronously. This is the default behavior.

  • If abandon_on_cancel=True, then this function immediately raises Cancelled. In this case the thread keeps running in background – we just abandon it to do whatever it’s going to do, and silently discard any return value or errors that it raises. Only use this if you know that the operation is safe and side-effect free. (For example: trio.socket.getaddrinfo() uses a thread with abandon_on_cancel=True, because it doesn’t really affect anything if a stray hostname lookup keeps running in the background.)

    The limiter is only released after the thread has actually finished – which in the case of cancellation may be some time after this function has returned. If trio.run() finishes before the thread does, then the limiter release method will never be called at all.

Warning

You should not use this function to call long-running CPU-bound functions! In addition to the usual GIL-related reasons why using threads for CPU-bound work is not very effective in Python, there is an additional problem: on CPython, CPU-bound threads tend to “starve out” IO-bound threads, so using threads for CPU-bound work is likely to adversely affect the main thread running Trio. If you need to do this, you’re better off using a worker process, or perhaps PyPy (which still has a GIL, but may do a better job of fairly allocating CPU time between threads).

Returns:

Whatever sync_fn(*args) returns.

Raises:

Exception – Whatever sync_fn(*args) raises.

trio.to_thread.current_default_thread_limiter() CapacityLimiter

Get the default CapacityLimiter used by trio.to_thread.run_sync.

The most common reason to call this would be if you want to modify its total_tokens attribute.
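
For example, a minimal sketch (the new limit of 40 is an arbitrary illustrative value):

import trio


async def main():
    # Allow up to 40 worker threads for this run instead of the default.
    limiter = trio.to_thread.current_default_thread_limiter()
    limiter.total_tokens = 40
    print("thread limit is now", limiter.total_tokens)


trio.run(main)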

Getting back into the Trio thread from another thread

trio.from_thread.run(afn: Callable[..., Awaitable[RetT]], *args: object, trio_token: TrioToken | None = None) RetT

Run the given async function in the parent Trio thread, blocking until it is complete.

Returns:

Whatever afn(*args) returns.

Returns or raises whatever the given function returns or raises. It can also raise exceptions of its own:

Raises:
  • RunFinishedError – if the corresponding call to trio.run() has already completed, or if the run has started its final cleanup phase and can no longer spawn new system tasks.

  • Cancelled – If the original call to trio.to_thread.run_sync() is cancelled (if trio_token is None) or the call to trio.run() completes (if trio_token is not None) while afn(*args) is running, then afn is likely to raise trio.Cancelled.

  • RuntimeError – if you try calling this from inside the Trio thread, which would otherwise cause a deadlock, or if no trio_token was provided, and we can’t infer one from context.

  • TypeError – if afn is not an asynchronous function.

Locating a TrioToken: There are two ways to specify which trio.run loop to reenter:

  • Spawn this thread from trio.to_thread.run_sync. Trio will automatically capture the relevant Trio token and use it to re-enter the same Trio task.

  • Pass a keyword argument, trio_token specifying a specific trio.run loop to re-enter. This is useful in case you have a “foreign” thread, spawned using some other framework, and still want to enter Trio, or if you want to use a new system task to call afn, maybe to avoid the cancellation context of a corresponding trio.to_thread.run_sync task.

trio.from_thread.run_sync(fn: Callable[..., RetT], *args: object, trio_token: TrioToken | None = None) RetT

Run the given sync function in the parent Trio thread, blocking until it is complete.

Returns:

Whatever fn(*args) returns.

Returns or raises whatever the given function returns or raises. It can also raise exceptions of its own:

Raises:
  • RunFinishedError – if the corresponding call to trio.run has already completed.

  • RuntimeError – if you try calling this from inside the Trio thread, which would otherwise cause a deadlock, or if no trio_token was provided and we can’t infer one from context.

  • TypeError – if fn is an async function.

Locating a TrioToken: There are two ways to specify which trio.run loop to reenter:

  • Spawn this thread from trio.to_thread.run_sync. Trio will automatically capture the relevant Trio token and use it when you want to re-enter Trio.

  • Pass a keyword argument, trio_token specifying a specific trio.run loop to re-enter. This is useful in case you have a “foreign” thread, spawned using some other framework, and still want to enter Trio, or if you want to use a new system task to call fn, maybe to avoid the cancellation context of a corresponding trio.to_thread.run_sync task.

This will probably be clearer with an example. Here we demonstrate how to spawn a child thread, and then use a memory channel to send messages between the thread and a Trio task:

import trio


def thread_fn(receive_from_trio, send_to_trio):
    while True:
        # Since we're in a thread, we can't call methods on Trio
        # objects directly -- so we use trio.from_thread to call them.
        try:
            request = trio.from_thread.run(receive_from_trio.receive)
        except trio.EndOfChannel:
            trio.from_thread.run(send_to_trio.aclose)
            return
        else:
            response = request + 1
            trio.from_thread.run(send_to_trio.send, response)


async def main():
    send_to_thread, receive_from_trio = trio.open_memory_channel(0)
    send_to_trio, receive_from_thread = trio.open_memory_channel(0)

    async with trio.open_nursery() as nursery:
        # In a background thread, run:
        #   thread_fn(receive_from_trio, send_to_trio)
        nursery.start_soon(
            trio.to_thread.run_sync, thread_fn, receive_from_trio, send_to_trio
        )

        # prints "1"
        await send_to_thread.send(0)
        print(await receive_from_thread.receive())

        # prints "2"
        await send_to_thread.send(1)
        print(await receive_from_thread.receive())

        # When we close the channel, it signals the thread to exit.
        await send_to_thread.aclose()

        # When we exit the nursery, it waits for the background thread to
        # exit.


trio.run(main)

Note

The from_thread.run* functions reuse the host task that called trio.to_thread.run_sync() to run your provided function, as long as you’re using the default abandon_on_cancel=False so Trio can be sure that the task will remain around to perform the work. If you pass abandon_on_cancel=True at the outset, or if you provide a TrioToken when calling back in to Trio, your functions will be executed in a new system task. Therefore, the current_task(), current_effective_deadline(), or other task-tree specific values may differ depending on keyword argument values.

You can also use trio.from_thread.check_cancelled() to check for cancellation from a thread that was spawned by trio.to_thread.run_sync(). If the call to run_sync() was cancelled, then check_cancelled() will raise trio.Cancelled. It’s like trio.from_thread.run(trio.sleep, 0), but much faster.
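
For example, a minimal sketch (the chunked loop and the timings are illustrative stand-ins for real blocking work):

import time

import trio


def long_blocking_work():
    for _ in range(10):
        time.sleep(0.2)  # one slice of blocking work
        # If the Trio side has been cancelled, stop early instead of
        # grinding through the remaining slices.
        trio.from_thread.check_cancelled()


async def main():
    with trio.move_on_after(0.5):
        await trio.to_thread.run_sync(long_blocking_work)
    print("gave up after about half a second")


trio.run(main)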

trio.from_thread.check_cancelled() None

Raise trio.Cancelled if the associated Trio task entered a cancelled status.

Only applicable to threads spawned by trio.to_thread.run_sync. Poll to allow abandon_on_cancel=False threads to raise Cancelled at a suitable place, or to end abandoned abandon_on_cancel=True threads sooner than they otherwise would.

Raises:
  • Cancelled – if the associated Trio task entered a cancelled status.

Note

To be precise, check_cancelled() checks whether the task running trio.to_thread.run_sync() has ever been cancelled since the last time it was running a trio.from_thread.run() or trio.from_thread.run_sync() function. It may raise trio.Cancelled even if a cancellation occurred that was later hidden by a modification to trio.CancelScope.shield between the cancelled CancelScope and trio.to_thread.run_sync(). This differs from the behavior of normal Trio checkpoints, which raise Cancelled only if the cancellation is still active when the checkpoint executes. The distinction here is exceedingly unlikely to be relevant to your application, but we mention it for completeness.
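
As a sketch of this polling pattern (the worker function and the timings are illustrative), a blocking loop run with the default abandon_on_cancel=False can call check_cancelled() between chunks of work so that cancelling the host task takes effect promptly:

import time

import trio


def crunch_numbers():
    for chunk in range(100):
        time.sleep(0.1)  # stand-in for a chunk of real blocking work
        # Raises trio.Cancelled here if the host task has been cancelled,
        # so the thread stops instead of grinding through all 100 chunks.
        trio.from_thread.check_cancelled()


async def main():
    with trio.move_on_after(0.5):
        # abandon_on_cancel=False is the default: the Trio task waits for
        # the thread, and the thread exits early via check_cancelled().
        await trio.to_thread.run_sync(crunch_numbers)
    print("done")


trio.run(main)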

Threads and task-local storage

When working with threads, you can use the same contextvars we discussed above, because their values are preserved.

This is done by automatically copying the contextvars context when you use any of:

  • trio.to_thread.run_sync

  • trio.from_thread.run

  • trio.from_thread.run_sync

That means that the values of the context variables are accessible even in worker threads, or when sending a function to be run in the main/parent Trio thread using trio.from_thread.run from one of these worker threads.

But it also means that, because each thread works with a copy rather than the same context, setting a context variable inside one of these thread functions only affects that copy: the new value is visible to that function and to any internal/child tasks it runs, but not back in the parent Trio thread.

If you need to make modifications from child threads that are also visible in the parent, store a mutable object (e.g. a dictionary) in the context variable from the top-level/parent Trio thread. Then, in the children, instead of setting the context variable, retrieve that same object and mutate its contents.

Because the object in the context variable is shared and only its contents change, the parent thread sees the new contents as well.

Here’s an example:

import contextvars
import time

import trio

request_state = contextvars.ContextVar("request_state")

# Blocking function that should be run on a thread
# It could be reading or writing files, communicating with a database
# with a driver not compatible with async / await, etc.
def work_in_thread(msg):
    # Only use request_state.get() inside the worker thread
    state_value = request_state.get()
    current_user_id = state_value["current_user_id"]
    time.sleep(3)  # this would be some blocking call, like reading a file
    print(f"Processed user {current_user_id} with message {msg} in a thread worker")
    # Modify/mutate the state object, without setting the entire
    # contextvar with request_state.set()
    state_value["msg"] = msg


# An example "request handler" that does some work itself and also
# spawns some helper tasks in threads to execute blocking code.
async def handle_request(current_user_id):
    # Write to task-local storage:
    current_state = {"current_user_id": current_user_id, "msg": ""}
    request_state.set(current_state)

    # Here the current implicit contextvars context will be automatically copied
    # inside the worker thread
    await trio.to_thread.run_sync(work_in_thread, f"Hello {current_user_id}")
    # Extract the value set inside the thread in the same object stored in a contextvar
    new_msg = current_state["msg"]
    print(
        f"New contextvar value from worker thread for user {current_user_id}: {new_msg}"
    )


# Spawn several "request handlers" simultaneously, to simulate a
# busy server handling multiple requests at the same time.
async def main():
    async with trio.open_nursery() as nursery:
        for i in range(3):
            nursery.start_soon(handle_request, i)


trio.run(main)

Running that script will result in the output:

Processed user 2 with message Hello 2 in a thread worker
Processed user 0 with message Hello 0 in a thread worker
Processed user 1 with message Hello 1 in a thread worker
New contextvar value from worker thread for user 2: Hello 2
New contextvar value from worker thread for user 1: Hello 1
New contextvar value from worker thread for user 0: Hello 0

If you use contextvars yourself, or use a library that relies on them, this is how they interact with threads in Trio.

Bear in mind, though, that in many cases it can be simpler to skip context variables in your own code and pass values as arguments instead; that is more explicit and often easier to reason about.

Note

The context is copied rather than shared with the parent because contextvars does not support using a single context in more than one thread at a time.

Exceptions and warnings

exception trio.Cancelled(*args: object, **kwargs: object)

Raised by blocking calls if the surrounding scope has been cancelled.

You should let this exception propagate, to be caught by the relevant cancel scope. To remind you of this, it inherits from BaseException instead of Exception, just like KeyboardInterrupt and SystemExit do. This means that if you write something like:

try:
    ...
except Exception:
    ...

then this won’t catch a Cancelled exception.

You cannot raise Cancelled yourself. Attempting to do so will produce a TypeError. Use cancel_scope.cancel() instead.
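
For instance, a minimal sketch of cancelling your own scope rather than raising Cancelled directly:

import trio


async def main():
    with trio.CancelScope() as cancel_scope:
        # Mark the surrounding scope as cancelled; Trio then raises
        # Cancelled at the next checkpoint inside the scope...
        cancel_scope.cancel()
        await trio.sleep(1)  # ...which is here
    # ...and the scope itself absorbs the exception.
    print("cancelled:", cancel_scope.cancelled_caught)


trio.run(main)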

Note

In the US it’s also common to see this word spelled “canceled”, with only one “l”. This is a recent and US-specific innovation, and even in the US both forms are still commonly used. So for consistency with the rest of the world and with “cancellation” (which always has two “l”s), Trio uses the two “l” spelling everywhere.

exception trio.TooSlowError

Raised by fail_after() and fail_at() if the timeout expires.
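
A minimal sketch of this (the 0.1 second limit is arbitrary):

import trio


async def main():
    try:
        with trio.fail_after(0.1):
            await trio.sleep(1)  # takes longer than the 0.1 second limit
    except trio.TooSlowError:
        print("operation timed out")


trio.run(main)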

exception trio.WouldBlock

Raised by X_nowait functions if X would block.
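
For example, send_nowait() on a zero-capacity memory channel raises WouldBlock when no receiver is waiting (a minimal sketch):

import trio


async def main():
    send_channel, receive_channel = trio.open_memory_channel(0)
    try:
        # No receiver is waiting and the buffer has zero capacity,
        # so this cannot complete immediately.
        send_channel.send_nowait("hi")
    except trio.WouldBlock:
        print("send would have blocked")


trio.run(main)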

exception trio.EndOfChannel

Raised when trying to receive from a trio.abc.ReceiveChannel that has no more data to receive.

This is analogous to an “end-of-file” condition, but for channels.
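
A minimal sketch: close the send side, drain the buffered item, and the next receive raises EndOfChannel:

import trio


async def main():
    send_channel, receive_channel = trio.open_memory_channel(1)
    await send_channel.send("last item")
    await send_channel.aclose()  # no more data will ever arrive
    print(await receive_channel.receive())  # prints "last item"
    try:
        await receive_channel.receive()
    except trio.EndOfChannel:
        print("channel is exhausted")


trio.run(main)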

exception trio.BusyResourceError

Raised when a task attempts to use a resource that some other task is already using, and this would lead to bugs and nonsense.

For example, if two tasks try to send data through the same socket at the same time, Trio will raise BusyResourceError instead of letting the data get scrambled.

exception trio.ClosedResourceError

Raised when attempting to use a resource after it has been closed.

Note that “closed” here means that your code closed the resource, generally by calling a method with a name like close or aclose, or by exiting a context manager. If a problem arises elsewhere – for example, because of a network failure, or because a remote peer closed their end of a connection – then that should be indicated by a different exception class, like BrokenResourceError or an OSError subclass.
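
For example (a minimal sketch), sending on a memory channel that your own code already closed raises ClosedResourceError:

import trio


async def main():
    send_channel, receive_channel = trio.open_memory_channel(1)
    await send_channel.aclose()  # *we* closed it
    try:
        await send_channel.send("too late")
    except trio.ClosedResourceError:
        print("this send channel has already been closed")


trio.run(main)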

exception trio.BrokenResourceError

Raised when an attempt to use a resource fails due to external circumstances.

For example, you might get this if you try to send data on a stream where the remote side has already closed the connection.

You don’t get this error if you closed the resource – in that case you get ClosedResourceError.

This exception’s __cause__ attribute will often contain more information about the underlying error.

exception trio.RunFinishedError

Raised by trio.from_thread.run and similar functions if the corresponding call to trio.run() has already finished.
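
As a rough sketch, holding on to a TrioToken after trio.run() has returned and then trying to re-enter that run produces this error:

import trio


async def main():
    # Hand the run's token back to the synchronous caller.
    return trio.lowlevel.current_trio_token()


token = trio.run(main)  # this run loop has now finished

try:
    trio.from_thread.run_sync(print, "hello", trio_token=token)
except trio.RunFinishedError:
    print("that trio.run() call has already completed")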

exception trio.TrioInternalError

Raised by run() if we encounter a bug in Trio, or (possibly) a misuse of one of the low-level trio.lowlevel APIs.

This should never happen! If you get this error, please file a bug.

Unfortunately, if you get this error it also means that all bets are off – Trio doesn’t know what is going on and its normal invariants may be void. (For example, we might have “lost track” of a task. Or lost track of all tasks.) Again, though, this shouldn’t happen.

exception trio.TrioDeprecationWarning

Bases: FutureWarning

Warning emitted if you use deprecated Trio functionality.

As a young project, Trio is currently quite aggressive about deprecating and/or removing functionality that we realize was a bad idea. If you use Trio, you should subscribe to issue #1 to get information about upcoming deprecations and other backwards compatibility breaking changes.

Despite the name, this class currently inherits from FutureWarning, not DeprecationWarning, because while we’re in young-and-aggressive mode we want these warnings to be visible by default. You can hide them by installing a filter or with the -W switch: see the warnings documentation for details.
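
For example, a minimal sketch of hiding them with a filter (during development you probably want to leave them visible):

import warnings

import trio

# Silence Trio's deprecation warnings for this process.
warnings.filterwarnings("ignore", category=trio.TrioDeprecationWarning)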