Testing made easier with trio.testing
The trio.testing module provides various utilities to make it easier to test Trio code. Unlike the other submodules in the trio namespace, trio.testing is not automatically imported when you do import trio; you must import trio.testing explicitly.
Test harness integration
- @trio.testing.trio_test
Time and timeouts
trio.testing.MockClock is a Clock with a few tricks up its sleeve to help you efficiently test code involving timeouts:
- By default, it starts at time 0, and clock time only advances when you explicitly call jump(). This provides an extremely controllable clock for testing.
- You can set rate to 1.0 if you want it to start running in real time like a regular clock. You can stop and start the clock within a test. You can set rate to 10.0 to make clock time pass at 10x real speed (so e.g. await trio.sleep(10) returns after 1 second).
- But even more interestingly, you can set autojump_threshold to zero or a small value, and then it will watch the execution of the run loop, and any time things have settled down and everyone’s waiting for a timeout, it jumps the clock forward to that timeout. In many cases this allows natural-looking code involving timeouts to be automatically run at near full CPU utilization with no changes. (Thanks to fluxcapacitor for this awesome idea.)
- And of course these can be mixed and matched at will.
Regardless of these shenanigans, from “inside” Trio the passage of time still seems normal so long as you restrict yourself to Trio’s time functions (see Time and clocks). Below is an example demonstrating two different ways of making time pass quickly. Notice how in both cases, the two tasks keep a consistent view of reality and events happen in the expected order, despite being wildly divorced from real time:
# across-realtime.py

import time
import trio
import trio.testing

YEAR = 365 * 24 * 60 * 60  # seconds


async def task1():
    start = trio.current_time()

    print("task1: sleeping for 1 year")
    await trio.sleep(YEAR)

    duration = trio.current_time() - start
    print(f"task1: woke up; clock says I've slept {duration / YEAR} years")

    print("task1: sleeping for 1 year, 100 times")
    for _ in range(100):
        await trio.sleep(YEAR)

    duration = trio.current_time() - start
    print(f"task1: slept {duration / YEAR} years total")


async def task2():
    start = trio.current_time()

    print("task2: sleeping for 5 years")
    await trio.sleep(5 * YEAR)

    duration = trio.current_time() - start
    print(f"task2: woke up; clock says I've slept {duration / YEAR} years")

    print("task2: sleeping for 500 years")
    await trio.sleep(500 * YEAR)

    duration = trio.current_time() - start
    print(f"task2: slept {duration / YEAR} years total")


async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(task1)
        nursery.start_soon(task2)


def run_example(clock):
    real_start = time.perf_counter()
    trio.run(main, clock=clock)
    real_duration = time.perf_counter() - real_start
    print(f"Total real time elapsed: {real_duration} seconds")


print("Clock where time passes at 100 years per second:\n")
run_example(trio.testing.MockClock(rate=100 * YEAR))

