pika: Many messages were not published after the program exit

Next snippet shows one example used to test other stuff not related with this issue, but I came across with some issue because of that. Using the next script not all messages are at last published.

import pika

# publish one message in each queue, the order doesn't matter
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
for i in range(0, 1000):
    channel.basic_publish('test.concurrence',
                          'test.{}'.format(i),
                          "%1.5f" % time.time(),
                          pika.BasicProperties(content_type='text/plain', delivery_mode=1))

It just publishes 1K messages and finishes the program. When I ran this code and I tried to find out if all messages were delivery to each queue - where each queue is bound to the exchange using the topic test.#num_queue - I realized that not all messages where published. Next snippet shows the command used to check that:

$ python publisher.py
$ ./rabbitmqadmin.py list queues name messages | grep "test." | grep " 0 " | wc -l
37
$ ./purge.sh
$ ./rabbitmqadmin.py list queues name messages | grep "test." | grep " 0 " | wc -l
1000
$ python publisher.py
$ ./rabbitmqadmin.py list queues name messages | grep "test." | grep " 0 " | wc -l
17

The number of queues that have no a message were randomly, and that point let me thought that it would be a bug in Pika.

Then I changed the previous code and I added a kind of synchronization method where it tries to get the process alive for an amount time using the sleep command. Next snippet shows this change:

import pika
import time

# publish one message in each queue, the order doesn't matter
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
for i in range(0, 1000):
    channel.basic_publish('test.concurrence',
                          'test.{}'.format(i),
                          "%1.5f" % time.time(),
                          pika.BasicProperties(content_type='text/plain', delivery_mode=1))

time.sleep(10)

After that I ran the code again, the queues were purged properly, and all messages were published rightly as you can see into the next output command:

$ python publisher.py
$ ./rabbitmqadmin.py list queues name messages | grep "test." | grep " 0 " | wc -l
0
$ ./purge.sh
$ python publisher.py
$ ./rabbitmqadmin.py list queues name messages | grep "test." | grep " 0 " | wc -l
0

About this issue

  • Original URL
  • State: closed
  • Created 9 years ago
  • Comments: 23 (14 by maintainers)

Most upvoted comments

@pfreixes, here is what must be happening, and it’s not a bug in pika: When you publish a bunch of messages without publisher acknowledgements, they get queued up in the socket buffer; after the app exits, the operating system (kernel) continues to deliver the content of the socket buffer to the destination (RabbitMQ broker); it will take some time to flush the socket buffer.

Try calling channel.close() instead of time.sleep(10); the channel.close() call will synchronously flush the socket buffer and complete when Channel.CloseOK is received from the broker.