pika: frame_error on basic_publish

I have code which does little more than publishing messages to a durable topic exchange:

channel.basic_publish(
    exchange=self.exchange,
    routing_key=routing_key,
    body=body,
    properties=pika.BasicProperties(delivery_mode=2))

This seems to result in frame_errors frequently. The rabbitmq server log says:

AMQP connection <0.2410.2> (running), channel 1 - error:
{amqp_error,frame_error,
            "type 3, first 16 octets = <<\"{\\\"uuid\\\": \\\"8d0c6f\">>: {invalid_frame_end_marker,\n                                                      115}",
            none}

This is using pika 0.9.13.

About this issue

  • State: closed
  • Created 11 years ago
  • Comments: 44 (13 by maintainers)

Most upvoted comments

Hi

We ran into this issue too, using pika master from 2014/03/10 and RabbitMQ 3.2.4. I was able to capture a tcpdump of the bad frame. Examining the dump shows that a partial frame is written and then another frame is written right after it. This is what causes RabbitMQ to throw the FRAME_ERROR with an invalid_frame_end_marker.

I was able to track down the bug to the following code in pika/adapters/base_connection.py

    def _handle_write(self):
        """Handle any outbound buffer writes that need to take place."""
        total_written = 0
        if self.outbound_buffer:
            frame = self.outbound_buffer.popleft()
            while total_written < len(frame):
                try:
                    total_written += self.socket.send(frame[total_written:])
                except socket.timeout:
                    raise
                except socket.error as error:
                    return self._handle_error(error)
        return total_written

There are actually multiple bugs in this code that writes data to the socket. (For folks who don’t know, pika has a default socket timeout of 250ms.)

  1. On a blocking socket, the socket buffer could be full or nearly full, causing send() to block until it hits the 250ms timeout. At that point only part of the frame has been written to the socket, and socket.timeout is raised (which is caught within pika and somewhat ignored). Because the frame was already popped from outbound_buffer, the unsent remainder is lost. If there are more frames that need to be written, pika will continue to call _handle_write(), each time trying to write out the next frame. However, RabbitMQ is going to kill the connection because of the partially written frame.
  2. On a non-blocking socket, the socket buffer could be full or nearly full, causing send() to fail with an EAGAIN or EWOULDBLOCK socket error to let you know. However, for any socket error code the code does ‘return self._handle_error(error)’. This puts you in the same boat as the blocking socket, with a partially written frame. The _handle_error() function has code to ignore these non-error socket errors, but that is pointless because the ‘return’ abandons the write loop either way.
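Both problems come down to dropping the unsent tail of a frame. A minimal sketch of a write routine that keeps it, assuming the outbound buffer is a collections.deque of byte strings as in the quoted code; write_pending and RETRYABLE are hypothetical names for illustration, not pika's API and not necessarily how the eventual fix was written:

```python
import collections
import errno
import socket

# Error codes that mean "socket buffer is full, try again later".
RETRYABLE = (errno.EAGAIN, errno.EWOULDBLOCK)


def write_pending(sock, outbound_buffer):
    """Send the frame at the head of the buffer and requeue any unsent tail.

    Hypothetical helper, not part of pika. Returns the bytes written.
    """
    if not outbound_buffer:
        return 0
    frame = outbound_buffer.popleft()
    total_written = 0
    try:
        while total_written < len(frame):
            total_written += sock.send(frame[total_written:])
    except socket.timeout:
        pass  # timed out mid-frame; the remainder is resumed on the next call
    except socket.error as error:
        if error.errno not in RETRYABLE:
            raise  # a real error; let the caller tear the connection down
    finally:
        if total_written < len(frame):
            # Put the remainder back at the front so that no other frame
            # can be interleaved with this partially written one.
            outbound_buffer.appendleft(frame[total_written:])
    return total_written
```

The caller would keep invoking write_pending() (ideally once the socket is writable again) until the buffer is empty, so a frame is always completed before the next one starts.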

I believe both of these can also result in a stalled connection, which we have seen a couple of times. Consider the situation where the last body frame is the one that is not fully written: RabbitMQ is waiting for the rest of that frame to come over the wire, but pika thinks it already sent it and is waiting for RabbitMQ to reply about the message as a whole.

I think all adapters except for the evp one use a blocking socket.

The blocking adapter is most prone to the error since it’s trying to send frames as fast as possible, while the select/tornado/twisted adapters will at least verify the socket is writable before trying to send a frame. However, the socket being writable doesn’t guarantee that you can send() an entire frame without blocking.
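The last point can be checked without pika or a broker. The sketch below uses a local socket.socketpair() as a stand-in for the client/broker connection (an assumption for illustration, as is the 16 MB payload size): select() reports the socket as writable, yet a single non-blocking send() accepts only part of the data.

```python
import select
import socket

# A connected local pair stands in for the client/broker connection.
client, broker = socket.socketpair()
client.setblocking(False)

frame = b"x" * (16 * 1024 * 1024)  # far larger than a typical socket buffer

# select() reports the socket writable because its buffer has some room...
_, writable, _ = select.select([], [client], [], 1.0)
is_writable = client in writable

# ...but send() accepts only what fits and returns that smaller count.
sent = client.send(frame)
print("writable:", is_writable, "| sent", sent, "of", len(frame), "bytes")

client.close()
broker.close()
```

The exact byte count depends on the platform's socket buffer sizes, so no particular number should be expected.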

Knowing the issue makes it pretty trivial to reproduce. Have your pika client on a different machine from the RabbitMQ server, set up a network rate limit to restrict the throughput between them, then attempt to send a large message using the blocking adapter.
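For a rough local approximation of the same failure, with no second machine or rate limiting, the following sketch runs the same loop shape as the quoted _handle_write() against a peer that never reads. The socketpair and the 16 MB payload are stand-ins chosen for illustration; 0.25 seconds mirrors the default timeout mentioned above.

```python
import socket

client, broker = socket.socketpair()
client.settimeout(0.25)  # mirrors the 250ms default socket timeout

frame = b"x" * (16 * 1024 * 1024)  # much larger than the socket buffer
total_written = 0
timed_out = False
try:
    # Same loop shape as the quoted _handle_write(); the peer never reads,
    # so the buffer fills up and a later send() runs into the timeout.
    while total_written < len(frame):
        total_written += client.send(frame[total_written:])
except socket.timeout:
    timed_out = True

print("timed out:", timed_out, "| wrote", total_written, "of", len(frame))

client.close()
broker.close()
```

When the timeout fires, part of the frame is already on the wire, which is the state that leads to the invalid_frame_end_marker error once the next frame is written.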

One thing folks can do as a stopgap until a fix is made is to pass ‘socket_timeout=10’ to pika.ConnectionParameters(). This allows send() to block for up to 10 seconds before triggering the timeout. At least for us, if RabbitMQ can’t make space in its TCP buffer in that long, something is likely truly wrong.