print("\nClock where time automatically skips past the boring parts:\n")
run_example(trio.testing.MockClock(autojump_threshold=0))
Output:
Clock where time passes at 100 years per second:
task2: sleeping for 5 years
task1: sleeping for 1 year
task1: woke up; clock says I've slept 1.0365006048232317 years
task1: sleeping for 1 year, 100 times
task2: woke up; clock says I've slept 5.0572111969813704 years
task2: sleeping for 500 years
task1: slept 104.77677842136472 years total
task2: slept 505.25014589075 years total
Total real time elapsed: 5.053582429885864 seconds
Clock where time automatically skips past the boring parts:
task2: sleeping for 5 years
task1: sleeping for 1 year
task1: woke up; clock says I've slept 1.0 years
task1: sleeping for 1 year, 100 times
task2: woke up; clock says I've slept 5.0 years
task2: sleeping for 500 years
task1: slept 101.0 years total
task2: slept 505.0 years total
Total real time elapsed: 0.019298791885375977 seconds
- class trio.testing.MockClock(rate: float = 0.0, autojump_threshold: float = inf)
A user-controllable clock suitable for writing tests.
- Parameters:
autojump_threshold (float) – the initial autojump_threshold.
- rate
How many seconds of clock time pass per second of real time. Default is 0.0, i.e. the clock only advances through manual calls to jump() or when the autojump_threshold is triggered. You can assign to this attribute to change it.
- autojump_threshold
The clock keeps an eye on the run loop, and if at any point it detects that all tasks have been blocked for this many real seconds (i.e., according to the actual clock, not this clock), then the clock automatically jumps ahead to the run loop’s next scheduled timeout. Default is math.inf, i.e., to never autojump. You can assign to this attribute to change it.
Basically the idea is that if you have code or tests that use sleeps and timeouts, you can use this to make it run much faster, totally automatically. (At least, as long as those sleeps/timeouts are happening inside Trio; if your test involves talking to an external service and waiting for it to time out then obviously we can’t help you there.)
You should set this to the smallest value that lets you reliably avoid “false alarms” where some I/O is in flight (e.g. between two halves of a socketpair) but the threshold gets triggered and time gets advanced anyway. This will depend on the details of your tests and test environment. If you aren’t doing any I/O (like in our sleeping example above) then just set it to zero, and the clock will jump whenever all tasks are blocked.
Note
If you use autojump_threshold and wait_all_tasks_blocked at the same time, then you might wonder how they interact, since they both cause things to happen after the run loop goes idle for some time. The answer is: wait_all_tasks_blocked takes priority. If there’s a task blocked in wait_all_tasks_blocked, then the autojump feature treats that as an active task and does not jump the clock.
- jump(seconds: float) -> None
Manually advance the clock by the given number of seconds.
- Parameters:
seconds (float) – the number of seconds to jump the clock forward.
- Raises:
ValueError – if you try to pass a negative value for seconds.
Inter-task ordering
- class trio.testing.Sequencer
A convenience class for forcing code in different tasks to run in an explicit linear order.
Instances of this class implement a __call__ method which returns an async context manager. The idea is that you pass a sequence number to __call__ to say where this block of code should go in the linear sequence. Block 0 starts immediately, and then block N doesn’t start until block N-1 has finished.
Example
An extremely elaborate way to print the numbers 0-5, in order:
async def worker1(seq):
    async with seq(0):
        print(0)
    async with seq(4):
        print(4)

async def worker2(seq):
    async with seq(2):
        print(2)
    async with seq(5):
        print(5)

async def worker3(seq):
    async with seq(1):
        print(1)
    async with seq(3):
        print(3)

async def main():
    seq = trio.testing.Sequencer()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(worker1, seq)
        nursery.start_soon(worker2, seq)
        nursery.start_soon(worker3, seq)
- await trio.testing.wait_all_tasks_blocked(cushion: float = 0.0) -> None
Block until there are no runnable tasks.
This is useful in testing code when you want to give other tasks a chance to “settle down”. The calling task is blocked, and doesn’t wake up until all other tasks are also blocked for at least cushion seconds. (Setting a non-zero cushion is intended to handle cases like two tasks talking to each other over a local socket, where we want to ignore the potential brief moment between a send and receive when all tasks are blocked.)
Note that cushion is measured in real time, not the Trio clock time.
If there are multiple tasks blocked in wait_all_tasks_blocked(), then the one with the shortest cushion is the one woken (and this task becoming unblocked resets the timers for the remaining tasks). If there are multiple tasks that have exactly the same cushion, then all are woken.
You should also consider trio.testing.Sequencer, which provides a more explicit way to control execution ordering within a test, and will often produce more readable tests.
Example
Here’s an example of one way to test that Trio’s locks are fair: we take the lock in the parent, start a child, wait for the child to be blocked waiting for the lock (!), and then check that we can’t release and immediately re-acquire the lock:
async def lock_taker(lock):
    await lock.acquire()
    lock.release()

