pika: Socket Error: 104 when consuming messages with tasks that take a long time

As in #418, I have a problem consuming messages with tasks that take a long time. I am using git master with heartbeat_interval=0.

My code is very similar to http://pika.readthedocs.io/en/0.10.0/examples/asynchronous_consumer_example.html but my on_message method takes several minutes to “consume” the message.

After consuming a message for some time I get:

INFO 2016-05-27 16:28:51,083 __main__ connect  101 : Connecting to localhost
INFO 2016-05-27 16:28:51,084 pika.adapters.base_connection _create_and_connect_to_socket  216 : Connecting to ::1:5672
INFO 2016-05-27 16:28:51,089 __main__ on_connection_open  117 : Connection opened
INFO 2016-05-27 16:28:51,089 __main__ add_on_connection_close_callback  126 : Adding connection close callback
INFO 2016-05-27 16:28:51,089 __main__ open_channel  169 : Creating a new channel
INFO 2016-05-27 16:28:51,092 __main__ on_channel_open  181 : Channel opened
INFO 2016-05-27 16:28:51,092 __main__ add_on_channel_close_callback  193 : Adding channel close callback
INFO 2016-05-27 16:28:51,092 __main__ start_consuming  223 : Issuing consumer related RPC commands
INFO 2016-05-27 16:28:51,092 __main__ add_on_cancel_callback  233 : Adding consumer cancellation callback
INFO 2016-05-27 16:28:51,102 __main__ on_message  263 : Received message # 1 from None
INFO 2016-05-27 16:43:45,104 __main__ run  72  : Done
INFO 2016-05-27 16:43:45,883 __main__ acknowledge_message  312 : Acknowledging message 1
ERROR 2016-05-27 16:43:45,884 pika.adapters.base_connection _handle_error  362 : Socket Error: 104
INFO 2016-05-27 16:43:45,884 pika.connection _on_terminate  1891: Disconnected from RabbitMQ at localhost:5672 (-1): error(104, 'Connection reset by peer')
WARNING 2016-05-27 16:43:45,885 __main__ on_channel_closed  209 : Channel 1 was closed: (-1) error(104, 'Connection reset by peer')
WARNING 2016-05-27 16:43:45,886 pika.connection close  1135: Suppressing close request on <SelectConnection CLOSED socket=None params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>
WARNING 2016-05-27 16:43:45,887 __main__ on_connection_closed  144 : Connection closed, reopening in 5 seconds: (-1) error(104, 'Connection reset by peer')
INFO 2016-05-27 16:43:50,892 __main__ connect  101 : Connecting to localhost

In the log you can see the function name and line number before each message. I have tried to execute my task in a thread, keeping the AMQP communication in the main process, with:

self._connection.ioloop.stop()
while thread.isAlive():
    #self._connection.ioloop.poll(write_only=True)
    #self._connection.ioloop.poll()
    time.sleep(3)

write_only=True was only available in a previous version; self._connection.ioloop.poll() creates a recursive call with a lot of threads and problems, and so does self._connection.ioloop.start().

So I don’t have a solution: I cannot consume messages and everything is stalled. Is it possible we need a (background) poll to keep the socket alive without fetching new messages from the queue?

About this issue

  • Original URL
  • State: closed
  • Created 8 years ago
  • Comments: 25 (11 by maintainers)

Most upvoted comments

I am going to close this as everything is working as expected -

The reproducer code opens a connection and channel to RabbitMQ, but does not specify basic.qos. This indicates to RabbitMQ that there should be no limit to the number of messages it can send to this client without acknowledgement. In addition, no_ack is False (the default setting) meaning that RabbitMQ will expect acknowledgement of every message delivered before de-queuing it.

In this scenario, the TCP connection can be thought of as a big pipe with no limits other than its size (i.e. buffer sizes) on how much data can be pushed through it. So, RabbitMQ keeps sending data to the client while it sleeps after receiving the first message. Depending on message size and total count, the TCP “pipe” is big enough to hold all of these interim messages, so RabbitMQ does not time out during the send operations. When message size and count are big enough to fill all buffers, RabbitMQ blocks on the send and eventually times out after 30 seconds, which is what you see in the logs ({writer,send_failed,{error,timeout}}). The connection is closed and RabbitMQ re-tries sending all of those messages because none were acknowledged prior to the timeout.

This is one reason why it is critical to use channel.basic_qos(prefetch_count=N). That will limit the number of unacknowledged messages RabbitMQ will send to that particular client and will prevent the TCP send timeouts. This is also why @eandersson’s comment about other client libraries applies, since this is not limited to Pika.

basic.qos docs

looking at: http://stackoverflow.com/questions/35438843/rabbitmq-error-timeout

adding

self._channel.basic_qos(prefetch_count=1)

before

self._consumer_tag = self._channel.basic_consume(self.on_message, queue)

solves the problem. Note that I have more than 1000 messages in the queue and some are not small.

I think this configuration/example should be published somewhere as a template for a very common use case.

Also, an ioloop heartbeat poll method would be useful, so that heartbeats do not have to be disabled while the consumer task runs in a thread.

You can consider closing this issue.

As written some time ago, the use of self._channel.basic_qos(prefetch_count=1) should be published somewhere as a template for a very common use case.

Also, an ioloop heartbeat poll method would be useful, so that heartbeats do not have to be disabled while the consumer task runs in a thread.

Moving from time.sleep() to this busy wait:

from time import time

def _safe_sleep(duration):
    # Busy-wait until `duration` seconds have elapsed, instead of one
    # long blocking time.sleep() call.
    deadline = time() + duration

    while True:
        time_limit = deadline - time()

        if time_limit <= 0:
            break

worked for me 😃
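The snippet above spins the CPU at full speed while it waits. A gentler variant sleeps in short slices and can invoke a keepalive callback on each iteration; this is a pure-Python sketch, and the `keepalive` hook is a placeholder for whatever connection-servicing call your pika adapter actually supports:

```python
import time

def safe_sleep(duration, keepalive=None, step=0.25):
    """Wait `duration` seconds without one long blocking sleep.

    `keepalive` is an optional zero-argument callable invoked each
    iteration (e.g. something that services the connection); `step`
    bounds how long any single sleep lasts.
    """
    deadline = time.monotonic() + duration
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        if keepalive is not None:
            keepalive()
        # Sleep only a short slice so the loop wakes up regularly.
        time.sleep(min(step, remaining))
```

Using time.monotonic() instead of time() makes the deadline immune to wall-clock adjustments during the wait.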

To be clear, this does not work: https://gist.github.com/pat1/4017d6565501b657731560af3d2e0b9e/3dc43a75c874d76ecdd1412f7b91c06febd848e1

This works: https://gist.github.com/pat1/4017d6565501b657731560af3d2e0b9e/7246b17d7b4b883d9ff68e53dc65657cdd2eedde

@vitaly-krugl I do not have a script that populates the queue; I think you can fill it with 1000 messages of 100k size.

add_callback_threadsafe in pull request #956 might help with this. See this example
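The idea behind add_callback_threadsafe is that the worker thread never touches the channel itself: when the long task finishes, it hands the acknowledgement back to the thread that runs the connection. A library-free stand-in for that hand-off, using a queue.Queue where pika uses its ioloop (so only the mechanism is illustrated, not pika's actual implementation), might look like:

```python
import queue
import threading

# Stand-in for the connection's ioloop: callbacks can be queued from any
# thread, but are executed only on the "connection" thread.
pending = queue.Queue()

def add_callback_threadsafe(callback):
    # Safe to call from a worker thread; just enqueues the callback.
    pending.put(callback)

def worker(delivery_tag, acks):
    # ...long-running task runs here, off the connection thread...
    # Instead of acking directly, schedule the ack on the connection thread.
    add_callback_threadsafe(lambda: acks.append(delivery_tag))

acks = []
t = threading.Thread(target=worker, args=(1, acks))
t.start()
t.join()

# The "connection" thread drains the queue and performs the acks itself,
# so the channel is only ever used from one thread.
while not pending.empty():
    pending.get()()
```

With real pika, `acks.append(...)` would be a `channel.basic_ack(...)` scheduled via the connection's add_callback_threadsafe, which keeps all channel operations on the connection's own thread.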