aiortc: Seems datachannel.send() blocked by reading channel

The while True loop blocks the event loop if the queue is full on receive here

My suggestion is to rewrite it using asyncio.Queue, but that is not trivial

About this issue

  • State: closed
  • Created 6 years ago
  • Comments: 15 (5 by maintainers)

Most upvoted comments

The while True loop contains no await, so the asyncio event loop is blocked. No data can be sent during that time.
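To illustrate the point, here is a minimal sketch of a polling loop that stays cooperative. It is not the aiortc code; polling_receiver, sender and main are made-up names for this example. Removing the await asyncio.sleep(0) line would make the loop spin forever, so the sender task would never run.

```python
import asyncio


async def polling_receiver(queue: asyncio.Queue, log: list) -> None:
    """Poll the queue, yielding to the event loop on every empty iteration."""
    while True:
        try:
            item = queue.get_nowait()
        except asyncio.QueueEmpty:
            # Without this await the loop never gives control back,
            # and the sender task below would never be scheduled.
            await asyncio.sleep(0)
            continue
        log.append(item)
        return


async def sender(queue: asyncio.Queue) -> None:
    await queue.put("hello")


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    log: list = []
    await asyncio.gather(polling_receiver(queue, log), sender(queue))
    return log
```

In practice a plain await queue.get() is simpler than polling, since it suspends the task until an item arrives.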

My English is not good, and I do not fully understand your code. I need to dig deeper. :)

^ FYI: I might actually pick this up at some point and implement it myself. Don’t rely on it though. 😺

Something like this:

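import asyncio
from typing import Optional, Union


# bytes_to_stream and stream_to_bytes are placeholder helpers, not defined here.
class DataChannel:
    def __init__(self) -> None:
        # maxsize=1 makes put() wait until the previous message is consumed
        self._send_queue = asyncio.Queue(maxsize=1)
        self._receive_queue = asyncio.Queue(maxsize=1)

    async def send(self, message: Optional[bytes] = None) -> Optional[asyncio.StreamWriter]:
        result = None
        if message is None:
            # No payload given: create a writer and hand it back to the caller
            result = message = asyncio.StreamWriter(...)
        else:
            # This would need to create a task
            message = bytes_to_stream(message)
        await self._send_queue.put(message)
        return result

    async def receive(self, as_bytes: bool = False) -> Union[bytes, asyncio.StreamReader]:
        message = await self._receive_queue.get()
        if as_bytes:
            # Collect the whole stream into a single bytes object
            message = await stream_to_bytes(message)
        return message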

If users want to control the flow to the fullest extent, they need to use streams for messages. Otherwise, if they do not expect message size to be an issue, they can also use bytes.
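As a rough sketch of the backpressure a bounded asyncio.Queue gives you (producer, consumer and main are hypothetical names, and this is independent of aiortc): with maxsize=1, put() suspends the producer until the consumer has taken the previous item, so at most one message is ever waiting.

```python
import asyncio


async def producer(queue: asyncio.Queue, events: list) -> None:
    for i in range(3):
        await queue.put(i)  # suspends while the queue is full
        events.append(("put", i))
    await queue.put(None)  # sentinel: no more messages


async def consumer(queue: asyncio.Queue, events: list) -> None:
    while True:
        item = await queue.get()
        if item is None:
            return
        events.append(("got", item))


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    events: list = []
    await asyncio.gather(producer(queue, events), consumer(queue, events))
    return events
```

This only bounds the number of messages, not their size, which is why large payloads would still need a stream per message.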

send could also be split up into send(message: Union[bytes, asyncio.StreamWriter]) and create_message() -> asyncio.StreamWriter.

Flow control is only needed for SCTP, since DTLS/UDP has no flow control. I would propose adding a drain method. You can take the reader/writer classes of asyncio as inspiration.
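A drain method along those lines could look roughly like this. It is only a sketch: FlowControlledSender, write, on_transport_sent and the high_water threshold are all assumptions made for illustration, loosely modelled on how asyncio.StreamWriter.drain() waits for the write buffer to shrink. None of it is aiortc API.

```python
import asyncio


class FlowControlledSender:
    """Hypothetical helper: tracks buffered bytes and lets callers wait."""

    def __init__(self, high_water: int = 4) -> None:
        self._high_water = high_water
        self.buffered = 0
        self._can_write = asyncio.Event()
        self._can_write.set()

    def write(self, data: bytes) -> None:
        # Buffer the data; pause writers once the high-water mark is reached.
        self.buffered += len(data)
        if self.buffered >= self._high_water:
            self._can_write.clear()

    def on_transport_sent(self, size: int) -> None:
        # Called (in this sketch) when the transport has flushed `size` bytes.
        self.buffered = max(0, self.buffered - size)
        if self.buffered < self._high_water:
            self._can_write.set()

    async def drain(self) -> None:
        # Returns immediately unless the buffer is above the threshold.
        await self._can_write.wait()


async def main() -> int:
    sender = FlowControlledSender(high_water=4)
    sender.write(b"abcd")  # buffer reaches the high-water mark
    loop = asyncio.get_running_loop()
    # Simulate the transport flushing the buffer a little later.
    loop.call_later(0.01, sender.on_transport_sent, 4)
    await asyncio.wait_for(sender.drain(), timeout=1)
    return sender.buffered
```

The caller would then write and await drain() in turn, the same pattern used with asyncio stream writers.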

Edit 1: But please don’t treat a data channel as a stream because it isn’t. It’s a message stream of byte streams. If you create an API that resembles that, you can easily transfer a large file per message with proper flow control. 😃

Edit 2: Using asyncio, I would create the API in the following way: On the sender side, allow creating an OutgoingMessage instance which is a subclass of asyncio.StreamWriter. Once the stream is EOF, the message is complete. Only one instance is allowed to exist at the same time per channel (alternatively, use an asyncio.Queue for enqueuing these instances). On the receiver side, raise an event (or better, use an asyncio.Queue) to await new IncomingMessage instances which are subclasses of asyncio.StreamReader. Read until EOF, then the message is complete. This API, for example, allows you to easily pipe a message's content into an HTTP request using aiohttp while avoiding memory pressure.
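The receiver side of that idea might be sketched like this. IncomingMessage here is a hypothetical, empty subclass of asyncio.StreamReader, and transport_side merely simulates chunks arriving from the network; the sender side (OutgoingMessage) is left out because a StreamWriter needs a real transport.

```python
import asyncio


class IncomingMessage(asyncio.StreamReader):
    """Hypothetical: one data channel message exposed as a byte stream."""


async def receive_side(messages: asyncio.Queue) -> bytes:
    # Wait for the next message, then read it chunk by chunk until EOF.
    message = await messages.get()
    chunks = []
    while True:
        chunk = await message.read(4)
        if not chunk:  # b"" means EOF: the message is complete
            break
        chunks.append(chunk)
    return b"".join(chunks)


async def transport_side(messages: asyncio.Queue) -> None:
    # Stand-in for the transport: announce a message, then feed its chunks.
    message = IncomingMessage()
    await messages.put(message)
    for chunk in (b"hello ", b"world"):
        message.feed_data(chunk)
        await asyncio.sleep(0)
    message.feed_eof()


async def main() -> bytes:
    messages: asyncio.Queue = asyncio.Queue()
    received, _ = await asyncio.gather(
        receive_side(messages), transport_side(messages)
    )
    return received
```

Because the reader consumes small chunks, the full message never has to sit in memory at once, which is the point of treating each message as its own stream.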

@nickaknudson that’s unrelated: as mentioned in the changelog send() is now a coroutine so you need to do:

await channel.send("Foo")