pika: Socket Error: 104 consuming messages with tasks that take a long time
As in #418, I have a problem consuming messages with tasks that take a long time. I am using git master with heartbeat_interval=0 set.
My code is very similar to http://pika.readthedocs.io/en/0.10.0/examples/asynchronous_consumer_example.html, but my on_message method takes several minutes to “consume” each message.
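For reference, a condensed sketch of that pattern (hypothetical, boiled down from the asynchronous consumer example; the queue name and the sleep are placeholders for my real task, and it assumes the pika 0.10-era API with heartbeat_interval and callback-first basic_consume):

```python
import time
import pika

def on_message(channel, method, properties, body):
    # Placeholder for the real work: it blocks the ioloop for several
    # minutes, so no socket reads/writes happen while it runs.
    time.sleep(15 * 60)
    channel.basic_ack(delivery_tag=method.delivery_tag)

def on_channel_open(channel):
    channel.basic_consume(on_message, queue='task_queue')

def on_connection_open(connection):
    connection.channel(on_open_callback=on_channel_open)

params = pika.ConnectionParameters(host='localhost', heartbeat_interval=0)
connection = pika.SelectConnection(params, on_open_callback=on_connection_open)
connection.ioloop.start()
```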
After some time spent consuming the message, I get:
INFO 2016-05-27 16:28:51,083 __main__ connect 101 : Connecting to localhost
INFO 2016-05-27 16:28:51,084 pika.adapters.base_connection _create_and_connect_to_socket 216 : Connecting to ::1:5672
INFO 2016-05-27 16:28:51,089 __main__ on_connection_open 117 : Connection opened
INFO 2016-05-27 16:28:51,089 __main__ add_on_connection_close_callback 126 : Adding connection close callback
INFO 2016-05-27 16:28:51,089 __main__ open_channel 169 : Creating a new channel
INFO 2016-05-27 16:28:51,092 __main__ on_channel_open 181 : Channel opened
INFO 2016-05-27 16:28:51,092 __main__ add_on_channel_close_callback 193 : Adding channel close callback
INFO 2016-05-27 16:28:51,092 __main__ start_consuming 223 : Issuing consumer related RPC commands
INFO 2016-05-27 16:28:51,092 __main__ add_on_cancel_callback 233 : Adding consumer cancellation callback
INFO 2016-05-27 16:28:51,102 __main__ on_message 263 : Received message # 1 from None
INFO 2016-05-27 16:43:45,104 __main__ run 72 : Done
INFO 2016-05-27 16:43:45,883 __main__ acknowledge_message 312 : Acknowledging message 1
ERROR 2016-05-27 16:43:45,884 pika.adapters.base_connection _handle_error 362 : Socket Error: 104
INFO 2016-05-27 16:43:45,884 pika.connection _on_terminate 1891: Disconnected from RabbitMQ at localhost:5672 (-1): error(104, 'Connection reset by peer')
WARNING 2016-05-27 16:43:45,885 __main__ on_channel_closed 209 : Channel 1 was closed: (-1) error(104, 'Connection reset by peer')
WARNING 2016-05-27 16:43:45,886 pika.connection close 1135: Suppressing close request on <SelectConnection CLOSED socket=None params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>
WARNING 2016-05-27 16:43:45,887 __main__ on_connection_closed 144 : Connection closed, reopening in 5 seconds: (-1) error(104, 'Connection reset by peer')
INFO 2016-05-27 16:43:50,892 __main__ connect 101 : Connecting to localhost
In the log you can see the function name and line number before each message. I have tried to execute my task in a thread, keeping the AMQP communication in the main process, with:
self._connection.ioloop.stop()
while thread.isAlive():
    #self._connection.ioloop.poll(write_only=True)
    #self._connection.ioloop.poll()
    time.sleep(3)
write_only=True was only available in a previous version; self._connection.ioloop.poll() creates recursive calls with a lot of threads and problems, and so does self._connection.ioloop.start().
So I don’t have a solution: I cannot consume messages and everything is stalled. Is it possible that we need a (background) poll to keep the socket alive without fetching new messages from the queue?
About this issue
- Original URL
- State: closed
- Created 8 years ago
- Comments: 25 (11 by maintainers)
Commits related to this issue
- blocking_adapter: only consider processable events. In process_data_events() the common_terminator that is passed to _flush_output() was also set to true for events that could not be processed because... — committed to westphahl/pika by westphahl 7 years ago
- queue_processors: Set a bounded prefetch size on rabbitmq queues. RabbitMQ clients have a setting called prefetch[1], which controls how many un-acknowledged events the server forwards to the local q... — committed to alexmv/zulip by alexmv 3 years ago
- queue_processors: Set a bounded prefetch size on rabbitmq queues. RabbitMQ clients have a setting called prefetch[1], which controls how many un-acknowledged events the server forwards to the local q... — committed to zulip/zulip by alexmv 3 years ago
- queue_processors: Set a bounded prefetch size on rabbitmq queues. RabbitMQ clients have a setting called prefetch[1], which controls how many un-acknowledged events the server forwards to the local q... — committed to danielpyon/zulip by alexmv 3 years ago
I am going to close this as everything is working as expected.
The reproducer code opens a connection and channel to RabbitMQ, but does not specify basic.qos. This tells RabbitMQ that there is no limit to the number of messages it can send to this client without acknowledgement. In addition, no_ack is False (the default setting), meaning that RabbitMQ will expect acknowledgement of every message delivered before de-queuing it.
In this scenario, the TCP connection can be thought of as a big pipe with no limits other than its size (i.e. buffer sizes) on how much data can be pushed through it. So RabbitMQ keeps sending data to the client while it sleeps after receiving the first message. Depending on message size and total count, the TCP “pipe” may be big enough to hold all of these interim messages, so RabbitMQ does not time out during the send operations. When the message size and count are big enough to fill all buffers, RabbitMQ blocks on the send and eventually times out after 30 seconds, which is what you see in the logs ({writer,send_failed,{error,timeout}}). The connection is closed and RabbitMQ re-tries sending all of those messages because none were acknowledged prior to the timeout.
This is one reason why it is critical to use channel.basic_qos(prefetch_count=N). That limits the number of unacknowledged messages RabbitMQ will send to that particular client and prevents the TCP send timeouts. This is also why @eandersson’s comment about other client libraries applies, since this is not limited to Pika. See the basic.qos docs.
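A minimal sketch of where that call typically goes in the asynchronous consumer example's structure (the on_qos_applied helper name is hypothetical; basic_qos accepts the callback keyword in the 0.10-era API):

```python
def on_channel_open(self, channel):
    self._channel = channel
    # Limit unacknowledged deliveries to this consumer before consuming.
    self._channel.basic_qos(prefetch_count=1, callback=self.on_qos_applied)

def on_qos_applied(self, method_frame):
    # Basic.QosOk received: the prefetch limit is in effect, start consuming.
    self.start_consuming()
```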
Looking at http://stackoverflow.com/questions/35438843/rabbitmq-error-timeout, adding self._channel.basic_qos(prefetch_count=1) before starting to consume solved the problem. Note that I have more than 1000 messages in the queue and some are not small.
I think this configuration/example should be published somewhere as a template, since it is a very common use case.
Also, an ioloop heartbeat/poll method would be useful, so that the heartbeat does not have to be disabled and only the consumer task is executed in a thread.
You can consider closing this issue.
As written some time ago, the use of self._channel.basic_qos(prefetch_count=1) should be published somewhere as a template for this very common use case.
Also, an ioloop heartbeat/poll method would be useful, so that the heartbeat does not have to be disabled and only the consumer task is executed in a thread.
Moving from time.sleep() to the busy wait worked for me 😃
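The full code is in the gists below; as a rough sketch of the idea, the task runs in a worker thread while the consumer's thread keeps driving pika's I/O instead of sleeping blindly (ioloop.poll() is the method mentioned in my earlier snippet; whether poll()/process_timeouts() are exposed depends on the pika version):

```python
import threading

def run_task_with_busy_wait(self, task):
    # task is a placeholder callable holding the long-running work.
    worker = threading.Thread(target=task)
    worker.start()
    # Keep servicing pika's I/O so the socket stays alive while the worker runs.
    while worker.is_alive():
        self._connection.ioloop.poll()              # handle readable/writable sockets
        self._connection.ioloop.process_timeouts()  # fire pending timers
    worker.join()
```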
To be clear, this does not work: https://gist.github.com/pat1/4017d6565501b657731560af3d2e0b9e/3dc43a75c874d76ecdd1412f7b91c06febd848e1
This works: https://gist.github.com/pat1/4017d6565501b657731560af3d2e0b9e/7246b17d7b4b883d9ff68e53dc65657cdd2eedde
@vitaly-krugl I do not have a script that populates the queue; I think you can fill it with 1000 messages of about 100 kB each.
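A quick way to do that (a hypothetical filler script; the queue name, durability and sizes are placeholders):

```python
import pika

# Fill the queue with 1000 messages of roughly 100 kB each so an unbounded
# prefetch can exhaust the broker's send buffers.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

payload = b'x' * (100 * 1024)  # ~100 kB body
for _ in range(1000):
    channel.basic_publish(exchange='', routing_key='task_queue', body=payload)

connection.close()
```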
add_callback_threadsafe in pull request #956 might help with this. See this example
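A rough sketch of the pattern add_callback_threadsafe enables, loosely following pika's threaded-consumer example (BlockingConnection with the pika 1.x-style basic_consume; process() and the queue name are placeholders):

```python
import functools
import threading
import pika

def ack_message(channel, delivery_tag):
    # Runs on the connection's thread, where it is safe to use the channel.
    if channel.is_open:
        channel.basic_ack(delivery_tag)

def do_work(connection, channel, delivery_tag, body):
    process(body)  # placeholder for the long-running task
    # Hand the ack back to the connection's thread/ioloop.
    connection.add_callback_threadsafe(
        functools.partial(ack_message, channel, delivery_tag))

def on_message(channel, method, properties, body, args):
    connection, threads = args
    t = threading.Thread(target=do_work,
                         args=(connection, channel, method.delivery_tag, body))
    t.start()
    threads.append(t)

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
threads = []
channel.basic_consume(
    queue='task_queue',
    on_message_callback=functools.partial(on_message, args=(connection, threads)))
channel.start_consuming()
```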