async def test_lock_fairness():
    lock = trio.Lock()
    await lock.acquire()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(lock_taker, lock)
        # child hasn't run yet, we have the lock
        assert lock.locked()
        assert lock._owner is trio.lowlevel.current_task()
        await trio.testing.wait_all_tasks_blocked()
        # now the child has run and is blocked on lock.acquire(), we
        # still have the lock
        assert lock.locked()
        assert lock._owner is trio.lowlevel.current_task()
        lock.release()
        try:
            # The child has a prior claim, so we can't have it
            lock.acquire_nowait()
        except trio.WouldBlock:
            assert lock._owner is not trio.lowlevel.current_task()
            print("PASS")
        else:
            print("FAIL")
- await trio.testing.wait_all_threads_completed() -> None
Wait until no threads are still running tasks.
This is intended to be used when testing code with trio.to_thread to make sure no tasks are still making progress in a thread. See the following code for a usage example:
async def wait_all_settled():
    while True:
        await trio.testing.wait_all_threads_completed()
        await trio.testing.wait_all_tasks_blocked()
        if trio.testing.active_thread_count() == 0:
            break
Streams
Connecting to an in-process socket server
- await trio.testing.open_stream_to_socket_listener(socket_listener: SocketListener) -> SocketStream
Connect to the given SocketListener.
This is particularly useful in tests when you want to let a server pick its own port, and then connect to it:
listeners = await trio.open_tcp_listeners(0)
client = await trio.testing.open_stream_to_socket_listener(listeners[0])
- Parameters:
socket_listener (SocketListener) – The SocketListener to connect to.
- Returns:
a stream connected to the given listener.
- Return type:
SocketStream
Virtual, controllable streams
One particularly challenging problem when testing network protocols is making sure that your implementation can handle data whose flow gets broken up in weird ways and arrives with weird timings: localhost connections tend to be much better behaved than real networks, so if you only test on localhost then you might get bitten later. To help you out, Trio provides some fully in-memory implementations of the stream interfaces (see The abstract Stream API), that let you write all kinds of interestingly evil tests.
There are a few pieces here, so here’s how they fit together:
memory_stream_pair() gives you a pair of connected, bidirectional streams. It’s like socket.socketpair(), but without any involvement from that pesky operating system and its networking stack.
To build a bidirectional stream, memory_stream_pair() uses two unidirectional streams. It gets these by calling memory_stream_one_way_pair().
memory_stream_one_way_pair(), in turn, is implemented using the low-ish level classes MemorySendStream and MemoryReceiveStream. These are implementations of (you guessed it) trio.abc.SendStream and trio.abc.ReceiveStream that on their own, aren’t attached to anything – “sending” and “receiving” just put data into and get data out of a private internal buffer that each object owns. They also have some interesting hooks you can set, that let you customize the behavior of their methods. This is where you can insert the evil, if you want it. memory_stream_one_way_pair() takes advantage of these hooks in a relatively boring way: it just sets it up so that when you call send_all, or when you close the send stream, then it automatically triggers a call to memory_stream_pump(), which is a convenience function that takes data out of a MemorySendStream’s buffer and puts it into a MemoryReceiveStream’s buffer. But that’s just the default – you can replace this with whatever arbitrary behavior you want.
Trio also provides some specialized functions for testing completely unbuffered streams: lockstep_stream_one_way_pair() and lockstep_stream_pair(). These aren’t customizable, but they do exhibit an extreme kind of behavior that’s good at catching out edge cases in protocol implementations.
API details
- class trio.testing.MemorySendStream(send_all_hook: Callable[[], Awaitable[object]] | None = None, wait_send_all_might_not_block_hook: Callable[[], Awaitable[object]] | None = None, close_hook: Callable[[], object] | None = None)
An in-memory SendStream.
- Parameters:
send_all_hook – An async function, or None. Called from send_all(). Can do whatever you like.
wait_send_all_might_not_block_hook – An async function, or None. Called from wait_send_all_might_not_block(). Can do whatever you like.
close_hook – A synchronous function, or None. Called from close() and aclose(). Can do whatever you like.
- send_all_hook
- wait_send_all_might_not_block_hook
- close_hook
All of these hooks are also exposed as attributes on the object, and you can change them at any time.
- close() -> None
Marks this stream as closed, and then calls the close_hook (if any).
- await get_data(max_bytes: int | None = None) -> bytearray
Retrieves data from the internal buffer, blocking if necessary.
- Parameters:
max_bytes (int or None) – The maximum amount of data to retrieve. None (the default) means to retrieve all the data that’s present (but still blocks until at least one byte is available).
- Returns:
If this stream has been closed, an empty bytearray. Otherwise, the requested data.
- get_data_nowait(max_bytes: int | None = None) -> bytearray
Retrieves data from the internal buffer, but doesn’t block.
See get_data() for details.
- Raises:
trio.WouldBlock – if no data is available to retrieve.
- await send_all(data: bytes | bytearray | memoryview) -> None
Places the given data into the object’s internal buffer, and then calls the send_all_hook (if any).
- await wait_send_all_might_not_block() -> None
Calls the wait_send_all_might_not_block_hook (if any), and then returns immediately.
- class trio.testing.MemoryReceiveStream(receive_some_hook: Callable[[], Awaitable[object]] | None = None, close_hook: Callable[[], object] | None = None)
An in-memory ReceiveStream.
- Parameters:
receive_some_hook – An async function, or None. Called from receive_some(). Can do whatever you like.
close_hook – A synchronous function, or None. Called from close() and aclose(). Can do whatever you like.
- receive_some_hook
- close_hook
Both hooks are also exposed as attributes on the object, and you can change them at any time.
- put_data(data: bytes | bytearray | memoryview) -> None
Appends the given data to the internal buffer.
- await receive_some(max_bytes: int | None = None) -> bytearray
Calls the receive_some_hook (if any), and then retrieves data from the internal buffer, blocking if necessary.
- trio.testing.memory_stream_pump(memory_send_stream: MemorySendStream, memory_receive_stream: MemoryReceiveStream, *, max_bytes: int | None = None) -> bool
Take data out of the given MemorySendStream’s internal buffer, and put it into the given MemoryReceiveStream’s internal buffer.
- Parameters:
memory_send_stream (MemorySendStream) – The stream to get data from.
memory_receive_stream (MemoryReceiveStream) – The stream to put data into.
max_bytes (int or None) – The maximum amount of data to transfer in this call, or None to transfer all available data.
- Returns:
True if it successfully transferred some data, or False if there was no data to transfer.
This is used to implement memory_stream_one_way_pair() and memory_stream_pair(); see the latter’s docstring for an example of how you might use it yourself.
- trio.testing.memory_stream_one_way_pair() -> tuple[trio.testing.MemorySendStream, trio.testing.MemoryReceiveStream]
Create a connected, pure-Python, unidirectional stream with infinite buffering and flexible configuration options.
You can think of this as being a no-operating-system-involved Trio-streamsified version of os.pipe() (except that os.pipe() returns the streams in the wrong order – we follow the superior convention that data flows from left to right).
- Returns:
A tuple (MemorySendStream, MemoryReceiveStream), where the MemorySendStream has its hooks set up so that it calls memory_stream_pump() from its send_all_hook and close_hook.
The end result is that data automatically flows from the MemorySendStream to the MemoryReceiveStream. But you’re also free to rearrange things however you like. For example, you can temporarily set the send_all_hook to None if you want to simulate a stall in data transmission. Or see memory_stream_pair() for a more elaborate example.
- trio.testing.memory_stream_pair() -> tuple[trio.StapledStream[trio.testing.MemorySendStream, trio.testing.MemoryReceiveStream], trio.StapledStream[trio.testing.MemorySendStream, trio.testing.MemoryReceiveStream]]
Create a connected, pure-Python, bidirectional stream with infinite buffering and flexible configuration options.
This is a convenience function that creates two one-way streams using memory_stream_one_way_pair(), and then uses StapledStream to combine them into a single bidirectional stream.
This is like a no-operating-system-involved, Trio-streamsified version of socket.socketpair().
- Returns:
A pair of StapledStream objects that are connected so that data automatically flows from one to the other in both directions.
After creating a stream pair, you can send data back and forth, which is enough for simple tests:
left, right = memory_stream_pair()
await left.send_all(b"123")
assert await right.receive_some() == b"123"
await right.send_all(b"456")
assert await left.receive_some() == b"456"
But if you read the docs for StapledStream and memory_stream_one_way_pair(), you’ll see that all the pieces involved in wiring this up are public APIs, so you can adjust to suit the requirements of your tests. For example, here’s how to tweak a stream so that data flowing from left to right trickles in one byte at a time (but data flowing from right to left proceeds at full speed):
left, right = memory_stream_pair()

