Low-level operations in trio.hazmat

DANGER DANGER DANGER

You probably don’t want to use this module.

The API defined here is public and stable (at least as much as anything in trio is stable given its pre-1.0 status), but it has nasty big pointy teeth. Mistakes may not be handled gracefully; rules and conventions that are followed strictly in the rest of trio do not always apply. Read and tread carefully.

But if you find yourself needing to, for example, implement new synchronization primitives or expose new low-level I/O functionality, then you’re in the right place.

Low-level I/O primitives

Different environments expose different low-level APIs for performing async I/O. trio.hazmat attempts to expose these APIs in a relatively direct way, so as to allow maximum power and flexibility for higher level code. However, this means that the exact API provided may vary depending on what system trio is running on.

Universally available API

All environments provide the following functions:

await trio.hazmat.wait_socket_readable(sock)

Block until the given socket.socket() object is readable.

The given object must be exactly of type socket.socket(), nothing else.

Raises:
await trio.hazmat.wait_socket_writable(sock)

Block until the given socket.socket() object is writable.

The given object must be exactly of type socket.socket(), nothing else.

Raises:

Unix-specific API

Unix-like systems provide the following functions:

await trio.hazmat.wait_readable(fd)

Block until the given file descriptor is readable.

Warning

This is “readable” according to the operating system’s definition of readable. In particular, it probably won’t tell you anything useful for on-disk files.

Parameters: fd – integer file descriptor, or else an object with a fileno() method
Raises: RuntimeError – if another task is already waiting for the given fd to become readable.
await trio.hazmat.wait_writable(fd)

Block until the given file descriptor is writable.

Warning

This is “writable” according to the operating system’s definition of writable. In particular, it probably won’t tell you anything useful for on-disk files.

Parameters: fd – integer file descriptor, or else an object with a fileno() method
Raises: RuntimeError – if another task is already waiting for the given fd to become writable.
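
To get a feel for what “readable” and “writable” mean at the operating-system level, here is a minimal sketch that does not use trio at all: it polls a connected socket pair with the standard library’s select module. The helper names is_readable, is_writable and demo are made up for this illustration. Note the difference in behavior: the trio functions above block until the condition holds, whereas these helpers only poll.

```python
import select
import socket


def is_readable(sock, timeout=0.0):
    """Poll the OS: does sock have data (or EOF) waiting to be read?"""
    readable, _, _ = select.select([sock], [], [], timeout)
    return bool(readable)


def is_writable(sock, timeout=0.0):
    """Poll the OS: is there room in sock's send buffer?"""
    _, writable, _ = select.select([], [sock], [], timeout)
    return bool(writable)


def demo():
    left, right = socket.socketpair()
    try:
        results = {
            # Nothing has been sent yet, so there is nothing to read.
            "readable_before_send": is_readable(right),
            # A freshly connected socket has an empty send buffer.
            "writable_when_idle": is_writable(left),
        }
        left.sendall(b"ping")
        # Allow a short grace period for the bytes to arrive.
        results["readable_after_send"] = is_readable(right, timeout=1.0)
        return results
    finally:
        left.close()
        right.close()


if __name__ == "__main__":
    print(demo())
```

A regular on-disk file would typically be reported as ready no matter what, which is why the warnings above say these checks tell you little for such files.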

Kqueue-specific API

TODO: these are currently more of a sketch than anything real. See #26.

trio.hazmat.current_kqueue()
await trio.hazmat.wait_kevent(ident, filter, abort_func)
with trio.hazmat.monitor_kevent(ident, filter) as queue

Windows-specific API

TODO: these are currently more of a sketch than anything real. See #26 and #52.

trio.hazmat.register_with_iocp(handle)
await trio.hazmat.wait_overlapped(handle, lpOverlapped)
trio.hazmat.current_iocp()
with trio.hazmat.monitor_completion_key() as queue

System tasks

trio.hazmat.spawn_system_task(async_fn, *args, name=None)

Spawn a “system” task.

System tasks have a few differences from regular tasks:

  • They don’t need an explicit nursery; instead they go into the internal “system nursery”.
  • If a system task raises an exception, then it’s converted into a TrioInternalError and all tasks are cancelled. If you write a system task, you should be careful to make sure it doesn’t crash.
  • System tasks are automatically cancelled when the main task exits.
  • By default, system tasks have KeyboardInterrupt protection enabled. If you want your task to be interruptible by control-C, then you need to use disable_ki_protection() explicitly.
Parameters:
  • async_fn – An async callable.
  • args – Positional arguments for async_fn. If you want to pass keyword arguments, use functools.partial().
  • name – The name for this task. Only used for debugging/introspection (e.g. repr(task_obj)). If this isn’t a string, spawn_system_task() will try to make it one. A common use case is if you’re wrapping a function before spawning a new task, you might pass the original function as the name= to make debugging easier.
Returns:

the newly spawned task

Return type:

Task

Entering trio from external threads or signal handlers

trio.hazmat.current_call_soon_thread_and_signal_safe()

Returns a reference to the call_soon_thread_and_signal_safe function for the current trio run:

call_soon_thread_and_signal_safe(sync_fn, *args, idempotent=False)

Schedule a call to sync_fn(*args) to occur in the context of a trio task. This is safe to call from the main thread, from other threads, and from signal handlers.

The call is effectively run as part of a system task (see spawn_system_task()). In particular this means that:

  • KeyboardInterrupt protection is enabled by default; if you want sync_fn to be interruptible by control-C, then you need to use disable_ki_protection() explicitly.
  • If sync_fn raises an exception, then it’s converted into a TrioInternalError and all tasks are cancelled. You should be careful that sync_fn doesn’t crash.

All calls with idempotent=False are processed in strict first-in first-out order.

If idempotent=True, then sync_fn and args must be hashable, and trio will make a best-effort attempt to discard any call submission which is equal to an already-pending call. Trio tries to process these calls in first-in first-out order, but this is not guaranteed. (Currently processing is FIFO on CPython 3.6 and PyPy, but not CPython 3.5.)

Any ordering guarantees apply separately to idempotent=False and idempotent=True calls; there’s no rule for how calls in the different categories are ordered with respect to each other.

Raises: trio.RunFinishedError – if the associated call to trio.run() has already exited. (Any call that doesn’t raise this error is guaranteed to be fully processed before trio.run() exits.)
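
The difference between the two submission modes can be modeled in a few lines of plain Python. This is a toy sketch, not trio’s implementation: ToyCallQueue, call_soon, drain and record are hypothetical names, and draining the non-idempotent calls first is an arbitrary choice (the text above promises no ordering between the two categories).

```python
from collections import deque


class ToyCallQueue:
    """Toy model of the two submission modes described above."""

    def __init__(self):
        self._fifo = deque()    # idempotent=False calls, strict FIFO
        self._idempotent = {}   # dict used as an insertion-ordered set

    def call_soon(self, sync_fn, *args, idempotent=False):
        if idempotent:
            # Requires sync_fn and args to be hashable; a submission equal
            # to an already-pending one collapses into it.
            self._idempotent[(sync_fn, args)] = None
        else:
            self._fifo.append((sync_fn, args))

    def drain(self):
        # Assumption of this sketch: run all FIFO calls, then the rest.
        while self._fifo:
            sync_fn, args = self._fifo.popleft()
            sync_fn(*args)
        while self._idempotent:
            key = next(iter(self._idempotent))
            del self._idempotent[key]
            sync_fn, args = key
            sync_fn(*args)


calls = []


def record(tag):
    calls.append(tag)


queue = ToyCallQueue()
queue.call_soon(record, "first")
for _ in range(3):
    queue.call_soon(record, "wakeup", idempotent=True)  # deduplicated
queue.call_soon(record, "second")
queue.drain()
```

After drain(), calls holds "first" and "second" in submission order and a single "wakeup", even though the wakeup was submitted three times.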

Safe KeyboardInterrupt handling

Trio’s handling of control-C is designed to balance usability and safety. On the one hand, there are sensitive regions (like the core scheduling loop) where it’s simply impossible to handle arbitrary KeyboardInterrupt exceptions while maintaining our core correctness invariants. On the other, if the user accidentally writes an infinite loop, we do want to be able to break out of that. Our solution is to install a default signal handler which checks whether it’s safe to raise KeyboardInterrupt at the place where the signal is received. If so, then we do; otherwise, we schedule a KeyboardInterrupt to be delivered to the main task at the next available opportunity (similar to how Cancelled is delivered).

So that’s great, but – how do we know whether we’re in one of the sensitive parts of the program or not?

This is determined on a function-by-function basis. By default, a function is protected if its caller is, and not if its caller isn’t; this is helpful because it means you only need to override the defaults at places where you transition from protected code to unprotected code or vice-versa.

These transitions are accomplished using two function decorators:

@trio.hazmat.disable_ki_protection

Decorator that marks the given regular function, generator function, async function, or async generator function as unprotected, i.e., the code inside this function can be rudely interrupted by KeyboardInterrupt at any moment.

If you have multiple decorators on the same function, then this should be at the bottom of the stack (closest to the actual function).

An example of where you’d use this is in implementing something like run_in_trio_thread, which uses call_soon_thread_and_signal_safe to get into the trio thread. call_soon_thread_and_signal_safe callbacks are run with KeyboardInterrupt protection enabled, and run_in_trio_thread takes advantage of this to safely set up the machinery for sending a response back to the original thread, and then uses disable_ki_protection() when entering the user-provided function.

@trio.hazmat.enable_ki_protection

Decorator that marks the given regular function, generator function, async function, or async generator function as protected, i.e., the code inside this function won’t be rudely interrupted by KeyboardInterrupt at any moment. (Though if it contains any yield points, then it can still receive KeyboardInterrupt at those.)

Be very careful to only use this decorator on functions that you know will run in bounded time.

If you have multiple decorators on the same function, then this should be at the bottom of the stack (closest to the actual function).

An example of where you’d use this is on the __exit__ implementation for something like a Lock, where a poorly-timed KeyboardInterrupt could leave the lock in an inconsistent state and cause a deadlock.

trio.hazmat.currently_ki_protected()

Check whether the calling code has KeyboardInterrupt protection enabled.

It’s surprisingly easy to think that one’s KeyboardInterrupt protection is enabled when it isn’t, or vice-versa. This function tells you what trio thinks of the matter, which makes it useful for asserts and unit tests.

Returns: True if protection is enabled, and False otherwise.
Return type: bool
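
The “protected if its caller is” rule can be illustrated with a small stand-alone model. This is only a sketch of one way such inheritance could work (a marker local that is looked up by walking the call stack); it is not presented as trio’s actual mechanism, it handles regular functions only, and every name in it (toy_enable_protection, toy_disable_protection, toy_currently_protected) is hypothetical.

```python
import functools
import sys

# Name of the marker local that the wrappers below define.
_MARKER = "_toy_ki_protection_enabled"


def _make_decorator(enabled):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # This local exists only so that toy_currently_protected() can
            # find it in this frame; its name must match _MARKER.
            _toy_ki_protection_enabled = enabled
            return fn(*args, **kwargs)
        return wrapper
    return decorator


toy_enable_protection = _make_decorator(True)
toy_disable_protection = _make_decorator(False)


def toy_currently_protected():
    """Walk up the call stack; the nearest marker wins, default unprotected."""
    frame = sys._getframe(1)
    while frame is not None:
        if _MARKER in frame.f_locals:
            return frame.f_locals[_MARKER]
        frame = frame.f_back
    return False


def helper():
    # Undecorated: inherits whatever its caller has.
    return toy_currently_protected()


@toy_enable_protection
def protected_entry():
    return helper()


@toy_disable_protection
def user_callback():
    return helper()


@toy_enable_protection
def protected_then_unprotected():
    # Transition from protected code to unprotected code.
    return user_callback()
```

Here helper() reports False when called directly, True when reached through protected_entry(), and False again when reached through protected_then_unprotected(), because the nearest decorator on the stack decides.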

Sleeping and waking

Wait queue abstraction

class trio.hazmat.ParkingLot

A fair wait queue with cancellation and requeueing.

This class encapsulates the tricky parts of implementing a wait queue. It’s useful for implementing higher-level synchronization primitives like queues and locks.

In addition to the methods below, you can use len(parking_lot) to get the number of parked tasks, and if parking_lot: ... to check whether there are any parked tasks.

await park()

Park the current task until woken by a call to unpark() or unpark_all().

unpark(*, count=1)

Unpark one or more tasks.

This wakes up count tasks that are blocked in park(). If there are fewer than count tasks parked, then wakes as many tasks as are available and then returns successfully.

Parameters: count (int) – the number of tasks to unpark.
unpark_all()

Unpark all parked tasks.

repark(new_lot, *, count=1)

Move parked tasks from one ParkingLot object to another.

This dequeues count tasks from one lot, and requeues them on another, preserving order. For example:

import trio
import trio.testing

async def parker(lot):
    print("sleeping")
    await lot.park()
    print("woken")

async def main():
    lot1 = trio.hazmat.ParkingLot()
    lot2 = trio.hazmat.ParkingLot()
    async with trio.open_nursery() as nursery:
        # spawn() takes the async function first, then its arguments
        nursery.spawn(parker, lot1)
        await trio.testing.wait_all_tasks_blocked()
        assert len(lot1) == 1
        assert len(lot2) == 0
        lot1.repark(lot2)
        assert len(lot1) == 0
        assert len(lot2) == 1
        # This wakes up the task that was originally parked in lot1
        lot2.unpark()

If there are fewer than count tasks parked, then reparks as many tasks as are available and then returns successfully.

Parameters:
  • new_lot (ParkingLot) – the parking lot to move tasks to.
  • count (int) – the number of tasks to move.
repark_all(new_lot)

Move all parked tasks from one ParkingLot object to another.

See repark() for details.

statistics()

Return an object containing debugging information.

Currently the following fields are defined:

  • tasks_waiting: The number of tasks blocked on this lot’s park() method.
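
To make the queueing semantics concrete without depending on trio, the following toy model rebuilds the core of the interface on top of the standard library’s asyncio. It is a sketch under stated assumptions: ToyParkingLot is a hypothetical class, unpark() and repark() are synchronous as in the example above, and cancellation handling (which the real class provides) is left out.

```python
import asyncio
from collections import deque


class ToyParkingLot:
    """FIFO wait queue modeled on the ParkingLot interface described above."""

    def __init__(self):
        self._parked = deque()  # futures of parked tasks, oldest first

    def __len__(self):
        return len(self._parked)

    async def park(self):
        fut = asyncio.get_running_loop().create_future()
        self._parked.append(fut)
        await fut  # resumes once unpark() resolves the future

    def unpark(self, *, count=1):
        # Wake at most `count` tasks; fewer if fewer are parked.
        for _ in range(min(count, len(self._parked))):
            self._parked.popleft().set_result(None)

    def repark(self, new_lot, *, count=1):
        # Move the oldest waiters to the back of new_lot, preserving order.
        for _ in range(min(count, len(self._parked))):
            new_lot._parked.append(self._parked.popleft())


async def demo():
    lot1, lot2 = ToyParkingLot(), ToyParkingLot()
    woken = []

    async def parker(name):
        await lot1.park()
        woken.append(name)

    tasks = [asyncio.ensure_future(parker(name)) for name in "abc"]
    await asyncio.sleep(0)        # let all three tasks reach park()
    lot1.repark(lot2, count=2)    # moves "a" and "b", in order
    sizes = (len(lot1), len(lot2))
    lot2.unpark(count=5)          # only two are parked, so two wake up
    lot1.unpark()                 # wakes "c"
    await asyncio.gather(*tasks)
    return sizes, woken


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Asking lot2 to unpark five tasks when only two are parked is not an error; it simply wakes the two that are there, matching the “fewer than count” rule above.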

Inserting yield points

await trio.hazmat.yield_briefly()

A pure yield point.

This checks for cancellation and allows other tasks to be scheduled, without otherwise blocking.

Note that the scheduler has the option of ignoring this and continuing to run the current task if it decides this is appropriate (e.g. for increased efficiency).

Equivalent to await trio.sleep(0) (which is implemented by calling yield_briefly().)

The next two functions are used together to make up a yield point:

await trio.hazmat.yield_if_cancelled()

A conditional yield point.

If a cancellation is active, then allows other tasks to be scheduled, and then raises trio.Cancelled.

await trio.hazmat.yield_briefly_no_cancel()

Introduce a schedule point, but not a cancel point.

These are commonly used in cases where we have an operation that might-or-might-not block, and we want to implement trio’s standard yield point semantics. Example:

async def operation_that_maybe_blocks():
    await yield_if_cancelled()
    try:
        ret = attempt_operation()
    except BlockingIOError:
        # need to block and then retry, which we do below
        pass
    except:
        # some other error, finish the yield point then let it propagate
        await yield_briefly_no_cancel()
        raise
    else:
        # operation succeeded, finish the yield point then return
        await yield_briefly_no_cancel()
        return ret
    while True:
        await wait_for_operation_to_be_ready()
        try:
            return attempt_operation()
        except BlockingIOError:
            pass

This logic is a bit convoluted, but accomplishes all of the following:

  • Every execution path passes through a yield point (assuming that wait_for_operation_to_be_ready is an unconditional yield point)
  • Our cancellation semantics say that Cancelled should only be raised if the operation didn’t happen. Using yield_briefly_no_cancel() on the early-exit branches accomplishes this.
  • On the path where we do end up blocking, we don’t pass through any schedule points before that, which avoids some unnecessary work.
  • Avoids implicitly chaining the BlockingIOError with any errors raised by attempt_operation or wait_for_operation_to_be_ready, by keeping the while True: loop outside of the except BlockingIOError: block.

These functions can also be useful in other situations, e.g. if you’re going to call an uncancellable operation like trio.run_in_worker_thread() or (potentially) overlapped I/O operations on Windows, then you can call yield_if_cancelled() first to make sure that the whole thing is a yield point.

Low-level blocking

class trio.hazmat.Abort

enum.Enum used as the return value from abort functions.

See yield_indefinitely() for details.

SUCCEEDED
FAILED
trio.hazmat.reschedule(task, next_send=Value(None))

Reschedule the given task with the given Result.

See yield_indefinitely() for the gory details.

There must be exactly one call to reschedule() for every call to yield_indefinitely(). (And when counting, keep in mind that returning Abort.SUCCEEDED from an abort callback is equivalent to calling reschedule() once.)

Parameters:
await trio.hazmat.yield_indefinitely(abort_fn)

Put the current task to sleep, with cancellation support.

This is the lowest-level API for blocking in trio. Every time a Task blocks, it does so by calling this function.

This is a tricky interface with no guard rails. If you can use ParkingLot or the built-in I/O wait functions instead, then you should.

Generally the way it works is that before calling this function, you make arrangements for “someone” to call reschedule() on the current task at some later point.

Then you call yield_indefinitely(), passing in abort_fn, an “abort callback”.

(Terminology: in trio, “aborting” is the process of attempting to interrupt a blocked task to deliver a cancellation.)

There are two possibilities for what happens next:

  1. “Someone” calls reschedule() on the current task, and yield_indefinitely() returns or raises whatever value or error was passed to reschedule().

  2. The call’s context transitions to a cancelled state (e.g. due to a timeout expiring). When this happens, the abort_fn is called. Its interface looks like:

    def abort_fn(raise_cancel):
        ...
        return trio.hazmat.Abort.SUCCEEDED  # or FAILED
    

    It should attempt to clean up any state associated with this call, and in particular, arrange that reschedule() will not be called later. If (and only if!) it is successful, then it should return Abort.SUCCEEDED, in which case the task will automatically be rescheduled with an appropriate Cancelled error.

    Otherwise, it should return Abort.FAILED. This means that the task can’t be cancelled at this time, and still has to make sure that “someone” eventually calls reschedule().

    At that point there are again two possibilities. You can simply ignore the cancellation altogether: wait for the operation to complete and then reschedule and continue as normal. (For example, this is what trio.run_in_worker_thread() does if cancellation is disabled.) The other possibility is that the abort_fn does succeed in cancelling the operation, but for some reason isn’t able to report that right away. (Example: on Windows, it’s possible to request that an async (“overlapped”) I/O operation be cancelled, but this request is also asynchronous – you don’t find out until later whether the operation was actually cancelled or not.) To report a delayed cancellation, you should reschedule the task yourself, and call the raise_cancel callback passed to abort_fn to raise a Cancelled (or possibly KeyboardInterrupt) exception into this task. Either of the approaches sketched below can work:

    # Option 1:
    # Catch the exception from raise_cancel and inject it into the task.
    # (This is what trio does automatically for you if you return
    # Abort.SUCCEEDED.)
    trio.hazmat.reschedule(task, Result.capture(raise_cancel))
    
    # Option 2:
    # wait to be woken by "someone", and then decide whether to raise
    # the error from inside the task.
    outer_raise_cancel = None
    def abort(inner_raise_cancel):
        nonlocal outer_raise_cancel
        outer_raise_cancel = inner_raise_cancel
        TRY_TO_CANCEL_OPERATION()
        return trio.hazmat.Abort.FAILED
    await yield_indefinitely(abort)
    if OPERATION_WAS_SUCCESSFULLY_CANCELLED:
        # raises the error
        outer_raise_cancel()
    

    In any case it’s guaranteed that we only call the abort_fn at most once per call to yield_indefinitely().

Warning

If your abort_fn raises an error, or returns any value other than Abort.SUCCEEDED or Abort.FAILED, then trio will crash violently. Be careful! Similarly, it is entirely possible to deadlock a trio program by failing to reschedule a blocked task, or cause havoc by calling reschedule() too many times. Remember what we said up above about how you should use a higher-level API if at all possible?

Here’s an example lock class implemented using yield_indefinitely() directly. This implementation has a number of flaws, including lack of fairness, O(n) cancellation, missing error checking, failure to insert a yield point on the non-blocking path, etc. If you really want to implement your own lock, then you should study the implementation of trio.Lock and use ParkingLot, which handles some of these issues for you. But this does serve to illustrate the basic structure of the yield_indefinitely() API:

import collections

import trio

class NotVeryGoodLock:
    def __init__(self):
        self._blocked_tasks = collections.deque()
        self._held = False

    async def acquire(self):
        while self._held:
            task = trio.current_task()
            self._blocked_tasks.append(task)
            def abort_fn(_):
                self._blocked_tasks.remove(task)
                return trio.hazmat.Abort.SUCCEEDED
            await trio.hazmat.yield_indefinitely(abort_fn)
        self._held = True

    def release(self):
        self._held = False
        if self._blocked_tasks:
            woken_task = self._blocked_tasks.popleft()
            trio.hazmat.reschedule(woken_task)
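
The block/reschedule/abort protocol described in this section can also be explored in isolation with a toy scheduler built on plain generators. Everything here is an assumption made for illustration: ToyScheduler, waiter and demo are hypothetical, the Abort and Cancelled classes are local stand-ins rather than trio’s, and a bare `yield abort_fn` stands in for `await yield_indefinitely(abort_fn)`. It does not model trio’s real scheduler.

```python
import enum
from collections import deque


class Abort(enum.Enum):
    SUCCEEDED = 1
    FAILED = 2


class Cancelled(Exception):
    pass


class ToyScheduler:
    def __init__(self):
        self._runnable = deque()  # (task, value or exception to deliver)
        self._abort_fns = {}      # blocked task -> its abort callback

    def spawn(self, task):
        self._runnable.append((task, None))

    def reschedule(self, task, value=None):
        # Removing the entry enforces "exactly one reschedule per block":
        # a second call for the same block raises KeyError.
        del self._abort_fns[task]
        self._runnable.append((task, value))

    def cancel(self, task):
        def raise_cancel():
            raise Cancelled()

        if self._abort_fns[task](raise_cancel) is Abort.SUCCEEDED:
            self.reschedule(task, Cancelled())
        # On Abort.FAILED the task stays blocked until someone reschedules it.

    def run(self):
        while self._runnable:
            task, value = self._runnable.popleft()
            try:
                if isinstance(value, BaseException):
                    abort_fn = task.throw(value)
                else:
                    abort_fn = task.send(value)
            except (StopIteration, Cancelled):
                continue  # the task finished
            self._abort_fns[task] = abort_fn  # the task blocked again


def waiter(name, log, abort_result):
    def abort_fn(raise_cancel):
        log.append((name, "abort_fn called"))
        return abort_result

    try:
        value = yield abort_fn  # stand-in for yield_indefinitely(abort_fn)
        log.append((name, value))
    except Cancelled:
        log.append((name, "cancelled"))


def demo():
    log = []
    sched = ToyScheduler()
    a = waiter("a", log, Abort.SUCCEEDED)
    b = waiter("b", log, Abort.SUCCEEDED)
    c = waiter("c", log, Abort.FAILED)
    for task in (a, b, c):
        sched.spawn(task)
    sched.run()                             # all three tasks block
    sched.reschedule(a, "woken")            # possibility 1: "someone" wakes a
    sched.cancel(b)                         # possibility 2: abort succeeds
    sched.cancel(c)                         # abort fails, c stays blocked ...
    sched.reschedule(c, "finished anyway")  # ... until its operation completes
    sched.run()
    return log
```

In this model, task b sees Cancelled because its abort_fn returned Abort.SUCCEEDED, while task c ignores the cancellation and resumes normally once it is rescheduled, mirroring the two outcomes described for abort_fn above.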