Implementing timeouts in Python's asynchronous generators

Sep. 21, 2023

Every Python programmer who has worked on improving the I/O performance of a script knows about async and its cooperative multitasking. Async in Python is possible because of the enhanced generators introduced by PEP 342. Across the 2.x and 3.x releases, generators became more capable, and today we have a powerful mechanism for suspending and resuming tasks within the main application thread.

In this article I describe a solution to a specific problem that sits between generators and coroutines: there is no native way to apply a timeout to an asynchronous generator. Each yield passes a value to the caller, but the caller never knows whether, or when, the next value will be yielded. This situation arises when you build an RPC service over RabbitMQ, so I’ll say a few words about it.

A client publishes messages to a specific queue. The worker that performs the requested operation listens to that queue and starts processing each message as it consumes it. When processing ends, the worker pushes the response message to an exclusive queue, which exists only while the client holds the connection. The client may wait for a long time, especially if the worker does not send a full response but splits it into several chunks instead. So the client needs a mechanism that lets it stop waiting when a timeout occurs.

Python’s “async for” has no timeout of its own. Of course, you can wrap “async for” in a coroutine function and run it with asyncio.wait_for, but that applies the timeout to the whole coroutine, not to the time between chunk arrivals. In my case, I need a timeout on waiting for the next chunk, not on the whole process.
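To make the difference concrete, here is a minimal sketch of the whole-loop approach. The chunks generator is a hypothetical stand-in for a chunked response, and the helper names are illustrative only.

```python
import asyncio


async def chunks():
    # Hypothetical stand-in for a chunked RPC response: three chunks, 0.05 s apart.
    for i in range(3):
        await asyncio.sleep(0.05)
        yield i


async def collect(source):
    # Consume the whole async iterable into a list.
    return [item async for item in source]


async def collect_with_total_timeout(source, timeout):
    # The timeout covers the entire loop, not the gap between two chunks.
    return await asyncio.wait_for(collect(source), timeout)


def demo_total_timeout():
    # Every chunk arrives promptly, but the 0.08 s budget is shared by all of
    # them, so the call times out although no single gap was long.
    try:
        asyncio.run(collect_with_total_timeout(chunks(), 0.08))
    except asyncio.TimeoutError:
        return "timed out"
    return "completed"
```

With a budget that is generous for one chunk but too small for three, the loop is cancelled partway through, which is exactly the behavior a per-chunk timeout is meant to avoid.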

To implement this, we need some knowledge of async iterators. First of all, the object being iterated must implement the __aiter__() method. The __aiter__() method must return an object that implements the __anext__() method, which returns an awaitable that resolves to the next value of the sequence and raises StopAsyncIteration when the sequence is exhausted.
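As an illustration of the protocol, here is a minimal sketch of a hand-written async iterator. The Countdown class is hypothetical and exists only for this example.

```python
import asyncio


class Countdown:
    """Hypothetical async iterable that counts down from start to 1."""

    def __init__(self, start):
        self._current = start

    def __aiter__(self):
        # Returning self makes this object both the iterable and the iterator.
        return self

    async def __anext__(self):
        if self._current <= 0:
            # Tells "async for" that the sequence is exhausted.
            raise StopAsyncIteration
        await asyncio.sleep(0)  # give control back to the event loop
        value = self._current
        self._current -= 1
        return value


async def main():
    # "async for" calls __aiter__() once, then awaits __anext__() repeatedly.
    return [n async for n in Countdown(3)]
```

Running asyncio.run(main()) collects the values produced by successive __anext__() calls until StopAsyncIteration ends the loop.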

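To implement a “timeoutable” iterator, wrap the original iterable in a class whose __aiter__() returns a new iterator, and have that iterator’s __anext__() await the original iterator’s __anext__() through asyncio.wait_for with the desired timeout.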

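Note that there are two possible ways of exiting the generator: either the iteration finishes and StopAsyncIteration ends the loop normally, or the timeout expires and asyncio.TimeoutError is raised.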

Here is some sample code.

import asyncio


class AsyncTimedIterable:
    def __init__(self, iterable, timeout=None):
        # timeout is in seconds and applies to each item; None waits indefinitely
        class AsyncTimedIterator:
            def __init__(self):
                self._iterator = iterable.__aiter__()

            def __aiter__(self):
                return self

            async def __anext__(self):
                # asyncio.TimeoutError propagates to the caller if the next item is late
                result = await asyncio.wait_for(self._iterator.__anext__(), timeout)
                # To stop the iteration, raise StopAsyncIteration on some condition
                # (when the last chunk arrives, for example); here a falsy item ends it
                if not result:
                    raise StopAsyncIteration
                return result

        self._factory = AsyncTimedIterator

    def __aiter__(self):
        return self._factory()

Just wrap your iterable in that class and set the timeout in seconds:

timed_iter = AsyncTimedIterable(iterable, 30)

Then you can iterate over it inside a coroutine:

async for r in timed_iter:
    pass

When the timeout expires, asyncio.TimeoutError is raised; otherwise StopAsyncIteration ends the async for loop normally.
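For comparison, the same per-item timeout can also be sketched as an async generator function instead of a wrapper class. This is an alternative under stated assumptions, not the approach above: the names iter_with_timeout, slow_chunks and consume are hypothetical, and slow_chunks merely simulates a worker whose second chunk is late.

```python
import asyncio


async def iter_with_timeout(source, timeout):
    # Hypothetical helper: yields items from source, allowing at most
    # timeout seconds for each one to arrive.
    iterator = source.__aiter__()
    while True:
        try:
            item = await asyncio.wait_for(iterator.__anext__(), timeout)
        except StopAsyncIteration:
            return  # the source finished normally
        yield item


async def slow_chunks():
    # Stand-in for a worker whose second chunk arrives too late.
    yield "first"
    await asyncio.sleep(0.5)
    yield "second"


async def consume():
    received = []
    try:
        async for chunk in iter_with_timeout(slow_chunks(), 0.05):
            received.append(chunk)
    except asyncio.TimeoutError:
        # The gap before the second chunk exceeded the per-item timeout.
        received.append("<timeout>")
    return received
```

The caller handles both exits in one place: a normal end of the loop when the source is exhausted, and an except branch when a single chunk takes too long.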