async def trickle():
    # left is a StapledStream, and left.send_stream is a MemorySendStream
    # right is a StapledStream, and right.receive_stream is a MemoryReceiveStream
    while memory_stream_pump(left.send_stream, right.receive_stream, max_bytes=1):
        # Pause between each byte
        await trio.sleep(1)

# Normally this send_all_hook calls memory_stream_pump directly without
# passing in a max_bytes. We replace it with our custom version:
left.send_stream.send_all_hook = trickle
And here’s a simple test using our modified stream objects:
async def sender():
    await left.send_all(b"12345")
    await left.send_eof()

async def receiver():
    async for data in right:
        print(data)

async with trio.open_nursery() as nursery:
    nursery.start_soon(sender)
    nursery.start_soon(receiver)
By default, this will print b"12345" and then immediately exit; with our trickle stream it instead sleeps 1 second, then prints b"1", then sleeps 1 second, then prints b"2", etc.
Pro-tip: you can insert sleep calls (like in our example above) to manipulate the flow of data across tasks… and then use MockClock and its autojump_threshold functionality to keep your test suite running quickly.
If you want to stress test a protocol implementation, one nice trick is to use the random module (preferably with a fixed seed) to move random numbers of bytes at a time, and insert random sleeps in between them. You can also set up a custom receive_some_hook if you want to manipulate things on the receiving side, and not just the sending side.
- trio.testing.lockstep_stream_one_way_pair() -> tuple[trio.abc.SendStream, trio.abc.ReceiveStream]
Create a connected, pure Python, unidirectional stream where data flows in lockstep.
- Returns:
A tuple (SendStream, ReceiveStream).
This stream has absolutely no buffering. Each call to send_all() will block until all the given data has been returned by a call to receive_some().
This can be useful for testing flow control mechanisms in an extreme case, or for setting up “clogged” streams to use with check_one_way_stream() and friends.
In addition to fulfilling the SendStream and ReceiveStream interfaces, the return objects also have a synchronous close method.
- trio.testing.lockstep_stream_pair() -> tuple[trio.StapledStream[trio.abc.SendStream, trio.abc.ReceiveStream], trio.StapledStream[trio.abc.SendStream, trio.abc.ReceiveStream]]
Create a connected, pure-Python, bidirectional stream where data flows in lockstep.
- Returns:
A tuple (StapledStream, StapledStream).
This is a convenience function that creates two one-way streams using lockstep_stream_one_way_pair(), and then uses StapledStream to combine them into a single bidirectional stream.
Testing custom stream implementations
Trio also provides some functions to help you test your custom stream implementations:
- await trio.testing.check_one_way_stream(stream_maker: Callable[[], Awaitable[Tuple[SendStream, ReceiveStream]]], clogged_stream_maker: Callable[[], Awaitable[Tuple[SendStream, ReceiveStream]]] | None) -> None
Perform a number of generic tests on a custom one-way stream implementation.
- Parameters:
stream_maker – An async (!) function which returns a connected (SendStream, ReceiveStream) pair.
clogged_stream_maker – Either None, or an async function similar to stream_maker, but with the extra property that the returned stream is in a state where send_all and wait_send_all_might_not_block will block until receive_some has been called. This allows for more thorough testing of some edge cases, especially around wait_send_all_might_not_block.
- Raises:
AssertionError – if a test fails.
- await trio.testing.check_two_way_stream(stream_maker: Callable[[], Awaitable[Tuple[Stream, Stream]]], clogged_stream_maker: Callable[[], Awaitable[Tuple[Stream, Stream]]] | None) -> None
Perform a number of generic tests on a custom two-way stream implementation.
This is similar to check_one_way_stream(), except that the maker functions are expected to return objects implementing the Stream interface.
This function tests a superset of what check_one_way_stream() checks – if you call this, then you don’t need to also call check_one_way_stream().
- await trio.testing.check_half_closeable_stream(stream_maker: Callable[[], Awaitable[Tuple[HalfCloseableStream, HalfCloseableStream]]], clogged_stream_maker: Callable[[], Awaitable[Tuple[HalfCloseableStream, HalfCloseableStream]]] | None) -> None
Perform a number of generic tests on a custom half-closeable stream implementation.
This is similar to check_two_way_stream(), except that the maker functions are expected to return objects that implement the HalfCloseableStream interface.
This function tests a superset of what check_two_way_stream() checks – if you call this, then you don’t need to also call check_two_way_stream().
Virtual networking for testing
In the previous section you learned how to use virtual in-memory streams to test protocols that are written against Trio’s Stream abstraction. But what if you have more complicated networking code – the kind of code that makes connections to multiple hosts, or opens a listening socket, or sends UDP packets?
Trio doesn’t itself provide a virtual in-memory network implementation for testing – but the trio.socket module does provide the hooks you need to write your own! And if you’re interested in helping implement a reusable virtual network for testing, then please get in touch.
Note that these APIs are actually in trio.socket and trio.abc, but we document them here because they’re primarily intended for testing.
- trio.socket.set_custom_hostname_resolver(hostname_resolver: HostnameResolver | None) -> HostnameResolver | None
Set a custom hostname resolver.
By default, Trio’s getaddrinfo() and getnameinfo() functions use the standard system resolver functions. This function allows you to customize that behavior. The main intended use case is for testing, but it might also be useful for using third-party resolvers like c-ares (though be warned that these rarely make perfect drop-in replacements for the system resolver). See trio.abc.HostnameResolver for more details.
Setting a custom hostname resolver affects all future calls to getaddrinfo() and getnameinfo() within the enclosing call to trio.run(). All other hostname resolution in Trio is implemented in terms of these functions.
Generally you should call this function just once, right at the beginning of your program.
- Parameters:
hostname_resolver (trio.abc.HostnameResolver or None) – The new custom hostname resolver, or None to restore the default behavior.
- Returns:
The previous hostname resolver (which may be None).
- class trio.abc.HostnameResolver
If you have a custom hostname resolver, then implementing HostnameResolver allows you to register this to be used by Trio.
See trio.socket.set_custom_hostname_resolver().
- abstractmethod await getaddrinfo(host: bytes | None, port: bytes | str | int | None, family: int = 0, type: int = 0, proto: int = 0, flags: int = 0) -> list[tuple[socket.AddressFamily, socket.SocketKind, int, str, tuple[str, int] | tuple[str, int, int, int]]]
A custom implementation of getaddrinfo().
Called by trio.socket.getaddrinfo().
If host is given as a numeric IP address, then getaddrinfo() may handle the request itself rather than calling this method.
Any required IDNA encoding is handled before calling this function; your implementation can assume that it will never see U-labels like "café.com", and only needs to handle A-labels like b"xn--caf-dma.com".
- trio.socket.set_custom_socket_factory(socket_factory: SocketFactory | None) -> SocketFactory | None
Set a custom socket object factory.
This function allows you to replace Trio’s normal socket class with a custom class. This is very useful for testing, and probably a bad idea in any other circumstance. See trio.abc.SocketFactory for more details.
Setting a custom socket factory affects all future calls to socket() within the enclosing call to trio.run().
Generally you should call this function just once, right at the beginning of your program.
- Parameters:
socket_factory (trio.abc.SocketFactory or None) – The new custom socket factory, or None to restore the default behavior.
- Returns:
The previous socket factory (which may be None).
- class trio.abc.SocketFactory
If you write a custom class implementing the Trio socket interface, then you can use a SocketFactory to get Trio to use it.
See trio.socket.set_custom_socket_factory().
- abstractmethod socket(family: socket.AddressFamily | int = AddressFamily.AF_INET, type: socket.SocketKind | int = SocketKind.SOCK_STREAM, proto: int = 0) -> SocketType
Create and return a socket object.
Your socket object must inherit from trio.socket.SocketType, which is an empty class whose only purpose is to “mark” which classes should be considered valid Trio sockets.
Called by trio.socket.socket().
Note that unlike trio.socket.socket(), this does not take a fileno= argument. If a fileno= is specified, then trio.socket.socket() returns a regular Trio socket object instead of calling this method.
Testing checkpoints
- with trio.testing.assert_checkpoints() -> AbstractContextManager[None]
Use as a context manager to check that the code inside the with block either exits with an exception or executes at least one checkpoint.
- Raises:
AssertionError – if no checkpoint was executed.
Example
Check that trio.sleep() is a checkpoint, even if it doesn’t block:
with trio.testing.assert_checkpoints():
    await trio.sleep(0)
