Testing made easier with trio.testing

The trio.testing module provides various utilities to make it easier to test trio code. Unlike the other submodules in the trio namespace, trio.testing is not automatically imported when you do import trio; you must import trio.testing explicitly.

Test harness integration

@trio.testing.trio_test

Time and timeouts

trio.testing.MockClock is a Clock with a few tricks up its sleeve to help you efficiently test code involving timeouts:

  • By default, it starts at time 0, and clock time only advances when you explicitly call jump(). This provides an extremely controllable clock for testing.
  • You can set rate to 1.0 if you want it to start running in real time like a regular clock. You can stop and start the clock within a test. You can set rate to 10.0 to make clock time pass at 10x real speed (so e.g. await trio.sleep(10) returns after 1 second).
  • But even more interestingly, you can set autojump_threshold to zero or a small value, and then it will watch the execution of the run loop, and any time things have settled down and everyone’s waiting for a timeout, it jumps the clock forward to that timeout. In many cases this allows natural-looking code involving timeouts to be automatically run at near full CPU utilization with no changes. (Thanks to fluxcapacitor for this awesome idea.)
  • And of course these can be mixed and matched at will.

Regardless of these shenanigans, from “inside” trio time still seems to pass normally, so long as you restrict yourself to trio’s time functions (see Time and clocks). Here’s an example demonstrating two different ways of making time pass quickly. Notice how in both cases, the two tasks keep a consistent view of reality and events happen in the expected order, despite being wildly divorced from actual real time:

# across-realtime.py

import time
import trio
import trio.testing

YEAR = 365 * 24 * 60 * 60  # seconds

async def task1():
    start = trio.current_time()

    print("task1: sleeping for 1 year")
    await trio.sleep(YEAR)

    duration = trio.current_time() - start
    print("task1: woke up; clock says I've slept {} years"
          .format(duration / YEAR))

    print("task1: sleeping for 1 year, 100 times")
    for _ in range(100):
        await trio.sleep(YEAR)

    duration = trio.current_time() - start
    print("task1: slept {} years total".format(duration / YEAR))

async def task2():
    start = trio.current_time()

    print("task2: sleeping for 5 years")
    await trio.sleep(5 * YEAR)

    duration = trio.current_time() - start
    print("task2: woke up; clock says I've slept {} years"
          .format(duration / YEAR))

    print("task2: sleeping for 500 years")
    await trio.sleep(500 * YEAR)

    duration = trio.current_time() - start
    print("task2: slept {} years total".format(duration / YEAR))

async def main():
    async with trio.open_nursery() as nursery:
        nursery.spawn(task1)
        nursery.spawn(task2)

def run_example(clock):
    real_start = time.time()
    trio.run(main, clock=clock)
    real_duration = time.time() - real_start
    print("Total real time elapsed: {} seconds".format(real_duration))

print("Clock where time passes at 100 years per second:\n")
run_example(trio.testing.MockClock(rate=100 * YEAR))

print("\nClock where time automatically skips past the boring parts:\n")
run_example(trio.testing.MockClock(autojump_threshold=0))

Output:

Clock where time passes at 100 years per second:

task2: sleeping for 5 years
task1: sleeping for 1 year
task1: woke up; clock says I've slept 1.0365006048232317 years
task1: sleeping for 1 year, 100 times
task2: woke up; clock says I've slept 5.0572111969813704 years
task2: sleeping for 500 years
task1: slept 104.77677842136472 years total
task2: slept 505.25014589075 years total
Total real time elapsed: 5.053582429885864 seconds

Clock where time automatically skips past the boring parts:

task2: sleeping for 5 years
task1: sleeping for 1 year
task1: woke up; clock says I've slept 1.0 years
task1: sleeping for 1 year, 100 times
task2: woke up; clock says I've slept 5.0 years
task2: sleeping for 500 years
task1: slept 101.0 years total
task2: slept 505.0 years total
Total real time elapsed: 0.019298791885375977 seconds

class trio.testing.MockClock(rate=0.0, autojump_threshold=inf)

A user-controllable clock suitable for writing tests.

Parameters:
rate

How many seconds of clock time pass per second of real time. Default is 0.0, i.e. the clock only advances through manual calls to jump() or when the autojump_threshold is triggered. You can assign to this attribute to change it.

autojump_threshold

If all tasks are blocked for this many real seconds (i.e., according to the actual clock, not this clock), then this clock automatically jumps ahead to the run loop’s next scheduled timeout. Default is math.inf, i.e., to never autojump. You can assign to this attribute to change it.

