aiortc: Created audio tracks never call `recv()`

I’ll preface this by saying the default server example included with the repo works fine.

I have a webapp that originally used websockets to transfer audio along with other data. I’m trying to move the audio portion over to WebRTC, with the session signalling being done via the existing websocket connection. I’m essentially just adapting the server example to do bidirectional audio and removing the video portion entirely.

I might be misunderstanding how to do things correctly, but here is the Python code on the server that responds to the SDP offer and sets up the audio tracks. It’s almost identical to the example server.py:

class MicStreamTrack(MediaStreamTrack):
    """
    An audio stream object for the mic audio from the client
    """
    kind = "audio"

    def __init__(self, track):
        super().__init__()
        self.track = track

    async def recv(self):
        logger.logVerbose("mic recv()")

        # Get a new PyAV frame
        frame = await self.track.recv()

        # Convert to float32 numpy array
        floatArray = frame.to_ndarray(format="float32")

        # Put these samples into the mic queue
        micSampleQueue.put_nowait(floatArray)

        logger.logVerbose("Put {} samples to mic queue".format(len(floatArray)))

        # recv() is expected to return the frame so downstream consumers keep pulling
        return frame

class SpkrStreamTrack(MediaStreamTrack):
    """
    An audio stream object for the speaker data from the server
    """
    kind = "audio"

    def __init__(self):
        super().__init__()

    async def recv(self):
        logger.logVerbose("spkr recv()")

        # Get samples from speaker queue if available
        floatArray = await spkrSampleQueue.get()

        # Convert to an AudioFrame
        frame = AudioFrame.from_ndarray(floatArray, format='float', layout='mono')

        logger.logVerbose("Got {} speaker samples".format(frame.samples))

        # Return
        return frame

async def gotRtcOffer(offerObj):
    """
    Called when we receive a WebRTC offer from the client

    Args:
        offerObj (dict): WebRTC SDP offer object
    """

    global rtcPeer

    logger.logInfo("Got WebRTC offer")
    
    # Create SDP offer and peer connection objects
    offer = RTCSessionDescription(sdp=offerObj["sdp"], type=offerObj["type"])
    rtcPeer = RTCPeerConnection()

    # Create UUID for peer
    pcUuid = "PeerConnection({})".format(uuid.uuid4())
    logger.logVerbose("Creating peer connection {}".format(pcUuid))

    # ICE connection state callback
    @rtcPeer.on("iceconnectionstatechange")
    async def onIceConnectionStateChange():
        logger.logVerbose("Ice connection state is now {}".format(rtcPeer.iceConnectionState))
        if rtcPeer.iceConnectionState == "failed":
            await rtcPeer.close()
            logger.logError("WebRTC peer connection {} failed".format(pcUuid))
    
    # Audio track callback
    @rtcPeer.on("track")
    def onTrack(track):

        global micStream
        global rtcPeer

        logger.logVerbose("Got {} track from peer {}".format(track.kind, pcUuid))

        # make sure it's audio
        if track.kind != "audio":
            logger.logError("Got non-audio track from peer {}".format(pcUuid))
            return
        
        # Create the mic stream for this track
        micStream = MicStreamTrack(track)
        logger.logVerbose("Added mic track")

        # Send the speaker stream back
        spkrTrack = SpkrStreamTrack()
        rtcPeer.addTrack(spkrTrack)
        logger.logVerbose("Added speaker track")

        # Track ended handler
        @track.on("ended")
        async def onEnded():
            logger.logVerbose("Audio track from {} ended".format(pcUuid))

    # Handle the received offer
    logger.logVerbose("Creating remote description from offer")
    await rtcPeer.setRemoteDescription(offer)

    # Create answer
    logger.logVerbose("Creating WebRTC answer")
    answer = await rtcPeer.createAnswer()

    # Set local description
    logger.logVerbose("setting local SDP")
    await rtcPeer.setLocalDescription(answer)

    # Send answer
    logger.logVerbose("sending SDP answer")
    message = '{{ "webRtcAnswer": {{ "type": "{}", "sdp": {} }} }}'.format(rtcPeer.localDescription.type, json.dumps(rtcPeer.localDescription.sdp))
    messageQueue.put_nowait(message)

    logger.logVerbose("done")
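As an aside, the answer envelope can be built by serializing a dict with `json.dumps` rather than hand-formatting the outer JSON string; this avoids escaping mistakes if the SDP ever contains unexpected characters. A minimal sketch (`build_answer_message` is a hypothetical helper; field names follow the snippet above):

```python
import json

def build_answer_message(sdp_type, sdp):
    # Serialize the whole envelope at once; json.dumps handles escaping
    # of quotes and newlines inside the SDP body.
    return json.dumps({"webRtcAnswer": {"type": sdp_type, "sdp": sdp}})

message = build_answer_message("answer", "v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\n")
```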

All of the expected SDP log messages occur, and it appears on both ends (the web browser and the python script) that everything is set up properly. Both tracks are added and it looks like the connection goes fully active:

[09/22 13:09:56] (  gotRtcOffer   ) INFO: Got WebRTC offer
[09/22 13:09:56] (  gotRtcOffer   ) VERB: Creating peer connection PeerConnection(514e2bad-a415-412f-b741-1e16ab82b023)
[09/22 13:09:56] (  gotRtcOffer   ) VERB: Creating remote description from offer
[09/22 13:09:56] (    onTrack     ) VERB: Got audio track from peer PeerConnection(514e2bad-a415-412f-b741-1e16ab82b023)
[09/22 13:09:56] (    onTrack     ) VERB: Added mic track
[09/22 13:09:56] (    onTrack     ) VERB: Added speaker track
[09/22 13:09:56] (  gotRtcOffer   ) VERB: Creating WebRTC answer
[09/22 13:09:56] (  gotRtcOffer   ) VERB: setting local SDP
[09/22 13:10:01] (  gotRtcOffer   ) VERB: sending SDP answer
[09/22 13:10:01] (  gotRtcOffer   ) VERB: done
[09/22 13:10:01] (producer_hander ) INFO: sending WebRTC answer to ::1
[09/22 13:10:01] (onIceConnectionStateChange) VERB: Ice connection state is now checking
[09/22 13:10:02] (onIceConnectionStateChange) VERB: Ice connection state is now completed

I can even see the data being sent from the webpage to the server via Chrome’s WebRTC internals page. However, `recv()` is never called on the Python side for either track. I’d expect a flood of log prints as the tracks receive data, but nothing happens after the RTC session connects, and no errors appear on either the webpage or the Python script.
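For context on why `recv()` can stay silent even with a healthy connection: aiortc tracks are pull-based, so `recv()` only runs when something downstream awaits it (a sender, or a consumer such as `MediaRecorder`/`MediaBlackhole` from `aiortc.contrib.media`). A stand-in sketch without aiortc that illustrates the same pull model (`DummyTrack` and `drain` are hypothetical names, not aiortc API):

```python
import asyncio

class DummyTrack:
    """Stand-in for a MediaStreamTrack; recv() is never invoked on its own."""
    def __init__(self):
        self.recv_calls = 0

    async def recv(self):
        self.recv_calls += 1
        return b"frame"

async def drain(track, n):
    # A consumer must actively pull frames; this is what a sender or
    # MediaRecorder does internally.
    for _ in range(n):
        await track.recv()

track = DummyTrack()
asyncio.run(asyncio.sleep(0))   # an idle event loop alone triggers nothing
assert track.recv_calls == 0
asyncio.run(drain(track, 3))    # frames flow only once a consumer pulls
assert track.recv_calls == 3
```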

My limited understanding of WebRTC is preventing me from being able to figure out what’s going wrong. Help would be greatly appreciated.

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 24

Most upvoted comments

Good, congrats. I agree with the statement on PyAV. At my company we have worked extensively with the various frame shapes coming out of text-to-speech engines, and it was tricky to push those through PyAV. The aiortc lib also brings its own layer of complexity: elements like MediaRecorder or MediaRelay are very sensitive to gaps in frame.pts, leading to crashes when the internet connection is not 100% stable. We are planning to release a blog article with tricks and how-tos for this lib; if you’re interested, I can share a link when it’s released.

Success!

PyAV is extremely picky with its input data format, and is also extremely vague about what that format is.

Turns out, for a mono s16 sample array, it needs to be a two-dimensional array of single-value arrays, like so:

[[0],[0],[0],...]

PyAV only reports errors via asserts with no actual error message. The relevant code to figure out what kind of array was needed is here:

https://github.com/PyAV-Org/PyAV/blob/9ac05d9ac902d71ecb2fe80f04dcae454008378c/av/audio/frame.pyx#L119-L127
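Judging from that linked source, packed mono s16 wants a 2-D buffer; a quick NumPy-only shape check (no PyAV needed) of getting a flat sample buffer into that layout, using an illustrative 480-sample frame:

```python
import numpy as np

data = np.zeros(480, dtype=np.int16)        # one flat mono buffer of 480 samples
packed = data.reshape(data.shape[0], -1).T  # (480,) -> (480, 1) -> (1, 480)

assert packed.shape == (1, 480)
assert packed.dtype == np.int16
```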

Now my revised and working playback code is below:

    async def recv(self):
        # Note: self.samples and self.samplerate are set in __init__ (not shown)

        # Handle timestamps properly
        if hasattr(self, "_timestamp"):
            self._timestamp += self.samples
            wait = self._start + (self._timestamp / self.samplerate) - time.time()
            await asyncio.sleep(wait)
        else:
            self._start = time.time()
            self._timestamp = 0

        # create empty data by default
        data = np.zeros(self.samples).astype(np.int16)

        # Only get speaker data if we have some in the buffer
        if spkrSampleQueue.qsize() > 1:
            try:
                logger.logVerbose("Getting speaker samples from queue")
                data = spkrSampleQueue.get_nowait()
            except asyncio.QueueEmpty:
                pass

        # To convert to a mono audio frame, we need the array to be an array of single-value arrays for each sample (annoying)
        data = data.reshape(data.shape[0], -1).T
        # Create audio frame
        frame = AudioFrame.from_ndarray(data, format='s16', layout='mono')

        # Update time stuff
        frame.pts = self._timestamp
        frame.sample_rate = self.samplerate
        frame.time_base = fractions.Fraction(1, self.samplerate)

        # Return
        return frame
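The pacing logic above keeps wall-clock time and pts in lockstep: pts counts samples, and pts × time_base gives each frame’s start time. A quick arithmetic check under illustrative assumptions (48 kHz audio, 960-sample frames, i.e. 20 ms per frame):

```python
import fractions

samplerate = 48000
samples = 960                       # 20 ms of audio per frame
time_base = fractions.Fraction(1, samplerate)

# pts advances by `samples` per frame; pts * time_base is the start time in seconds
pts_values = [i * samples for i in range(3)]
start_times = [float(pts * time_base) for pts in pts_values]

assert start_times == [0.0, 0.02, 0.04]
```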