- with trio.testing.assert_no_checkpoints() -> AbstractContextManager[None]
Use as a context manager to check that the code inside the with block does not execute any checkpoints.
- Raises:
AssertionError – if a checkpoint was executed.
Example
Synchronous code never contains any checkpoints, but we can double-check that:
send_channel, receive_channel = trio.open_memory_channel(10)
with trio.testing.assert_no_checkpoints():
    send_channel.send_nowait(None)
ExceptionGroup helpers
- class trio.testing.RaisesGroup(exception: type[E] | Matcher[E] | E, *other_exceptions: type[E] | Matcher[E] | E, allow_unwrapped: bool = False, flatten_subgroups: bool = False, match: str | Pattern[str] | None = None, check: Callable[[BaseExceptionGroup[E]], bool] | None = None, strict: None = None)
Context manager for checking for an expected ExceptionGroup. This works similarly to pytest.raises, and a version of it will hopefully be added upstream, after which this can be deprecated and removed. See https://github.com/pytest-dev/pytest/issues/11538
The catching behaviour differs from except* in multiple different ways, being much stricter by default. By using allow_unwrapped=True and flatten_subgroups=True you can match except* fully when expecting a single exception.
- All specified exceptions must be present, and no others.
  If you expect a variable number of exceptions you need to use pytest.raises(ExceptionGroup) and manually check the contained exceptions. Consider making use of Matcher.matches().
