anyio: A way to collect results upon exiting task group

In asyncio, there’s await asyncio.gather(..a..lot..of..tasks).

Is it possible to somehow emulate this with anyio?

I’m thinking of something like this maybe:

async with create_task_group() as tg:
    await tg.spawn(t1)
    await tg.spawn(t2)

print(tg.results)  # t1_res, t2_res

About this issue

  • State: closed
  • Created 4 years ago
  • Reactions: 3
  • Comments: 26 (14 by maintainers)

Most upvoted comments

It’s well within your rights to wontfix this, but I will chip in my $0.02 and say when I found out anyio/trio don’t have good gather ergonomics my interest waned pretty significantly. In other words, I find gather in asyncio very, very good. Which is a little unfortunate because anyio/trio cancellation scopes are so awesome.

try:
    from anyio import create_queue
except ImportError:
    from anyio import create_memory_object_stream as _cmos

    class Queue:
        def __init__(self, length=0):
            self._s, self._r = _cmos(length)

        def put(self, x):
            return self._s.send(x)

        def get(self):
            return self._r.receive()

        def qsize(self):
            return len(self._s._state.buffer)  # ugh

        def empty(self):
            return not len(self._s._state.buffer)  # ugh

        def __aiter__(self):
            return self

        def __anext__(self):
            return self._r.__anext__()

    def create_queue(length=0):
        return Queue(length)

This discussion already links to a reasonably simple implementation of gather on top of anyio.

Meanwhile, I’ve implemented an asyncio.gather() interface as follows:

(Posting it here in case somebody else googles this issue)

As with trio and threads, you’re supposed to store task results in a queue/memory channel, deque, dict, or something similar. I have no plans to implement any other means of task result retrieval.