kombu: SQS Messages not being ack'd/deleted

I have a custom consumer reading off of an SQS queue. It is written as follows:

import logging

from django.conf import settings
from django.core.management.base import BaseCommand
from kombu import Queue
from kombu.async import Hub, set_event_loop
from kombu.mixins import ConsumerMixin

from tasks.celery import app

logger = logging.getLogger(__name__)


class Worker(ConsumerMixin):

    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues
        super(Worker, self).__init__()
        logger.info("Started worker %r for queues %r", self, self.queues)

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=self.queues,
                         accept=['pickle', 'json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        logger.info('Processing message: %r', body)
        try:
            # Do things
            pass
        finally:
            logger.info("ACKing message %r", message)
            message.ack()
            print('ack\'d')


class Command(BaseCommand):  # This is a Django command
    help = "Sync task and result messages with database."

    def add_arguments(self, parser):
        parser.add_argument('--queue', '-q', default=settings.PLATFORM_QUEUE)

    def handle(self, queue, *args, **options):
        set_event_loop(Hub())
        with app.connection() as conn:
            try:
                logger.info("Launching worker")
                worker = Worker(conn, queues=[Queue(queue)])
                worker.run()
            except KeyboardInterrupt:
                print('bye bye')

A root logger sends all logs to stdout. Reviewing the stdout, I can see that the request to delete the message is generated; however, it does not appear to ever be sent to AWS:

2017-06-24 10:13:56,611 tasks.consumer: INFO     ACKing message <Message object at 0x7fb07fc96168 with details {'properties': {'correlation_id': 'b62c944f-7811-438b-949e-7f9e598a8c44'}, 'body_length': 77, 'content_type': 'application/json', 'delivery_info': {'routing_key': 'export', 'exchange': 'task_exchange'}, 'state': 'RECEIVED', 'delivery_tag': 'AQEBqrM3jZ2n1CUKEmGiXms9Ro3efS+CgZ/KzAC1qRXwWbOiZQTXVXP1eyod6xzitfYE8OrcsmwVnJwfzMNOWsqn09iSIbvfK3WvkX0YN+pH81rSOOvx0RyKGLPwTzardlbqkQJb4LaNj15Q2OeRF9BlpQJ3gpVeO2feW23ZXaJ7+fzmduOXutW44IxFg8Sx4mXBZ0ieR84G01lDp3ReFl9nVpumfPGQvRqDDp+wVe6gN8NIYER3LV5PD8u+eUIbULwhNh6qKmLsxy4F7cxDkap1+6ueAoytE3fkvHD+eUdj7Lg='}>
2017-06-24 10:13:57,869 botocore.auth: DEBUG    Calculating signature using v4 auth.
2017-06-24 10:13:57,870 botocore.auth: DEBUG    CanonicalRequest:
GET
/12345/dev-platform.fifo
Action=DeleteMessage&ReceiptHandle=AQEBqrM3jZ2n1CUKEmGiXms9Ro3efS%2BCgZ%2FKzAC1qRXwWbOiZQTXVXP1eyod6xzitfYE8OrcsmwVnJwfzMNOWsqn09iSIbvfK3WvkX0YN%2BpH81rSOOvx0RyKGLPwTzardlbqkQJb4LaNj15Q2OeRF9BlpQJ3gpVeO2feW23ZXaJ7%2BfzmduOXutW44IxFg8Sx4mXBZ0ieR84G01lDp3ReFl9nVpumfPGQvRqDDp%2BwVe6gN8NIYER3LV5PD8u%2BeUIbULwhNh6qKmLsxy4F7cxDkap1%2B6ueAoytE3fkvHD%2BeUdj7Lg%3D
host:us-west-2.queue.amazonaws.com
x-amz-date:20170624T101357Z

host;x-amz-date
e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
2017-06-24 10:13:57,875 botocore.auth: DEBUG    StringToSign:
AWS4-HMAC-SHA256
20170624T101357Z
20170624/us-west-2/sqs/aws4_request
c674918d8890a427b39cf31b211c7e089d1de7a2c077825768f7c4625200aeb1
2017-06-24 10:13:57,877 botocore.auth: DEBUG    Signature:
9fc700844cac343ce92bd476249de09c6bef79e94ec5b2b9d880e825701d710c
ack'd
2017-06-24 10:13:59,786 botocore.endpoint: DEBUG    Making request for OperationModel(name=ReceiveMessage) (verify_ssl=True) with params: {'method': 'POST', 'body': {'Version': '2012-11-05', 'QueueUrl': 'https://us-west-2.queue.amazonaws.com/12345/dev-platform.fifo', 'WaitTimeSeconds': 20, 'Action': 'ReceiveMessage', 'MaxNumberOfMessages': 10}, 'query_string': '', 'headers': {'User-Agent': 'Boto3/1.4.1 Python/3.5.2 Linux/3.13.0-112-generic Botocore/1.4.80'}, 'url_path': '/', 'context': {'has_streaming_input': False, 'client_config': <botocore.config.Config object at 0x7fb082846518>, 'client_region': 'us-west-2'}, 'url': 'https://us-west-2.queue.amazonaws.com/'}

Setting a breakpoint above message.ack(), I can see that the getresponse() method of the AsyncHTTPSConnection class creates a request and adds it to the PyCurl CurlClient instance. As noted above, it appears that the web request is never actually made to AWS.

Any ideas as to why this would be? I don't really understand how the Hub object works; I only added it to get around the issue described in #746. Perhaps I need to give the hub instance a push to have it process the pending PyCurl requests?
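For what it's worth, the "give the hub a push" hunch can be sketched with a toy event loop (hypothetical names, not kombu's actual Hub API): work registered with the loop just sits in a queue, and nothing happens until something actually drives the loop.

```python
from collections import deque


class ToyHub:
    """Minimal stand-in for an async hub: queued calls run only when driven."""

    def __init__(self):
        self._pending = deque()

    def call_soon(self, fn, *args):
        # Registering work does NOT execute it -- it only queues it.
        self._pending.append((fn, args))

    def run_once(self):
        # One "push": drain whatever is currently queued.
        while self._pending:
            fn, args = self._pending.popleft()
            fn(*args)


sent = []
hub = ToyHub()
hub.call_soon(sent.append, "DeleteMessage")  # analogous to queuing the ack request

print(sent)      # [] -- request queued, never dispatched
hub.run_once()
print(sent)      # ['DeleteMessage'] -- dispatched once the loop is driven
```

If kombu's hub behaves similarly, a request handed to the CurlClient would stay queued forever unless something iterates the event loop.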

As mentioned in #737, it feels like it would be nice to drop the PyCurl requirement in favor of Boto3 or Requests, but implementing that seems a bit beyond me at the moment.

About this issue

  • Original URL
  • State: open
  • Created 7 years ago
  • Reactions: 1
  • Comments: 18 (11 by maintainers)

Most upvoted comments

This is still happening in 2018. It causes the Celery SQS broker to execute tasks repeatedly, forever, once the messages reach the visibility timeout.
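The repeated execution follows from SQS visibility semantics: a received message is only hidden for the visibility timeout, and if it is never deleted (ack'ed) it becomes visible again and is redelivered. A toy in-memory model (a simplification, not the real SQS/boto3 API):

```python
class ToyQueue:
    """Simplified model of SQS visibility-timeout behaviour (not the real API)."""

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self.messages = {}  # body -> time at which it is (next) visible

    def send(self, body, now=0):
        self.messages[body] = now  # visible immediately

    def receive(self, now):
        for body, visible_at in self.messages.items():
            if now >= visible_at:
                # Receiving hides the message until the timeout elapses.
                self.messages[body] = now + self.visibility_timeout
                return body
        return None

    def delete(self, body):
        # The "ack" that never happens in this issue.
        self.messages.pop(body, None)


q = ToyQueue(visibility_timeout=30)
q.send("task-1")

print(q.receive(now=0))    # 'task-1' -- first delivery
print(q.receive(now=10))   # None -- still within the visibility timeout
print(q.receive(now=31))   # 'task-1' -- redelivered because it was never deleted
```

With the ack's DeleteMessage request stuck in the client's queue, this loop repeats on every visibility-timeout expiry, which matches the "executed repeatedly forever" symptom.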

Yes, @mikicz, I think this issue is still occurring. At my company we also repeatedly see inexplicable cases of messages being stuck on the celery daemon without being ack'ed to SQS, and I think they are a result of this issue.

I think this is still happening? I have tasks that get executed over and over again, and I believe it's because of this issue.

@rohan-mo I believe it was 4.2.1. As I wrote above, I used a hack and moved on.