- It will only catch exceptions wrapped in an exception group by default.
  With allow_unwrapped=True you can specify a single expected exception or Matcher and it will match the exception even if it is not inside an ExceptionGroup. If you expect one of several different exception types you need to use a Matcher object.
- By default it cares about the full structure with nested ExceptionGroups. You can specify nested ExceptionGroups by passing RaisesGroup objects as expected exceptions.
  With flatten_subgroups=True it will “flatten” the raised ExceptionGroup, extracting all exceptions inside any nested ExceptionGroup, before matching.
It currently does not care about the order of the exceptions, so RaisesGroup(ValueError, TypeError) is equivalent to RaisesGroup(TypeError, ValueError).
This class is not as polished as pytest.raises, and is currently not as helpful in e.g. printing diffs when strings don’t match, suggesting you use re.escape, etc.
Examples:
with RaisesGroup(ValueError):
    raise ExceptionGroup("", (ValueError(),))
with RaisesGroup(ValueError, ValueError, Matcher(TypeError, match="expected int")):
    ...
with RaisesGroup(KeyboardInterrupt, match="hello", check=lambda x: type(x) is BaseExceptionGroup):
    ...
with RaisesGroup(RaisesGroup(ValueError)):
    raise ExceptionGroup("", (ExceptionGroup("", (ValueError(),)),))

