pika: Blocking connection hanging on basic_consume when using a pacemaker

I've already posted this in a closed thread, but I'm not sure that's the correct way to do these things, so I'm asking again here. I've created a little library that binds a processing function to a RabbitMQ queue and runs that function whenever something is placed on the queue. The problem is that the processing sometimes takes longer than the heartbeat interval, so I added a pacemaker as described here. The code for this pacemaker is below:

import threading
import time


class heart_runner():

    def __init__(self, connection):
        # Crude type check: accept a pika BlockingConnection, or a mock in tests
        if "blockingconnection" not in str(connection).lower() and "mock" not in str(connection).lower():
            raise Exception("Heartbeat runner requires a connection to rabbitmq as connection, actually has %s, a %s" % (str(connection), type(connection)))

        if not connection.is_open:
            raise Exception("Heart runner's connection to rabbitmq should be open, is actually closed")

        self.connection = connection
        self.internal_lock = threading.Lock()

    def _process_data_events(self):
        """Check for incoming data events.
        We do this on a thread to allow the flask instance to send
        asynchronous requests.
        It is important that we lock the thread each time we check for events.
        """
        while True:
            with self.internal_lock:
                self.connection.process_data_events()
                # This is how often to run the pacemaker
                time.sleep(0.1)

I then ran it in a separate thread using:

    #setup the pacemaker
    pacemaker = heart_runner(connection)

    #set up this daemon thread in order to avoid everything dying due to a heartbeat timeout
    thread = threading.Thread(target=pacemaker._process_data_events)
    thread.daemon = True
    thread.start()

However, I found that when digesting a large number of messages the thread would often hang on basic_consume. When I used Ctrl+C to stop it, the resulting error read:

Traceback (most recent call last):
  File "./example_binding_main.py", line 121, in <module>
    bind(main, "example-function")
  File "/nanowire_plugin.py", line 287, in bind
    input_channel.start_consuming()
  File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1780, in start_consuming
    self.connection.process_data_events(time_limit=None)
  File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 707, in process_data_events
    self._flush_output(common_terminator)
  File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 455, in _flush_output
    self._impl.ioloop.poll()
  File "/usr/local/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 245, in poll
    self._poller.poll()
  File "/usr/local/lib/python3.6/site-packages/pika/adapters/select_connection.py", line 923, in poll
    events = self._poll.poll(self._get_next_deadline())
KeyboardInterrupt

Since I figured that process_data_events() must be causing the hang, I added a time limit of ten seconds, connection.process_data_events(10), but this led to the error:

     bind(main, "example-function")
  File "/binding_function_to_queue.py", line 287, in bind
    input_channel.start_consuming()
  File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1775, in start_consuming
    'start_consuming may not be called from the scope of '
pika.exceptions.RecursionError: start_consuming may not be called from the scope of another BlockingConnection or BlockingChannel callback 

Is there a way either to prevent the hanging problem or to add a timeout that will trigger when this happens? Sorry if this question is in the wrong place.

About this issue

  • State: closed
  • Created 6 years ago
  • Comments: 15 (8 by maintainers)

Most upvoted comments

@lukebakken, I think this one is safe to close. The approved mechanism add_callback_threadsafe is now available on all supported I/O loops in master.
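
For anyone landing here with the same problem, below is a minimal sketch of how add_callback_threadsafe might replace the pacemaker thread. pika's BlockingConnection is not thread-safe, so instead of calling process_data_events() from a second thread, the slow processing moves to a worker thread and only the ack is handed back to the connection's own thread. The helper names (ack_message, do_work, make_on_message, run), the "localhost" host and the prefetch value are all assumptions made for illustration, not code from this issue, and the basic_consume call assumes the pika 1.x keyword signature.

```python
import functools
import threading


def ack_message(channel, delivery_tag):
    """Acknowledge a delivery. Intended to run on the connection's own thread."""
    if channel.is_open:
        channel.basic_ack(delivery_tag)


def do_work(connection, channel, delivery_tag, body, process):
    """Run the slow processing function on a worker thread."""
    process(body)
    # Do not touch the channel from this thread; ask the connection
    # to run the ack on the thread that owns it.
    connection.add_callback_threadsafe(
        functools.partial(ack_message, channel, delivery_tag)
    )


def make_on_message(connection, process, workers):
    """Build a consumer callback that hands each message to a new thread."""
    def on_message(channel, method, properties, body):
        worker = threading.Thread(
            target=do_work,
            args=(connection, channel, method.delivery_tag, body, process),
        )
        worker.start()
        workers.append(worker)
    return on_message


def run(queue_name, process):
    """Hypothetical wiring; assumes pika 1.x and a broker on localhost."""
    import pika  # imported lazily so the helpers above work without pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.basic_qos(prefetch_count=1)  # assumed: one unacked message at a time
    workers = []
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=make_on_message(connection, process, workers),
    )
    try:
        # The main thread stays in the I/O loop, so heartbeats keep flowing
        # while the worker threads do the long-running processing.
        channel.start_consuming()
    finally:
        for worker in workers:
            worker.join()
        connection.close()
```

With this layout no second thread ever calls process_data_events(), so there should be no need for a lock or a separate pacemaker.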