Introspecting and extending Trio with trio.hazmat
¶
Warning
You probably don’t want to use this module.
trio.hazmat
is Trio’s “hazardous materials” layer: it contains
APIs useful for introspecting and extending Trio. If you’re writing
ordinary, everyday code, then you can ignore this module completely.
But sometimes you need something a bit lower level. Here are some
examples of situations where you should reach for trio.hazmat
:
- You want to implement a new synchronization primitive that Trio doesn’t (yet) provide, like a reader-writer lock.
- You want to extract low-level metrics to monitor the health of your application.
- You want to add support for a low-level operating system interface that Trio doesn’t (yet) expose, like watching a filesystem directory for changes.
- You want to implement an interface for calling between Trio and another event loop within the same process.
- You’re writing a debugger and want to visualize Trio’s task tree.
- You need to interoperate with a C library whose API exposes raw file descriptors.
Using trio.hazmat
isn’t really that hazardous; in fact you’re
already using it – it’s how most of the functionality described in
previous chapters is implemented. The APIs described here have
strictly defined and carefully documented semantics, and are perfectly
safe – if you read carefully and take proper precautions. Some of
those strict semantics have nasty big pointy teeth. If you make a
mistake, Trio may not be able to handle it gracefully; conventions and
guarantees that are followed strictly in the rest of Trio do not
always apply. Using this module makes it your responsibility to think
through and handle the nasty cases to expose a friendly Trio-style API
to your users.
Debugging and instrumentation¶
Trio tries hard to provide useful hooks for debugging and
instrumentation. Some are documented above (the nursery introspection
attributes, trio.Queue.statistics()
, etc.). Here are some more.
Global statistics¶
-
trio.hazmat.
current_statistics
()¶ Returns an object containing run-loop-level debugging information.
Currently the following fields are defined:
tasks_living
(int): The number of tasks that have been spawned and not yet exited.tasks_runnable
(int): The number of tasks that are currently queued on the run queue (as opposed to blocked waiting for something to happen).seconds_to_next_deadline
(float): The time until the next pending cancel scope deadline. May be negative if the deadline has expired but we haven’t yet processed cancellations. May beinf
if there are no pending deadlines.run_sync_soon_queue_size
(int): The number of unprocessed callbacks queued viatrio.hazmat.TrioToken.run_sync_soon()
.io_statistics
(object): Some statistics from trio’s I/O backend. This always has an attributebackend
which is a string naming which operating-system-specific I/O backend is in use; the other attributes vary between backends.
Instrument API¶
The instrument API provides a standard way to add custom instrumentation to the run loop. Want to make a histogram of scheduling latencies, log a stack trace of any task that blocks the run loop for >50 ms, or measure what percentage of your process’s running time is spent waiting for I/O? This is the place.
The general idea is that at any given moment, trio.run()
maintains a set of “instruments”, which are objects that implement the
trio.abc.Instrument
interface. When an interesting event
happens, it loops over these instruments and notifies them by calling
an appropriate method. The tutorial has a simple example of
using this for tracing.
Since this hooks into trio at a rather low level, you do have to be
careful. The callbacks are run synchronously, and in many cases if
they error out then there isn’t any plausible way to propagate this
exception (for instance, we might be deep in the guts of the exception
propagation machinery…). Therefore our current strategy for handling
exceptions raised by instruments is to (a) log an exception to the
"trio.abc.Instrument"
logger, which by default prints a stack
trace to standard error and (b) disable the offending instrument.
You can register an initial list of instruments by passing them to
trio.run()
. add_instrument()
and
remove_instrument()
let you add and remove instruments at
runtime.
-
trio.hazmat.
add_instrument
(instrument)¶ Start instrumenting the current run loop with the given instrument.
Parameters: instrument (trio.abc.Instrument) – The instrument to activate. If
instrument
is already active, does nothing.
-
trio.hazmat.
remove_instrument
(instrument)¶ Stop instrumenting the current run loop with the given instrument.
Parameters: instrument (trio.abc.Instrument) – The instrument to de-activate. Raises: KeyError
– if the instrument is not currently active. This could occur either because you never added it, or because you added it and then it raised an unhandled exception and was automatically deactivated.
And here’s the interface to implement if you want to build your own
Instrument
:
-
class
trio.abc.
Instrument
¶ The interface for run loop instrumentation.
Instruments don’t have to inherit from this abstract base class, and all of these methods are optional. This class serves mostly as documentation.
-
after_io_wait
(timeout)¶ Called after handling pending I/O.
Parameters: timeout (float) – The number of seconds we were willing to wait. This much time may or may not have elapsed, depending on whether any I/O was ready.
-
after_run
()¶ Called just before
trio.run()
returns.
-
after_task_step
(task)¶ Called when we return to the main run loop after a task has yielded.
Parameters: task (trio.hazmat.Task) – The task that just ran.
-
before_io_wait
(timeout)¶ Called before blocking to wait for I/O readiness.
Parameters: timeout (float) – The number of seconds we are willing to wait.
-
before_run
()¶ Called at the beginning of
trio.run()
.
-
before_task_step
(task)¶ Called immediately before we resume running the given task.
Parameters: task (trio.hazmat.Task) – The task that is about to run.
-
task_exited
(task)¶ Called when the given task exits.
Parameters: task (trio.hazmat.Task) – The finished task.
-
task_scheduled
(task)¶ Called when the given task becomes runnable.
It may still be some time before it actually runs, if there are other runnable tasks ahead of it.
Parameters: task (trio.hazmat.Task) – The task that became runnable.
-
task_spawned
(task)¶ Called when the given task is created.
Parameters: task (trio.hazmat.Task) – The new task.
-
The tutorial has a fully-worked example of defining a custom instrument to log trio’s internal scheduling decisions.
Low-level I/O primitives¶
Different environments expose different low-level APIs for performing
async I/O. trio.hazmat
exposes these APIs in a relatively
direct way, so as to allow maximum power and flexibility for higher
level code. However, this means that the exact API provided may vary
depending on what system trio is running on.
Universally available API¶
All environments provide the following functions:
-
await
trio.hazmat.
wait_socket_readable
(sock)¶ Block until the given
socket.socket()
object is readable.The given object must be exactly of type
socket.socket()
, nothing else.Raises: - TypeError – if the given object is not of type
socket.socket()
. - trio.ResourceBusyError – if another task is already waiting for the given socket to become readable.
- TypeError – if the given object is not of type
-
await
trio.hazmat.
wait_socket_writable
(sock)¶ Block until the given
socket.socket()
object is writable.The given object must be exactly of type
socket.socket()
, nothing else.Raises: - TypeError – if the given object is not of type
socket.socket()
. - trio.ResourceBusyError – if another task is already waiting for the given socket to become writable.
- TypeError – if the given object is not of type
Unix-specific API¶
Unix-like systems provide the following functions:
-
await
trio.hazmat.
wait_readable
(fd)¶ Block until the given file descriptor is readable.
Warning
This is “readable” according to the operating system’s definition of readable. In particular, it probably won’t tell you anything useful for on-disk files.
Parameters: fd – integer file descriptor, or else an object with a fileno()
methodRaises: trio.ResourceBusyError – if another task is already waiting for the given fd to become readable.
-
await
trio.hazmat.
wait_writable
(fd)¶ Block until the given file descriptor is writable.
Warning
This is “writable” according to the operating system’s definition of writable. In particular, it probably won’t tell you anything useful for on-disk files.
Parameters: fd – integer file descriptor, or else an object with a fileno()
methodRaises: trio.ResourceBusyError – if another task is already waiting for the given fd to become writable.
Unbounded queues¶
In the section Passing messages with Queue, we showed an example with two producers
and one consumer using the same queue, where the queue size would grow
without bound to produce unbounded latency and memory usage.
trio.Queue
avoids this by placing an upper bound on how big
the queue can get before put
starts blocking. But what if you’re
in a situation where put
can’t block?
There is another option: the queue consumer could get greedy. Each
time it runs, it could eagerly consume all of the pending items before
allowing another task to run. (In some other systems, this would
happen automatically because their queue’s get
method doesn’t
invoke the scheduler unless it has to block. But in trio, get is
always a checkpoint.) This works, but it’s a bit
risky: basically instead of applying backpressure to specifically the
producer tasks, we’re applying it to all the tasks in our system.
The danger here is that if enough items have built up in the queue,
then “stopping the world” to process them all may cause unacceptable
latency spikes in unrelated tasks. Nonetheless, this is still the
right choice in situations where it’s impossible to apply backpressure
more precisely. So this is the strategy implemented by
UnboundedQueue
. The main time you should use this is when
working with low-level APIs like monitor_kevent()
.
-
class
trio.hazmat.
UnboundedQueue
¶ An unbounded queue suitable for certain unusual forms of inter-task communication.
This class is designed for use as a queue in cases where the producer for some reason cannot be subjected to back-pressure, i.e.,
put_nowait()
has to always succeed. In order to prevent the queue backlog from actually growing without bound, the consumer API is modified to dequeue items in “batches”. If a consumer task processes each batch without yielding, then this helps achieve (but does not guarantee) an effective bound on the queue’s memory use, at the cost of potentially increasing system latencies in general. You should generally prefer to use atrio.Queue
instead if you can.Currently each batch completely empties the queue, but this may change in the future.
A
UnboundedQueue
object can be used as an asynchronous iterator, where each iteration returns a new batch of items. I.e., these two loops are equivalent:async for batch in queue: ... while True: obj = await queue.get_batch() ...
-
empty
()¶ Returns True if the queue is empty, False otherwise.
There is some subtlety to interpreting this method’s return value: see issue #63.
-
await
get_batch
()¶ Get the next batch from the queue, blocking as necessary.
Returns: - A list of dequeued items, in order. This list is always
- non-empty.
Return type: list
-
get_batch_nowait
()¶ Attempt to get the next batch from the queue, without blocking.
Returns: - A list of dequeued items, in order. On a successful call this
- list is always non-empty; if it would be empty we raise
WouldBlock
instead.
Return type: list Raises: WouldBlock
– if the queue is empty.
-
put_nowait
(obj)¶ Put an object into the queue, without blocking.
This always succeeds, because the queue is unbounded. We don’t provide a blocking
put
method, because it would never need to block.Parameters: obj (object) – The object to enqueue.
-
qsize
()¶ Returns the number of items currently in the queue.
-
statistics
()¶ Return an object containing debugging information.
Currently the following fields are defined:
qsize
: The number of items currently in the queue.tasks_waiting
: The number of tasks blocked on this queue’sget_batch()
method.
-
Global state: system tasks and run-local storage¶
-
class
trio.hazmat.
RunLocal
(**kwargs)¶ Run-local storage.
RunLocal
objects are very similar totrio.TaskLocal
objects, except that attributes are shared across all the tasks within a single call totrio.run()
. They’re also very similar tothreading.local
objects, except thatRunLocal
objects are automatically wiped clean whentrio.run()
returns.
-
trio.hazmat.
spawn_system_task
(async_fn, *args, name=None)¶ Spawn a “system” task.
System tasks have a few differences from regular tasks:
- They don’t need an explicit nursery; instead they go into the internal “system nursery”.
- If a system task raises an exception, then it’s converted into a
TrioInternalError
and all tasks are cancelled. If you write a system task, you should be careful to make sure it doesn’t crash. - System tasks are automatically cancelled when the main task exits.
- By default, system tasks have
KeyboardInterrupt
protection enabled. If you want your task to be interruptible by control-C, then you need to usedisable_ki_protection()
explicitly.
Parameters: - async_fn – An async callable.
- args – Positional arguments for
async_fn
. If you want to pass keyword arguments, usefunctools.partial()
. - name – The name for this task. Only used for debugging/introspection
(e.g.
repr(task_obj)
). If this isn’t a string,spawn_system_task()
will try to make it one. A common use case is if you’re wrapping a function before spawning a new task, you might pass the original function as thename=
to make debugging easier.
Returns: the newly spawned task
Return type:
Trio tokens¶
-
class
trio.hazmat.
TrioToken
¶ An opaque object representing a single call to
trio.run()
.It has no public constructor; instead, see
current_trio_token()
.This object has two uses:
- It lets you re-enter the Trio run loop from external threads or signal
handlers. This is the low-level primitive that
trio.run_sync_in_worker_thread()
uses to receive results from worker threads, thattrio.catch_signals()
uses to receive notifications about signals, and so forth. - Each call to
trio.run()
has exactly one associatedTrioToken
object, so you can use it to identify a particular call.
-
run_sync_soon
(sync_fn, *args, idempotent=False)¶ Schedule a call to
sync_fn(*args)
to occur in the context of a trio task.This is safe to call from the main thread, from other threads, and from signal handlers. This is the fundamental primitive used to re-enter the Trio run loop from outside of it.
The call will happen “soon”, but there’s no guarantee about exactly when, and no mechanism provided for finding out when it’s happened. If you need this, you’ll have to build your own.
The call is effectively run as part of a system task (see
spawn_system_task()
). In particular this means that:KeyboardInterrupt
protection is enabled by default; if you wantsync_fn
to be interruptible by control-C, then you need to usedisable_ki_protection()
explicitly.- If
sync_fn
raises an exception, then it’s converted into aTrioInternalError
and all tasks are cancelled. You should be careful thatsync_fn
doesn’t crash.
All calls with
idempotent=False
are processed in strict first-in first-out order.If
idempotent=True
, thensync_fn
andargs
must be hashable, and trio will make a best-effort attempt to discard any call submission which is equal to an already-pending call. Trio will make an attempt to process these in first-in first-out order, but no guarantees. (Currently processing is FIFO on CPython 3.6 and PyPy, but not CPython 3.5.)Any ordering guarantees apply separately to
idempotent=False
andidempotent=True
calls; there’s no rule for how calls in the different categories are ordered with respect to each other.Raises: trio.RunFinishedError – if the associated call to trio.run()
has already exited. (Any call that doesn’t raise this error is guaranteed to be fully processed beforetrio.run()
exits.)
- It lets you re-enter the Trio run loop from external threads or signal
handlers. This is the low-level primitive that
-
trio.hazmat.
current_trio_token
()¶ Retrieve the
TrioToken
for the current call totrio.run()
.
Safer KeyboardInterrupt handling¶
Trio’s handling of control-C is designed to balance usability and
safety. On the one hand, there are sensitive regions (like the core
scheduling loop) where it’s simply impossible to handle arbitrary
KeyboardInterrupt
exceptions while maintaining our core
correctness invariants. On the other, if the user accidentally writes
an infinite loop, we do want to be able to break out of that. Our
solution is to install a default signal handler which checks whether
it’s safe to raise KeyboardInterrupt
at the place where the
signal is received. If so, then we do; otherwise, we schedule a
KeyboardInterrupt
to be delivered to the main task at the next
available opportunity (similar to how Cancelled
is
delivered).
So that’s great, but – how do we know whether we’re in one of the sensitive parts of the program or not?
This is determined on a function-by-function basis. By default, a function is protected if its caller is, and not if its caller isn’t; this is helpful because it means you only need to override the defaults at places where you transition from protected code to unprotected code or vice-versa.
These transitions are accomplished using two function decorators:
-
@
trio.hazmat.
disable_ki_protection
¶ Decorator that marks the given regular function, generator function, async function, or async generator function as unprotected against
KeyboardInterrupt
, i.e., the code inside this function can be rudely interrupted byKeyboardInterrupt
at any moment.If you have multiple decorators on the same function, then this should be at the bottom of the stack (closest to the actual function).
An example of where you’d use this is in implementing something like
run_in_trio_thread
, which usescall_soon_thread_and_signal_safe
to get into the trio thread.call_soon_thread_and_signal_safe
callbacks are run withKeyboardInterrupt
protection enabled, andrun_in_trio_thread
takes advantage of this to safely set up the machinery for sending a response back to the original thread, and then usesdisable_ki_protection()
when entering the user-provided function.
-
@
trio.hazmat.
enable_ki_protection
¶ Decorator that marks the given regular function, generator function, async function, or async generator function as protected against
KeyboardInterrupt
, i.e., the code inside this function won’t be rudely interrupted byKeyboardInterrupt
. (Though if it contains any checkpoints, then it can still receiveKeyboardInterrupt
at those. This is considered a polite interruption.)Warning
Be very careful to only use this decorator on functions that you know will either exit in bounded time, or else pass through a checkpoint regularly. (Of course all of your functions should have this property, but if you mess it up here then you won’t even be able to use control-C to escape!)
If you have multiple decorators on the same function, then this should be at the bottom of the stack (closest to the actual function).
An example of where you’d use this is on the
__exit__
implementation for something like aLock
, where a poorly-timedKeyboardInterrupt
could leave the lock in an inconsistent state and cause a deadlock.
-
trio.hazmat.
currently_ki_protected
()¶ Check whether the calling code has
KeyboardInterrupt
protection enabled.It’s surprisingly easy to think that one’s
KeyboardInterrupt
protection is enabled when it isn’t, or vice-versa. This function tells you what trio thinks of the matter, which makes it useful forassert
s and unit tests.Returns: True if protection is enabled, and False otherwise. Return type: bool
Result objects¶
Trio provides some simple classes for representing the result of a Python function call, so that it can be passed around. The basic rule is:
result = Result.capture(f, *args)
x = result.unwrap()
is the same as:
x = f(*args)
even if f
raises an error. And there’s also
Result.acapture()
, which is like await f(*args)
.
There’s nothing really dangerous about this system – it’s actually
very general and quite handy! But mostly it’s used for things like
implementing trio.run_sync_in_worker_thread()
, or for getting
values to pass to reschedule()
, so we put it in
trio.hazmat
to avoid cluttering up the main API.
Since Result
objects are simple immutable data structures
that don’t otherwise interact with the trio machinery, it’s safe to
create and access Result
objects from any thread you like.
-
class
trio.hazmat.
Result
¶ An abstract class representing the result of a Python computation.
This class has two concrete subclasses:
Value
representing a value, andError
representing an exception.In addition to the methods described below, comparison operators on
Value
andError
objects (==
,<
, etc.) check that the other object is also aValue
orError
object respectively, and then compare the contained objects.Result
objects are hashable if the contained objects are hashable.-
staticmethod await
acapture
(async_fn, *args)¶ Run
await async_fn(*args)
and capture the result.Returns: Either a Value
orError
as appropriate.
-
abstractmethod await
asend
(agen)¶ Send or throw the contained value or exception into the given async generator object.
Parameters: agen – An async generator object supporting .asend()
and.athrow()
methods.
-
staticmethod
capture
(sync_fn, *args)¶ Run
sync_fn(*args)
and capture the result.Returns: Either a Value
orError
as appropriate.
-
abstractmethod
send
(gen)¶ Send or throw the contained value or exception into the given generator object.
Parameters: gen – A generator object supporting .send()
and.throw()
methods.
-
abstractmethod
unwrap
()¶ Return or raise the contained value or exception.
These two lines of code are equivalent:
x = fn(*args) x = Result.capture(fn, *args).unwrap()
-
staticmethod await
Sleeping and waking¶
Wait queue abstraction¶
-
class
trio.hazmat.
ParkingLot
¶ A fair wait queue with cancellation and requeueing.
This class encapsulates the tricky parts of implementing a wait queue. It’s useful for implementing higher-level synchronization primitives like queues and locks.
In addition to the methods below, you can use
len(parking_lot)
to get the number of parked tasks, andif parking_lot: ...
to check whether there are any parked tasks.-
await
park
()¶ Park the current task until woken by a call to
unpark()
orunpark_all()
.
-
repark
(new_lot, *, count=1)¶ Move parked tasks from one
ParkingLot
object to another.This dequeues
count
tasks from one lot, and requeues them on another, preserving order. For example:async def parker(lot): print("sleeping") await lot.park() print("woken") async def main(): lot1 = trio.hazmat.ParkingLot() lot2 = trio.hazmat.ParkingLot() async with trio.open_nursery() as nursery: nursery.start_soon(lot1) await trio.testing.wait_all_tasks_blocked() assert len(lot1) == 1 assert len(lot2) == 0 lot1.repark(lot2) assert len(lot1) == 0 assert len(lot2) == 1 # This wakes up the task that was originally parked in lot1 lot2.unpark()
If there are fewer than
count
tasks parked, then reparks as many tasks as are available and then returns successfully.Parameters: - new_lot (ParkingLot) – the parking lot to move tasks to.
- count (int) – the number of tasks to move.
-
repark_all
(new_lot)¶ Move all parked tasks from one
ParkingLot
object to another.See
repark()
for details.
-
statistics
()¶ Return an object containing debugging information.
Currently the following fields are defined:
tasks_waiting
: The number of tasks blocked on this lot’spark()
method.
-
unpark
(*, count=1)¶ Unpark one or more tasks.
This wakes up
count
tasks that are blocked inpark()
. If there are fewer thancount
tasks parked, then wakes as many tasks are available and then returns successfully.Parameters: count (int) – the number of tasks to unpark.
-
unpark_all
()¶ Unpark all parked tasks.
-
await
Low-level checkpoint functions¶
-
await
trio.hazmat.
checkpoint
()¶ A pure checkpoint.
This checks for cancellation and allows other tasks to be scheduled, without otherwise blocking.
Note that the scheduler has the option of ignoring this and continuing to run the current task if it decides this is appropriate (e.g. for increased efficiency).
Equivalent to
await trio.sleep(0)
(which is implemented by callingcheckpoint()
.)
The next two functions are used together to make up a checkpoint:
-
await
trio.hazmat.
checkpoint_if_cancelled
()¶ Issue a checkpoint if the calling context has been cancelled.
Equivalent to (but potentially more efficient than):
if trio.current_deadline() == -inf: await trio.hazmat.checkpoint()
This is either a no-op, or else it allow other tasks to be scheduled and then raises
trio.Cancelled
.Typically used together with
cancel_shielded_checkpoint()
.
-
await
trio.hazmat.
cancel_shielded_checkpoint
()¶ Introduce a schedule point, but not a cancel point.
This is not a checkpoint, but it is half of a checkpoint, and when combined with
checkpoint_if_cancelled()
it can make a full checkpoint.Equivalent to (but potentially more efficient than):
with trio.open_cancel_scope(shield=True): await trio.hazmat.checkpoint()
These are commonly used in cases where you have an operation that might-or-might-not block, and you want to implement trio’s standard checkpoint semantics. Example:
async def operation_that_maybe_blocks():
await checkpoint_if_cancelled()
try:
ret = attempt_operation()
except BlockingIOError:
# need to block and then retry, which we do below
pass
except:
# some other error, finish the checkpoint then let it propagate
await cancel_shielded_checkpoint()
raise
else:
# operation succeeded, finish the checkpoint then return
await cancel_shielded_checkpoint()
return ret
while True:
await wait_for_operation_to_be_ready()
try:
return attempt_operation()
except BlockingIOError:
pass
This logic is a bit convoluted, but accomplishes all of the following:
- Every execution path passes through a checkpoint (assuming that
wait_for_operation_to_be_ready
is an unconditional checkpoint) - Our cancellation semantics say that
Cancelled
should only be raised if the operation didn’t happen. Usingcancel_shielded_checkpoint()
on the early-exit branches accomplishes this. - On the path where we do end up blocking, we don’t pass through any schedule points before that, which avoids some unnecessary work.
- Avoids implicitly chaining the
BlockingIOError
with any errors raised byattempt_operation
orwait_for_operation_to_be_ready
, by keeping thewhile True:
loop outside of theexcept BlockingIOError:
block.
These functions can also be useful in other situations. For example,
when trio.run_sync_in_worker_thread()
schedules some work to run
in a worker thread, it blocks until the work is finished (so it’s a
schedule point), but by default it doesn’t allow cancellation. So to
make sure that the call always acts as a checkpoint, it calls
checkpoint_if_cancelled()
before starting the thread.
Low-level blocking¶
-
await
trio.hazmat.
wait_task_rescheduled
(abort_func)¶ Put the current task to sleep, with cancellation support.
This is the lowest-level API for blocking in trio. Every time a
Task
blocks, it does so by calling this function (usually indirectly via some higher-level API).This is a tricky interface with no guard rails. If you can use
ParkingLot
or the built-in I/O wait functions instead, then you should.Generally the way it works is that before calling this function, you make arrangements for “someone” to call
reschedule()
on the current task at some later point.Then you call
wait_task_rescheduled()
, passing inabort_func
, an “abort callback”.(Terminology: in trio, “aborting” is the process of attempting to interrupt a blocked task to deliver a cancellation.)
There are two possibilities for what happens next:
“Someone” calls
reschedule()
on the current task, andwait_task_rescheduled()
returns or raises whatever value or error was passed toreschedule()
.The call’s context transitions to a cancelled state (e.g. due to a timeout expiring). When this happens, the
abort_func
is called. It’s interface looks like:def abort_func(raise_cancel): ... return trio.hazmat.Abort.SUCCEEDED # or FAILED
It should attempt to clean up any state associated with this call, and in particular, arrange that
reschedule()
will not be called later. If (and only if!) it is successful, then it should returnAbort.SUCCEEDED
, in which case the task will automatically be rescheduled with an appropriateCancelled
error.Otherwise, it should return
Abort.FAILED
. This means that the task can’t be cancelled at this time, and still has to make sure that “someone” eventually callsreschedule()
.At that point there are again two possibilities. You can simply ignore the cancellation altogether: wait for the operation to complete and then reschedule and continue as normal. (For example, this is what
trio.run_sync_in_worker_thread()
does if cancellation is disabled.) The other possibility is that theabort_func
does succeed in cancelling the operation, but for some reason isn’t able to report that right away. (Example: on Windows, it’s possible to request that an async (“overlapped”) I/O operation be cancelled, but this request is also asynchronous – you don’t find out until later whether the operation was actually cancelled or not.) To report a delayed cancellation, then you should reschedule the task yourself, and call theraise_cancel
callback passed toabort_func
to raise aCancelled
(or possiblyKeyboardInterrupt
) exception into this task. Either of the approaches sketched below can work:# Option 1: # Catch the exception from raise_cancel and inject it into the task. # (This is what trio does automatically for you if you return # Abort.SUCCEEDED.) trio.hazmat.reschedule(task, Result.capture(raise_cancel)) # Option 2: # wait to be woken by "someone", and then decide whether to raise # the error from inside the task. outer_raise_cancel = None def abort(inner_raise_cancel): nonlocal outer_raise_cancel outer_raise_cancel = inner_raise_cancel TRY_TO_CANCEL_OPERATION() return trio.hazmat.Abort.FAILED await wait_task_rescheduled(abort) if OPERATION_WAS_SUCCESSFULLY_CANCELLED: # raises the error outer_raise_cancel()
In any case it’s guaranteed that we only call the
abort_func
at most once per call towait_task_rescheduled()
.
Warning
If your
abort_func
raises an error, or returns any value other thanAbort.SUCCEEDED
orAbort.FAILED
, then trio will crash violently. Be careful! Similarly, it is entirely possible to deadlock a trio program by failing to reschedule a blocked task, or cause havoc by callingreschedule()
too many times. Remember what we said up above about how you should use a higher-level API if at all possible?
-
class
trio.hazmat.
Abort
¶ enum.Enum
used as the return value from abort functions.See
wait_task_rescheduled()
for details.
-
trio.hazmat.
reschedule
(task, next_send=Value(None))¶ Reschedule the given task with the given
Result
.See
wait_task_rescheduled()
for the gory details.There must be exactly one call to
reschedule()
for every call towait_task_rescheduled()
. (And when counting, keep in mind that returningAbort.SUCCEEDED
from an abort callback is equivalent to callingreschedule()
once.)Parameters: - task (trio.hazmat.Task) – the task to be rescheduled. Must be blocked
in a call to
wait_task_rescheduled()
. - next_send (trio.hazmat.Result) – the value (or error) to return (or
raise) from
wait_task_rescheduled()
.
- task (trio.hazmat.Task) – the task to be rescheduled. Must be blocked
in a call to
Here’s an example lock class implemented using
wait_task_rescheduled()
directly. This implementation has a
number of flaws, including lack of fairness, O(n) cancellation,
missing error checking, failure to insert a checkpoint on the
non-blocking path, etc. If you really want to implement your own lock,
then you should study the implementation of trio.Lock
and use
ParkingLot
, which handles some of these issues for you. But
this does serve to illustrate the basic structure of the
wait_task_rescheduled()
API:
class NotVeryGoodLock:
def __init__(self):
self._blocked_tasks = collections.deque()
self._held = False
async def acquire(self):
while self._held:
task = trio.current_task()
self._blocked_tasks.append(task)
def abort_fn(_):
self._blocked_tasks.remove(task)
return trio.hazmat.Abort.SUCCEEDED
await trio.hazmat.wait_task_rescheduled(abort_fn)
self._held = True
def release(self):
self._held = False
if self._blocked_tasks:
woken_task = self._blocked_tasks.popleft()
trio.hazmat.reschedule(woken_task)
Task API¶
-
trio.hazmat.
current_task
()¶ Return the
Task
object representing the current task.Returns: the Task
that calledcurrent_task()
.Return type: Task
-
class
trio.hazmat.
Task
¶ A
Task
object represents a concurrent “thread” of execution. It has no public constructor; Trio internally creates aTask
object for each call tonursery.start(...)
ornursery.start_soon(...)
.Its public members are mostly useful for introspection and debugging:
-
name
¶ String containing this
Task
’s name. Usually the name of the function thisTask
is running, but can be overridden by passingname=
tostart
orstart_soon
.
-
coro
¶ This task’s coroutine object. Example usage: extracting a stack trace:
import traceback def walk_coro_stack(coro): while coro is not None: if hasattr(coro, "cr_frame"): # A real coroutine yield coro.cr_frame, coro.cr_frame.f_lineno coro = coro.cr_await else: # A generator decorated with @types.coroutine yield coro.gi_frame, coro.gi_frame.f_lineno coro = coro.gi_yieldfrom def print_stack_for_task(task): ss = traceback.StackSummary.extract(walk_coro_stack(task.coro)) print("".join(ss.format()))
-
parent_nursery
¶ The nursery this task is inside (or None if this is the “init” take).
Example use case: drawing a visualization of the task tree in a debugger.
-
child_nurseries
¶ The nurseries this task contains.
This is a list, with outer nurseries before inner nurseries.
-