# flatten_subgroups
with RaisesGroup(ValueError, flatten_subgroups=True):
    raise ExceptionGroup("", (ExceptionGroup("", (ValueError(),)),))

# allow_unwrapped
with RaisesGroup(ValueError, allow_unwrapped=True):
    raise ValueError
RaisesGroup.matches can also be used directly to check a standalone exception group.
The matching algorithm is greedy, which means cases such as this may fail:
with RaisesGroup(ValueError, Matcher(ValueError, match="hello")):
    raise ExceptionGroup("", (ValueError("hello"), ValueError("goodbye")))
even though it generally does not care about the order of the exceptions in the group. To avoid the above you should specify the first ValueError with a Matcher as well.
It is also not typechecked perfectly, and that’s likely not possible with the current approach. Most common usage should work without issue though.
- matches(exc_val: BaseException | None) -> TypeGuard[BaseExceptionGroup[E]]
Check if an exception matches the requirements of this RaisesGroup.
Example:
with pytest.raises(TypeError) as excinfo:
    ...
assert RaisesGroup(ValueError).matches(excinfo.value.__cause__)
# the above line is equivalent to
myexc = excinfo.value.__cause__
assert isinstance(myexc, BaseExceptionGroup)
assert len(myexc.exceptions) == 1
assert isinstance(myexc.exceptions[0], ValueError)
- class trio.testing.Matcher(exception_type: type[MatchE] | None = None, match: str | Pattern[str] | None = None, check: Callable[[MatchE], bool] | None = None)
Helper class to be used together with RaisesGroup when you want to specify requirements on sub-exceptions. Only specifying the type is redundant, and it’s also unnecessary when the type is a nested RaisesGroup since it supports the same arguments. The type is checked with isinstance, and does not need to be an exact match. If an exact match is wanted you can use the check parameter. trio.testing.Matcher.matches() can also be used standalone to check individual exceptions.
Examples:
with RaisesGroup(Matcher(ValueError, match="string")):
    ...
