# Introspecting and extending Trio with trio.hazmat

Warning

You probably don’t want to use this module.

trio.hazmat is Trio’s “hazardous materials” layer: it contains APIs useful for introspecting and extending Trio. If you’re writing ordinary, everyday code, then you can ignore this module completely. But sometimes you need something a bit lower level. Here are some examples of situations where you should reach for trio.hazmat:

• You want to implement a new synchronization primitive that Trio doesn’t (yet) provide, like a reader-writer lock.
• You want to extract low-level metrics to monitor the health of your application.
• You want to add support for a low-level operating system interface that Trio doesn’t (yet) expose, like watching a filesystem directory for changes.
• You want to implement an interface for calling between Trio and another event loop within the same process.
• You’re writing a debugger and want to visualize Trio’s task tree.
• You need to interoperate with a C library whose API exposes raw file descriptors.

Using trio.hazmat isn’t really that hazardous; in fact you’re already using it – it’s how most of the functionality described in previous chapters is implemented. The APIs described here have strictly defined and carefully documented semantics, and are perfectly safe – if you read carefully and take proper precautions. Some of those strict semantics have nasty big pointy teeth. If you make a mistake, Trio may not be able to handle it gracefully; conventions and guarantees that are followed strictly in the rest of Trio do not always apply. Using this module makes it your responsibility to think through and handle the nasty cases to expose a friendly Trio-style API to your users.

## Debugging and instrumentation

Trio tries hard to provide useful hooks for debugging and instrumentation. Some are documented above (the nursery introspection attributes, trio.Lock.statistics(), etc.). Here are some more.

### Global statistics

trio.hazmat.current_statistics()

Returns an object containing run-loop-level debugging information.

Currently the following fields are defined:

• tasks_living (int): The number of tasks that have been spawned and not yet exited.
• tasks_runnable (int): The number of tasks that are currently queued on the run queue (as opposed to blocked waiting for something to happen).
• seconds_to_next_deadline (float): The time until the next pending cancel scope deadline. May be negative if the deadline has expired but we haven’t yet processed cancellations. May be inf if there are no pending deadlines.
• run_sync_soon_queue_size (int): The number of unprocessed callbacks queued via trio.hazmat.TrioToken.run_sync_soon().
• io_statistics (object): Some statistics from trio’s I/O backend. This always has an attribute backend which is a string naming which operating-system-specific I/O backend is in use; the other attributes vary between backends.

### The current clock

trio.hazmat.current_clock()

Returns the current Clock.

### Instrument API

The instrument API provides a standard way to add custom instrumentation to the run loop. Want to make a histogram of scheduling latencies, log a stack trace of any task that blocks the run loop for >50 ms, or measure what percentage of your process’s running time is spent waiting for I/O? This is the place.

The general idea is that at any given moment, trio.run() maintains a set of “instruments”, which are objects that implement the trio.abc.Instrument interface. When an interesting event happens, it loops over these instruments and notifies them by calling an appropriate method. The tutorial has a simple example of using this for tracing.

Since this hooks into trio at a rather low level, you do have to be careful. The callbacks are run synchronously, and in many cases if they error out then there isn’t any plausible way to propagate this exception (for instance, we might be deep in the guts of the exception propagation machinery…). Therefore our current strategy for handling exceptions raised by instruments is to (a) log an exception to the "trio.abc.Instrument" logger, which by default prints a stack trace to standard error, and (b) disable the offending instrument.

You can register an initial list of instruments by passing them to trio.run(). add_instrument() and remove_instrument() let you add and remove instruments at runtime.

trio.hazmat.add_instrument(instrument)

Start instrumenting the current run loop with the given instrument.

Parameters: instrument (trio.abc.Instrument) – The instrument to activate.

If instrument is already active, does nothing.

trio.hazmat.remove_instrument(instrument)

Stop instrumenting the current run loop with the given instrument.

Parameters: instrument (trio.abc.Instrument) – The instrument to de-activate.

Raises: KeyError – if the instrument is not currently active. This could occur either because you never added it, or because you added it and then it raised an unhandled exception and was automatically deactivated.

And here’s the interface to implement if you want to build your own Instrument:

class trio.abc.Instrument

The interface for run loop instrumentation.

Instruments don’t have to inherit from this abstract base class, and all of these methods are optional. This class serves mostly as documentation.

after_io_wait(timeout)

Called after handling pending I/O.

Parameters: timeout (float) – The number of seconds we were willing to wait. This much time may or may not have elapsed, depending on whether any I/O was ready.

after_run()

Called just before trio.run() returns.

after_task_step(task)

Called when we return to the main run loop after a task has yielded.

before_io_wait(timeout)

Called before blocking to wait for I/O readiness.

Parameters: timeout (float) – The number of seconds we are willing to wait.

before_run()

Called at the beginning of trio.run().

before_task_step(task)

Called immediately before we resume running the given task.

task_exited(task)

Called when the given task exits.

task_scheduled(task)

Called when the given task becomes runnable.

It may still be some time before it actually runs, if there are other runnable tasks ahead of it.

task_spawned(task)

Called when the given task is created.

The tutorial has a fully-worked example of defining a custom instrument to log trio’s internal scheduling decisions.

## Low-level I/O primitives

Different environments expose different low-level APIs for performing async I/O. trio.hazmat exposes these APIs in a relatively direct way, so as to allow maximum power and flexibility for higher level code. However, this means that the exact API provided may vary depending on what system trio is running on.

### Universally available API

All environments provide the following functions:

await trio.hazmat.wait_socket_readable(sock)

Block until the given socket.socket() object is readable.

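On Unix systems, sockets are fds, and this is identical to wait_readable(). On Windows, SOCKET handles and fds are different, and this works on SOCKET handles or Python socket objects.

Raises:
• trio.BusyResourceError – if another task is already waiting for the given socket to become readable.
• trio.ClosedResourceError – if another task calls notify_socket_close() while this function is still working.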

await trio.hazmat.wait_socket_writable(sock)

Block until the given socket.socket() object is writable.

On Unix systems, sockets are fds, and this is identical to wait_writable(). On Windows, SOCKET handles and fds are different, and this works on SOCKET handles or Python socket objects.

Raises:
• trio.BusyResourceError – if another task is already waiting for the given socket to become writable.
• trio.ClosedResourceError – if another task calls notify_socket_close() while this function is still working.

trio.hazmat.notify_socket_close(sock)

Notifies Trio’s internal I/O machinery that you are about to close a socket.

This causes any operations currently waiting for this socket to immediately raise ClosedResourceError.

This does not actually close the socket. Generally when closing a socket, you should first call this function, and then close the socket.

On Unix systems, sockets are fds, and this is identical to notify_fd_close(). On Windows, SOCKET handles and fds are different, and this works on SOCKET handles or Python socket objects.

### Unix-specific API

Unix-like systems provide the following functions:

await trio.hazmat.wait_readable(fd)

Block until the given file descriptor is readable.

Warning

This is “readable” according to the operating system’s definition of readable. In particular, it probably won’t tell you anything useful for on-disk files.

Parameters: fd – integer file descriptor, or else an object with a fileno() method

Raises:
• trio.BusyResourceError – if another task is already waiting for the given fd to become readable.
• trio.ClosedResourceError – if another task calls notify_fd_close() while this function is still working.

await trio.hazmat.wait_writable(fd)

Block until the given file descriptor is writable.

Warning

This is “writable” according to the operating system’s definition of writable. In particular, it probably won’t tell you anything useful for on-disk files.

Parameters: fd – integer file descriptor, or else an object with a fileno() method

Raises:
• trio.BusyResourceError – if another task is already waiting for the given fd to become writable.
• trio.ClosedResourceError – if another task calls notify_fd_close() while this function is still working.

trio.hazmat.notify_fd_close(fd)

Notifies Trio’s internal I/O machinery that you are about to close a file descriptor.

This causes any operations currently waiting for this file descriptor to immediately raise ClosedResourceError.

This does not actually close the file descriptor. Generally when closing a file descriptor, you should first call this function, and then actually close it.

### Kqueue-specific API

TODO: these are implemented, but are currently more of a sketch than anything real. See #26.

trio.hazmat.current_kqueue()
await trio.hazmat.wait_kevent(ident, filter, abort_func)
with trio.hazmat.monitor_kevent(ident, filter) as queue

### Windows-specific API

await trio.hazmat.WaitForSingleObject(handle)

Async and cancellable variant of WaitForSingleObject. Windows only.

Parameters: handle – A Win32 object handle, as a Python integer.

Raises: OSError – If the handle is invalid, e.g. when it is already closed.

TODO: these are implemented, but are currently more of a sketch than anything real. See #26 and #52.

trio.hazmat.register_with_iocp(handle)
await trio.hazmat.wait_overlapped(handle, lpOverlapped)
trio.hazmat.current_iocp()
with trio.hazmat.monitor_completion_key() as queue

## Global state: system tasks and run-local variables

class trio.hazmat.RunVar(name, default=<object object>)

The run-local variant of a context variable.

RunVar objects are similar to context variable objects, except that they are shared across a single call to trio.run() rather than a single task.

trio.hazmat.spawn_system_task(async_fn, *args, name=None)

Spawn a “system” task. System tasks have a few differences from regular tasks:

• They don’t need an explicit nursery; instead they go into the internal “system nursery”.
• If a system task raises an exception, then it’s converted into a TrioInternalError and all tasks are cancelled. If you write a system task, you should be careful to make sure it doesn’t crash.
• By default, system tasks have KeyboardInterrupt protection enabled. If you want your task to be interruptible by control-C, then you need to use disable_ki_protection() explicitly (and come up with some plan for what to do with a KeyboardInterrupt, given that system tasks aren’t allowed to raise exceptions).
• System tasks do not inherit context variables from their creator.

Parameters:
• async_fn – An async callable.
• args – Positional arguments for async_fn. If you want to pass keyword arguments, use functools.partial().
• name – The name for this task. Only used for debugging/introspection (e.g. repr(task_obj)). If this isn’t a string, spawn_system_task() will try to make it one. A common use case is if you’re wrapping a function before spawning a new task, you might pass the original function as the name= to make debugging easier.

Returns: the newly spawned task

Return type: Task

## Trio tokens

class trio.hazmat.TrioToken

An opaque object representing a single call to trio.run().

It has no public constructor; instead, see current_trio_token().

This object has two uses:

1. It lets you re-enter the Trio run loop from external threads or signal handlers. This is the low-level primitive that trio.run_sync_in_worker_thread() uses to receive results from worker threads, that trio.open_signal_receiver() uses to receive notifications about signals, and so forth.
2. Each call to trio.run() has exactly one associated TrioToken object, so you can use it to identify a particular call.

run_sync_soon(sync_fn, *args, idempotent=False)

Schedule a call to sync_fn(*args) to occur in the context of a trio task.

This is safe to call from the main thread, from other threads, and from signal handlers. This is the fundamental primitive used to re-enter the Trio run loop from outside of it.

The call will happen “soon”, but there’s no guarantee about exactly when, and no mechanism provided for finding out when it’s happened. If you need this, you’ll have to build your own.

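The call is effectively run as part of a system task (see spawn_system_task()). In particular this means that:

• KeyboardInterrupt protection is enabled by default; if you want sync_fn to be interruptible by control-C, then you need to use disable_ki_protection() explicitly.
• If sync_fn raises an exception, then it’s converted into a TrioInternalError and all tasks are cancelled. You should be careful that sync_fn doesn’t crash.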

All calls with idempotent=False are processed in strict first-in first-out order.

If idempotent=True, then sync_fn and args must be hashable, and trio will make a best-effort attempt to discard any call submission which is equal to an already-pending call. Trio will make an attempt to process these in first-in first-out order, but no guarantees. (Currently processing is FIFO on CPython 3.6 and PyPy, but not CPython 3.5.)

Any ordering guarantees apply separately to idempotent=False and idempotent=True calls; there’s no rule for how calls in the different categories are ordered with respect to each other.

Raises: trio.RunFinishedError – if the associated call to trio.run() has already exited. (Any call that doesn’t raise this error is guaranteed to be fully processed before trio.run() exits.)

trio.hazmat.current_trio_token()

Retrieve the TrioToken for the current call to trio.run().

## Safer KeyboardInterrupt handling

Trio’s handling of control-C is designed to balance usability and safety. On the one hand, there are sensitive regions (like the core scheduling loop) where it’s simply impossible to handle arbitrary KeyboardInterrupt exceptions while maintaining our core correctness invariants. On the other, if the user accidentally writes an infinite loop, we do want to be able to break out of that. Our solution is to install a default signal handler which checks whether it’s safe to raise KeyboardInterrupt at the place where the signal is received. If so, then we do; otherwise, we schedule a KeyboardInterrupt to be delivered to the main task at the next available opportunity (similar to how Cancelled is delivered).

So that’s great, but – how do we know whether we’re in one of the sensitive parts of the program or not?

This is determined on a function-by-function basis. By default, a function is protected if its caller is, and not if its caller isn’t; this is helpful because it means you only need to override the defaults at places where you transition from protected code to unprotected code or vice-versa.

These transitions are accomplished using two function decorators:

@trio.hazmat.disable_ki_protection

Decorator that marks the given regular function, generator function, async function, or async generator function as unprotected against KeyboardInterrupt, i.e., the code inside this function can be rudely interrupted by KeyboardInterrupt at any moment.

If you have multiple decorators on the same function, then this should be at the bottom of the stack (closest to the actual function).

An example of where you’d use this is in implementing something like run_in_trio_thread, which uses TrioToken.run_sync_soon() to get into the trio thread. run_sync_soon() callbacks are run with KeyboardInterrupt protection enabled, and run_in_trio_thread takes advantage of this to safely set up the machinery for sending a response back to the original thread, and then uses disable_ki_protection() when entering the user-provided function.

@trio.hazmat.enable_ki_protection

Decorator that marks the given regular function, generator function, async function, or async generator function as protected against KeyboardInterrupt, i.e., the code inside this function won’t be rudely interrupted by KeyboardInterrupt. (Though if it contains any checkpoints, then it can still receive KeyboardInterrupt at those. This is considered a polite interruption.)

Warning

Be very careful to only use this decorator on functions that you know will either exit in bounded time, or else pass through a checkpoint regularly. (Of course all of your functions should have this property, but if you mess it up here then you won’t even be able to use control-C to escape!)

If you have multiple decorators on the same function, then this should be at the bottom of the stack (closest to the actual function).

An example of where you’d use this is on the __exit__ implementation for something like a Lock, where a poorly-timed KeyboardInterrupt could leave the lock in an inconsistent state and cause a deadlock.

trio.hazmat.currently_ki_protected()

Check whether the calling code has KeyboardInterrupt protection enabled.

It’s surprisingly easy to think that one’s KeyboardInterrupt protection is enabled when it isn’t, or vice-versa. This function tells you what trio thinks of the matter, which makes it useful for asserts and unit tests.

Returns: True if protection is enabled, and False otherwise.

Return type: bool

## Sleeping and waking

### Wait queue abstraction

class trio.hazmat.ParkingLot

A fair wait queue with cancellation and requeueing.

This class encapsulates the tricky parts of implementing a wait queue. It’s useful for implementing higher-level synchronization primitives like queues and locks.

In addition to the methods below, you can use len(parking_lot) to get the number of parked tasks, and if parking_lot: ... to check whether there are any parked tasks.

await park()

Park the current task until woken by a call to unpark() or unpark_all().

repark(new_lot, *, count=1)

Move parked tasks from one ParkingLot object to another.

This dequeues count tasks from one lot, and requeues them on another, preserving order. For example:

```python
import trio
import trio.testing

async def parker(lot):
    print("sleeping")
    await lot.park()
    print("woken")

async def main():
    lot1 = trio.hazmat.ParkingLot()
    lot2 = trio.hazmat.ParkingLot()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(parker, lot1)
        # Let parker() run until it blocks in lot1.park()
        await trio.testing.wait_all_tasks_blocked()
        assert len(lot1) == 1
        assert len(lot2) == 0
        lot1.repark(lot2)
        assert len(lot1) == 0
        assert len(lot2) == 1
        # This wakes up the task that was originally parked in lot1
        lot2.unpark()

trio.run(main)
```


If there are fewer than count tasks parked, then reparks as many tasks as are available and then returns successfully.

Parameters:
• new_lot (ParkingLot) – the parking lot to move tasks to.
• count (int) – the number of tasks to move.

repark_all(new_lot)

Move all parked tasks from one ParkingLot object to another.

See repark() for details.

statistics()

Return an object containing debugging information.

Currently the following fields are defined:

• tasks_waiting: The number of tasks blocked on this lot’s park() method.

unpark(*, count=1)

This wakes up count tasks that are blocked in park(). If there are fewer than count tasks parked, then wakes as many tasks as are available and then returns successfully.

Parameters: count (int) – the number of tasks to unpark.

unpark_all()

Unpark all parked tasks.

### Low-level checkpoint functions

await trio.hazmat.checkpoint()

A pure checkpoint.

This checks for cancellation and allows other tasks to be scheduled, without otherwise blocking.

Note that the scheduler has the option of ignoring this and continuing to run the current task if it decides this is appropriate (e.g. for increased efficiency).

Equivalent to await trio.sleep(0), which is implemented by calling checkpoint().

The next two functions are used together to make up a checkpoint:

await trio.hazmat.checkpoint_if_cancelled()

Issue a checkpoint if the calling context has been cancelled.

Equivalent to (but potentially more efficient than):

```python
from math import inf

if trio.current_effective_deadline() == -inf:
    await trio.hazmat.checkpoint()
```


This is either a no-op, or else it allows other tasks to be scheduled and then raises trio.Cancelled.

Typically used together with cancel_shielded_checkpoint().

await trio.hazmat.cancel_shielded_checkpoint()

Introduce a schedule point, but not a cancel point.

This is not a checkpoint, but it is half of a checkpoint, and when combined with checkpoint_if_cancelled() it can make a full checkpoint.

Equivalent to (but potentially more efficient than):

```python
with trio.open_cancel_scope(shield=True):
    await trio.hazmat.checkpoint()
```


These are commonly used in cases where you have an operation that might-or-might-not block, and you want to implement trio’s standard checkpoint semantics. Example:

```python
from trio.hazmat import cancel_shielded_checkpoint, checkpoint_if_cancelled

# attempt_operation() and wait_for_operation_to_be_ready() are placeholders
async def operation_that_maybe_blocks():
    await checkpoint_if_cancelled()
    try:
        ret = attempt_operation()
    except BlockingIOError:
        # need to block and then retry, which we do below
        pass
    except BaseException:
        # some other error, finish the checkpoint then let it propagate
        await cancel_shielded_checkpoint()
        raise
    else:
        # operation succeeded, finish the checkpoint then return
        await cancel_shielded_checkpoint()
        return ret
    while True:
        await wait_for_operation_to_be_ready()
        try:
            return attempt_operation()
        except BlockingIOError:
            pass
```


This logic is a bit convoluted, but accomplishes all of the following:

• Every execution path passes through a checkpoint (assuming that wait_for_operation_to_be_ready is an unconditional checkpoint)
• Our cancellation semantics say that Cancelled should only be raised if the operation didn’t happen. Using cancel_shielded_checkpoint() on the early-exit branches accomplishes this.
• On the path where we do end up blocking, we don’t pass through any schedule points before that, which avoids some unnecessary work.
• Avoids implicitly chaining the BlockingIOError with any errors raised by attempt_operation or wait_for_operation_to_be_ready, by keeping the while True: loop outside of the except BlockingIOError: block.

These functions can also be useful in other situations. For example, when trio.run_sync_in_worker_thread() schedules some work to run in a worker thread, it blocks until the work is finished (so it’s a schedule point), but by default it doesn’t allow cancellation. So to make sure that the call always acts as a checkpoint, it calls checkpoint_if_cancelled() before starting the thread.

### Low-level blocking

await trio.hazmat.wait_task_rescheduled(abort_func)

Put the current task to sleep, with cancellation support.

This is the lowest-level API for blocking in trio. Every time a Task blocks, it does so by calling this function (usually indirectly via some higher-level API).

This is a tricky interface with no guard rails. If you can use ParkingLot or the built-in I/O wait functions instead, then you should.

Generally the way it works is that before calling this function, you make arrangements for “someone” to call reschedule() on the current task at some later point.

Then you call wait_task_rescheduled(), passing in abort_func, an “abort callback”.

(Terminology: in trio, “aborting” is the process of attempting to interrupt a blocked task to deliver a cancellation.)

There are two possibilities for what happens next:

1. “Someone” calls reschedule() on the current task, and wait_task_rescheduled() returns or raises whatever value or error was passed to reschedule().

2. The call’s context transitions to a cancelled state (e.g. due to a timeout expiring). When this happens, the abort_func is called. Its interface looks like:

```python
def abort_func(raise_cancel):
    ...
    return trio.hazmat.Abort.SUCCEEDED  # or FAILED
```


It should attempt to clean up any state associated with this call, and in particular, arrange that reschedule() will not be called later. If (and only if!) it is successful, then it should return Abort.SUCCEEDED, in which case the task will automatically be rescheduled with an appropriate Cancelled error.

Otherwise, it should return Abort.FAILED. This means that the task can’t be cancelled at this time, and still has to make sure that “someone” eventually calls reschedule().

At that point there are again two possibilities. You can simply ignore the cancellation altogether: wait for the operation to complete and then reschedule and continue as normal. (For example, this is what trio.run_sync_in_worker_thread() does if cancellation is disabled.) The other possibility is that the abort_func does succeed in cancelling the operation, but for some reason isn’t able to report that right away. (Example: on Windows, it’s possible to request that an async (“overlapped”) I/O operation be cancelled, but this request is also asynchronous – you don’t find out until later whether the operation was actually cancelled or not.) To report a delayed cancellation, you should reschedule the task yourself and call the raise_cancel callback passed to abort_func to raise a Cancelled (or possibly KeyboardInterrupt) exception into this task. Either of the approaches sketched below can work:

```python
# Sketch only: `task`, `raise_cancel`, and the UPPERCASE names are placeholders.
import outcome

# Option 1:
# Catch the exception from raise_cancel and inject it into the task.
# (This is what trio does automatically for you if you return
# Abort.SUCCEEDED.)
trio.hazmat.reschedule(task, outcome.capture(raise_cancel))

# Option 2:
# wait to be woken by "someone", and then decide whether to raise
# the error from inside the task.
outer_raise_cancel = None
def abort(inner_raise_cancel):
    nonlocal outer_raise_cancel
    outer_raise_cancel = inner_raise_cancel
    TRY_TO_CANCEL_OPERATION()
    return trio.hazmat.Abort.FAILED
await trio.hazmat.wait_task_rescheduled(abort)
if OPERATION_WAS_SUCCESSFULLY_CANCELLED:
    # raises the error
    outer_raise_cancel()
```


In any case it’s guaranteed that we only call the abort_func at most once per call to wait_task_rescheduled().

Sometimes, it’s useful to be able to share some mutable sleep-related data between the sleeping task, the abort function, and the waking task. You can use the sleeping task’s custom_sleep_data attribute to store this data, and Trio won’t touch it, except to make sure that it gets cleared when the task is rescheduled.

Warning

If your abort_func raises an error, or returns any value other than Abort.SUCCEEDED or Abort.FAILED, then trio will crash violently. Be careful! Similarly, it is entirely possible to deadlock a trio program by failing to reschedule a blocked task, or cause havoc by calling reschedule() too many times. Remember what we said up above about how you should use a higher-level API if at all possible?

class trio.hazmat.Abort

enum.Enum used as the return value from abort functions.

See wait_task_rescheduled() for details.

SUCCEEDED
FAILED

trio.hazmat.reschedule(task, next_send=<object object>)

Reschedule the given task with the given outcome.Outcome.

See wait_task_rescheduled() for the gory details.

There must be exactly one call to reschedule() for every call to wait_task_rescheduled(). (And when counting, keep in mind that returning Abort.SUCCEEDED from an abort callback is equivalent to calling reschedule() once.)

Parameters:
• task (trio.hazmat.Task) – the task to be rescheduled. Must be blocked in a call to wait_task_rescheduled().
• next_send (outcome.Outcome) – the value (or error) to return (or raise) from wait_task_rescheduled().

Here’s an example lock class implemented using wait_task_rescheduled() directly. This implementation has a number of flaws, including lack of fairness, O(n) cancellation, missing error checking, failure to insert a checkpoint on the non-blocking path, etc. If you really want to implement your own lock, then you should study the implementation of trio.Lock and use ParkingLot, which handles some of these issues for you. But this does serve to illustrate the basic structure of the wait_task_rescheduled() API:

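```python
import collections
import trio

class NotVeryGoodLock:
    def __init__(self):
        self._blocked_tasks = collections.deque()
        self._held = False

    async def acquire(self):
        while self._held:
            task = trio.hazmat.current_task()
            self._blocked_tasks.append(task)
            def abort_fn(_):
                # Cancelled while waiting: forget this task so release() won't wake it
                self._blocked_tasks.remove(task)
                return trio.hazmat.Abort.SUCCEEDED
            await trio.hazmat.wait_task_rescheduled(abort_fn)
        self._held = True

    def release(self):
        self._held = False
        if self._blocked_tasks:
            woken_task = self._blocked_tasks.popleft()
            trio.hazmat.reschedule(woken_task)
```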

trio.hazmat.current_root_task()

Returns the current root Task.

This is the task that is the ultimate parent of all other tasks.

trio.hazmat.current_task()

Return the Task object representing the current task.

Returns: the Task that called current_task().

Return type: Task

class trio.hazmat.Task

A Task object represents a concurrent “thread” of execution. It has no public constructor; Trio internally creates a Task object for each call to nursery.start(...) or nursery.start_soon(...).

Its public members are mostly useful for introspection and debugging:

name

String containing this Task’s name. Usually the name of the function this Task is running, but can be overridden by passing name= to start or start_soon.

coro

This task’s coroutine object. Example usage: extracting a stack trace:

```python
import traceback

def walk_coro_stack(coro):
    while coro is not None:
        if hasattr(coro, "cr_frame"):
            # A real coroutine
            yield coro.cr_frame, coro.cr_frame.f_lineno
            coro = coro.cr_await
        else:
            # A generator decorated with @types.coroutine
            yield coro.gi_frame, coro.gi_frame.f_lineno
            coro = coro.gi_yieldfrom

# `task` is some trio.hazmat.Task you want to inspect
ss = traceback.StackSummary.extract(walk_coro_stack(task.coro))
print("".join(ss.format()))
```

context

This task’s contextvars.Context object.

parent_nursery

The nursery this task is inside (or None if this is the “init” task).

Example use case: drawing a visualization of the task tree in a debugger.

child_nurseries

The nurseries this task contains.

This is a list, with outer nurseries before inner nurseries.

custom_sleep_data

Trio doesn’t assign this variable any meaning, except that it sets it to None whenever a task is rescheduled. It can be used to share data between the different tasks involved in putting a task to sleep and then waking it up again. (See wait_task_rescheduled() for details.)