sqlx: unknown message type: '\u{0}' for postgres

During benchmarking, I’ve got a lot of errors as next:

ERROR sqlx_core::pool::connection] error occurred while flushing the connection: encountered unexpected or invalid data: unknown message type: '\u{0}'

Idk is it related to this issue, but in postgres I also have next warning: [550] WARNING: there is no transaction in progress (in app I have a transaction, and seems for some reason it was not started)

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 23 (19 by maintainers)

Most upvoted comments

@jakedeichert you should be aware that #1099 is not a perfect fix: although it works well in practice, there’s still a small chance of the error occuring. Only 0.6 will truly fix the issue. (Of course that may be fine for your use case)

The next big release of SQLx (likely 0.6) is currently having its core rewritten from scratch and keeping this in mind while doing so.

This is honestly a nasty problem and a big footgun for new and experienced async Rust developers. I’m not sure what a good solution here (for long-term Rust) as it doesn’t look very trivial to lint against.

Sorry for the MySQL but that’s what I’ve chosen to do this to first (as it seemed to have the worst bugs being reported in relation to it):

Here is recv_packet (in MySQL) from master:

    // receive the next packet from the database server
    // may block (async) on more data from the server
    pub(crate) async fn recv_packet(&mut self) -> Result<Packet<Bytes>, Error> {
        // https://dev.mysql.com/doc/dev/mysql-server/8.0.12/page_protocol_basic_packets.html
        // https://mariadb.com/kb/en/library/0-packet/#standard-packet

        let mut header: Bytes = self.stream.read(4).await?;

        let packet_size = header.get_uint_le(3) as usize;
        let sequence_id = header.get_u8();

        self.sequence_id = sequence_id.wrapping_add(1);

        let payload: Bytes = self.stream.read(packet_size).await?;
        // BUG BUG BUG !!! if the above read never returns, the stream is now in a weird state as the packet header has been thrown out

        // TODO: packet compression
        // TODO: packet joining

        if payload[0] == 0xff {
            self.busy = Busy::NotBusy;

            // instead of letting this packet be looked at everywhere, we check here
            // and emit a proper Error
            return Err(
                MySqlDatabaseError(ErrPacket::decode_with(payload, self.capabilities)?).into(),
            );
        }

        Ok(Packet(payload))
    }

and here is it in next (likely 0.6):

    async fn read_packet_async<'de, T>(&'de mut self) -> Result<T>
    where
        T: Deserialize<'de, Capabilities>,
    {
        // https://dev.mysql.com/doc/internals/en/mysql-packet.html
        self.stream.read_async(4).await?;

        let payload_len: usize = self.stream.get(0, 3).get_uint_le(3) as usize;

        // FIXME: handle split packets
        assert_ne!(payload_len, 0xFF_FF_FF);

        self.sequence_id = self.stream.get(3, 1).get_u8();

        self.stream.read_async(4 + payload_len).await?;

        // wait until AFTER our last await to start telling the stream that we actually are done with these bytes

        self.stream.consume(4);
        let payload = self.stream.take(payload_len);

        if payload[0] == 0xff {
            // if the first byte of the payload is 0xFF and the payload is an ERR packet
            let err = ErrPacket::deserialize_with(payload, self.capabilities)?;
            return Err(Error::Connect(Box::new(MySqlDatabaseError(err))));
        }

        T::deserialize_with(payload, self.capabilities)
    }