with RaisesGroup(Matcher(check=lambda x: x.args == (3, "hello"))):
    ...
with RaisesGroup(Matcher(check=lambda x: type(x) is ValueError)):
    ...
- matches(exception: BaseException) -> TypeGuard[MatchE]
Check if an exception matches the requirements of this Matcher.
Examples:
assert Matcher(ValueError).matches(my_exception)
# is equivalent to
assert isinstance(my_exception, ValueError)

# this can be useful when checking e.g. the ``__cause__`` of an exception.
with pytest.raises(ValueError) as excinfo:
    ...
assert Matcher(SyntaxError, match="foo").matches(excinfo.value.__cause__)
# above line is equivalent to
assert isinstance(excinfo.value.__cause__, SyntaxError)
assert re.search("foo", str(excinfo.value.__cause__))
- class trio.testing._raises_group._ExceptionInfo(excinfo: tuple[type[MatchE], MatchE, types.TracebackType] | None)
Minimal re-implementation of pytest.ExceptionInfo, only used if pytest is not available. Supports a subset of its features necessary for functionality of trio.testing.RaisesGroup and trio.testing.Matcher.
- fill_unfilled(exc_info: tuple[type[MatchE], MatchE, types.TracebackType]) -> None
Fill an unfilled ExceptionInfo created with for_later().
- classmethod for_later() -> _ExceptionInfo[MatchE]
Return an unfilled ExceptionInfo.
- property tb: types.TracebackType
The exception raw traceback.