You should set this to the smallest value that lets you reliably avoid “false alarms” where some I/O is in flight (e.g. between two halves of a socketpair) but the threshold gets triggered and time gets advanced anyway. This will depend on the details of your tests and test environment. If you aren’t doing any I/O (like in our sleeping example above) then setting it to zero is fine.

Note that setting this attribute interacts with the run loop, so can only be done from inside a run context or (as a special case) before calling trio.run().

Warning

If you’re using wait_all_tasks_blocked() and autojump_threshold together, then you have to be careful. Setting autojump_threshold acts like a task calling:

while True:
    await wait_all_tasks_blocked(cushion=clock.autojump_threshold)

This means that if you call wait_all_tasks_blocked() with a cushion larger than your autojump threshold, then your call to wait_all_tasks_blocked() will never return, because the autojump task will keep waking up before your task does, and each time it does it’ll reset your task’s timer.

tl;dr: you should set autojump_threshold to be at least as large as the largest cushion you plan to pass to wait_all_tasks_blocked().

jump(seconds)

Manually advance the clock by the given number of seconds.

Parameters: seconds (float) – the number of seconds to jump the clock forward.
Raises: ValueError – if seconds is negative.

Inter-task ordering

class trio.testing.Sequencer

A convenience class for forcing code in different tasks to run in an explicit linear order.

Instances of this class implement a __call__ method which returns an async context manager. The idea is that you pass a sequence number to __call__ to say where this block of code should go in the linear sequence. Block 0 starts immediately, and then block N doesn’t start until block N-1 has finished.

Example

An extremely elaborate way to print the numbers 0-5, in order:

async def worker1(seq):
    async with seq(0):
        print(0)
    async with seq(4):
        print(4)

async def worker2(seq):
    async with seq(2):
        print(2)
    async with seq(5):
        print(5)

async def worker3(seq):
    async with seq(1):
        print(1)
    async with seq(3):
        print(3)

async def main():
    seq = trio.testing.Sequencer()
    async with trio.open_nursery() as nursery:
        nursery.spawn(worker1, seq)
        nursery.spawn(worker2, seq)
        nursery.spawn(worker3, seq)

await trio.testing.wait_all_tasks_blocked(cushion=0.0)

Block until there are no runnable tasks.

This is useful in testing code when you want to give other tasks a chance to “settle down”. The calling task is blocked, and doesn’t wake up until all other tasks are also blocked for at least cushion seconds. (Setting a non-zero cushion is intended to handle cases like two tasks talking to each other over a local socket, where we want to ignore the potential brief moment between a send and receive when all tasks are blocked.)

Note that cushion is measured in real time, not the trio clock time.

If there are multiple tasks blocked in wait_all_tasks_blocked(), then the one with the shortest cushion is the one woken (and this task becoming unblocked resets the timers for the remaining tasks). If there are multiple tasks that have exactly the same cushion, then all are woken.

You should also consider trio.testing.Sequencer, which provides a more explicit way to control execution ordering within a test, and will often produce more readable tests.

Example

Here’s an example of one way to test that trio’s locks are fair: we take the lock in the parent, spawn a child, wait for the child to be blocked waiting for the lock (!), and then check that we can’t release and immediately re-acquire the lock:

async def lock_taker(lock):
    await lock.acquire()
    lock.release()

async def test_lock_fairness():
    lock = trio.Lock()
    await lock.acquire()
    async with trio.open_nursery() as nursery:
        nursery.spawn(lock_taker, lock)
        # child hasn't run yet; the parent still holds the lock
        assert lock.locked()
        await trio.testing.wait_all_tasks_blocked()
        # now the child has run and is blocked in lock.acquire()
        assert lock.locked()
        lock.release()
        try:
            # The child has a prior claim, so we can't have it
            lock.acquire_nowait()
        except trio.WouldBlock:
            print("PASS")
        else:
            print("FAIL")

Testing yield points

with trio.testing.assert_yields()

Use as a context manager to check that the code inside the with block executes at least one yield point.

Raises: AssertionError – if no yield point was executed.

Example

Check that trio.sleep() is a yield point, even if it doesn’t block:

with trio.testing.assert_yields():
    await trio.sleep(0)

with trio.testing.assert_no_yields()

Use as a context manager to check that the code inside the with block does not execute any yield points.

Raises: AssertionError – if a yield point was executed.

Example

Synchronous code never yields, but we can double-check that:

queue = trio.Queue(10)
with trio.testing.assert_no_yields():
    queue.put_nowait(None)