pika: Acknowledging a single message from a different thread may fail or get stuck

According to the documentation I’ve seen, ack’ing a single message from a different thread should be OK. However, my code produces errors, which suggests there is a bug somewhere.

The minimal code that reproduces the error is below (it simulates my real logic, with irrelevant code removed):

#!/usr/bin/env python3

import json
import pika
import queue
import signal
import threading

send_queue = None
send_channel = None
consumer_tag = None
access_tokens = {}
exit_event = threading.Event()
total_count = 10000

rabbitmq_credentials = pika.PlainCredentials('guest', 'guest')
rabbitmq_host = 'localhost'
rabbitmq_port = 5672
rabbitmq_queue = 'test_queue'


def stop_consume():
    global consumer_tag
    if consumer_tag is not None:
        send_channel.basic_cancel(consumer_tag)
        consumer_tag = None


def send_from_queue():
    global total_count
    is_ending = False
    while not exit_event.is_set() and not is_ending:
        count = 0
        ack_list = []
        while count < 100 and not send_queue.empty():
            message = send_queue.get(block=False)
            ack_list.append(message['delivery_tag'])
            count += 1
            total_count -= 1

        if ack_list:
            print('Acking messages')
            for tag in ack_list:
                print('Acking', tag)
                send_channel.basic_ack(delivery_tag=tag)
            print('Acking done')

        if send_queue.empty():
            if total_count <= 0:
                exit_event.set()
                stop_consume()
            exit_event.wait(0.25)


def rabbitmq_callback(ch, method, properties, body):
    try:
        message = json.loads(body.decode())
        message['delivery_tag'] = method.delivery_tag
        send_queue.put(message)
    except ValueError:
        print('Non-JSON message encountered:', body)
        ch.basic_ack(delivery_tag=method.delivery_tag)


def handle_signal(signum, stack):
    exit_event.set()

    global consumer_tag
    if consumer_tag is not None:
        print('Signal', signum,
              'is received.  Stopped receiving requests...')
        stop_consume()


def repeat_send(channel, tries):
    message = {
        "id": 0,
        "content": ""
    }
    for i in range(tries):
        message['id'] = i
        message['content'] = 'Hello World' + str(i)
        channel.basic_publish(exchange='',
                              routing_key=rabbitmq_queue,
                              body=json.dumps(message),
                              properties=pika.BasicProperties(
                                  delivery_mode=2,  # make message persistent
                              ))


def main():
    signal.signal(signal.SIGTERM, handle_signal)

    global send_queue
    global send_channel
    global consumer_tag

    connection = pika.BlockingConnection(pika.ConnectionParameters(
        rabbitmq_host, rabbitmq_port, '/', rabbitmq_credentials))
    send_channel = connection.channel()
    send_channel.queue_declare(queue=rabbitmq_queue,
                               durable=True)

    repeat_send(send_channel, total_count)

    send_queue = queue.Queue(10000)
    send_thread = threading.Thread(target=send_from_queue)
    send_thread.start()

    send_channel.basic_qos(prefetch_count=10)
    consumer_tag = send_channel.basic_consume(
        rabbitmq_callback, queue=rabbitmq_queue)
    send_channel.start_consuming()


signal.signal(signal.SIGINT, handle_signal)

if __name__ == "__main__":
    main()

A sample error generated on macOS Sierra 10.12.6 + Anaconda Python 3.6.2 + pika 0.11.2 + rabbitmq 3.7.3:

…
Acking messages
Acking 9354
Acking 9355
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/Users/yongwei/anaconda/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/yongwei/anaconda/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "./test_pika.py", line 53, in send_from_queue
    send_channel.basic_ack(delivery_tag=tag)
  File "/Users/yongwei/anaconda/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1988, in basic_ack
    self._flush_output()
  File "/Users/yongwei/anaconda/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1250, in _flush_output
    *waiters)
  File "/Users/yongwei/anaconda/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 455, in _flush_output
    self._impl.ioloop.poll()
  File "/Users/yongwei/anaconda/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 245, in poll
    self._poller.poll()
  File "/Users/yongwei/anaconda/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 816, in poll
    self._dispatch_fd_events(fd_event_map)
  File "/Users/yongwei/anaconda/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 625, in _dispatch_fd_events
    handler(fileno, events)
  File "/Users/yongwei/anaconda/lib/python3.6/site-packages/pika/adapters/base_connection.py", line 392, in _handle_events
    self._manage_event_state()
  File "/Users/yongwei/anaconda/lib/python3.6/site-packages/pika/adapters/base_connection.py", line 514, in _manage_event_state
    self.ioloop.update_handler(self.socket.fileno(), self.event_state)
  File "/Users/yongwei/anaconda/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 194, in update_handler
    self._poller.update_handler(fileno, events)
  File "/Users/yongwei/anaconda/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 401, in update_handler
    events_to_set=events_set)
  File "/Users/yongwei/anaconda/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 874, in _modify_fd_events
    self._kqueue.control(kevents, 0)
BlockingIOError: [Errno 36] Operation now in progress

The number of successful acknowledgements before the error is not deterministic: I’ve seen it fail after roughly 3321, but sometimes all 10000 messages are acknowledged without any problems.

Actually, I first encountered a similar problem (‘NoneType’ object has no attribute ‘body_size’) in a CentOS 7 Docker container + Python 3.6.4 + pika 0.11.2 (running on an old CentOS 6.9 box) + rabbitmq-server 3.1.5. I have not been able to reproduce that error since, but I do see noticeable pauses while acknowledging messages, and sometimes it appears to be stuck forever.

I have not been able to reproduce this problem on a vanilla CentOS 7 system.

About this issue

  • State: closed
  • Created 6 years ago
  • Comments: 30 (18 by maintainers)

Most upvoted comments

@stugots, yes, it applies to all operations.

Please give Pika 0.12.0b2 a try. It contains #956, which allows methods to be executed on the correct (connection’s) thread via add_callback_threadsafe.

Pika 0.12.0b2 can be installed with this command:

pip install pika --pre

Thank you!
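A minimal sketch of that approach, assuming pika 0.12 or later, where BlockingConnection.add_callback_threadsafe exists: the worker thread never calls basic_ack itself. Instead it queues a functools.partial that the connection’s thread runs while it is inside start_consuming(). The names ack_from_worker and worker are hypothetical, and the message dict carrying 'delivery_tag' mirrors the reproducer above.

```python
import functools
import queue


def ack_from_worker(connection, channel, delivery_tag):
    """Schedule basic_ack on the connection's own thread.

    Assumes ``connection`` is a pika.BlockingConnection (pika >= 0.12) and
    ``channel`` is one of its channels. add_callback_threadsafe() is the
    one BlockingConnection method that may be called from any thread; it
    only queues the callback, which the connection's thread runs the next
    time it processes events (e.g. inside start_consuming()).
    """
    connection.add_callback_threadsafe(
        functools.partial(channel.basic_ack, delivery_tag=delivery_tag))


def worker(connection, channel, work_queue, stop_event):
    """Hypothetical worker loop: take messages, process them, schedule acks.

    Each message is a dict carrying 'delivery_tag', as in the reproducer.
    """
    while not stop_event.is_set():
        try:
            message = work_queue.get(timeout=0.25)
        except queue.Empty:
            continue
        # ... process ``message`` here (omitted) ...
        ack_from_worker(connection, channel, message['delivery_tag'])
        work_queue.task_done()
```

In the reproducer, send_from_queue would call ack_from_worker(connection, send_channel, tag) in place of send_channel.basic_ack(delivery_tag=tag), while the main thread keeps running send_channel.start_consuming(). Since the thread-safety rule applies to all operations, the basic_cancel issued by stop_consume() from the worker thread would presumably need to be scheduled the same way.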

The beta will be announced on the rabbitmq-users Google group and made available from PyPI. However, once desired changes, like #956, are merged into master, you can always try pika from the master branch (at your own peril 😃).

It’s not thread-safe to ACK directly from another thread in pika.
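Where upgrading is not an option (e.g. pika 0.11.2, as in the report), one workaround is to keep every pika call on the connection’s thread: workers only push delivery tags into a thread-safe queue.Queue, and the connection’s thread acks them between process_data_events() calls instead of running start_consuming(). This is a sketch under those assumptions; drain_acks and consume_loop are hypothetical names, not pika API.

```python
import queue


def drain_acks(channel, ack_queue):
    """Ack every delivery tag that worker threads have handed back.

    Must be called on the thread that owns ``channel``'s connection.
    Returns the number of messages acknowledged.
    """
    count = 0
    while True:
        try:
            tag = ack_queue.get_nowait()
        except queue.Empty:
            return count
        channel.basic_ack(delivery_tag=tag)
        count += 1


def consume_loop(connection, channel, ack_queue, stop_event,
                 poll_interval=0.25):
    """Hypothetical replacement for channel.start_consuming().

    Only this thread touches ``connection`` and ``channel``; workers only
    put delivery tags into ``ack_queue``.
    """
    while not stop_event.is_set():
        # Dispatch deliveries to the basic_consume callback for up to
        # ``poll_interval`` seconds, then ack whatever workers finished.
        connection.process_data_events(time_limit=poll_interval)
        drain_acks(channel, ack_queue)
    drain_acks(channel, ack_queue)  # flush tags queued just before stopping
```

The trade-off is latency: an ack can wait up to poll_interval seconds before it is sent, but no pika object is ever touched from two threads.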

I believe @lukebakken is targeting #956 for the 1.0.0 milestone. I don’t know if any stable releases are planned before then.