symfony: [Messenger] Could not acknowledge redis message with BatchHandlerInterface

Symfony version(s) affected

5.4.0, 6.0.0

How to reproduce

I have a Messenger handler that makes HTTP requests (running with a single consumer, no parallel workers):

messenger.yaml:

transports:
  test:
    dsn: '%env(MESSENGER_TRANSPORT_DSN)%/test'
    options:
      delete_after_ack: true
      consumer: '%env(MESSENGER_CONSUMER_NAME)%'
    retry_strategy:
      max_retries: 0

routing:
     'TestMessage': test

TestMessageHandler.php:

<?php
class TestMessageHandler implements MessageHandlerInterface, BatchHandlerInterface
{
    use BatchHandlerTrait;

    public function __construct(
        private HttpClientInterface $client
    ) { }

    public function __invoke(TestMessage $message, Acknowledger &$ack = null)
    {        
        return $this->handle($message, $ack);
    }
    
    private function shouldFlush(): bool
    {
        return 5 <= \count($this->jobs);
    }

    private function process(array $jobs): void
    {
        $responses = [];
        
        foreach ($jobs as [$job, $ack]) {          
            try {
                [$headers, $content] = prepareRequest();
                
                $responses[] = $this->client->request('POST', $job->getEndpoint(), [
                    'headers' => $headers,
                    'body' => $content
                ]);

                $ack->ack($job);
            } catch (\Exception $e) {
                $ack->nack($e);
            }
        }
        
        if(0 === count($responses)) {
          return;
        }

        foreach ($this->client->stream($responses) as $response => $chunk) {
            if ($chunk->isFirst()) {
                var_dump($response->getStatusCode());
            } else if ($chunk->isLast()) {
            }
        }
    }
}

cli:

MESSENGER_CONSUMER_NAME=test php bin/console messenger:consume test

The code works fine, but when I hit Ctrl+C before the script has finished and then start it again, it throws an error:

In Connection.php line 441:

  Could not acknowledge redis message "1638374074049-0".

I noticed that after the consumer crashes or I hit Ctrl+C, the same message is passed to __invoke several times. After removing BatchHandlerInterface, it works as expected without any issue.

Possible Solution 1 (a bad one)

  1. Add a UUID to every message
  2. Destroy the Acknowledger and create a replacement for it (HandleMessageMiddleware.php:88)

Test.php:

class Test
{
  public function isAcknowledged()
  {
    return false;
  }
}

TestMessageHandler.php:

public function __invoke(SendPushMessage $message, Acknowledger &$ack = null)
{
    $uid = $message->getUuid();
    
    if(isset($this->jobs[$uid])) {
      $this->flush(true);
      
      try {
        $ack = null;
        unset($ack);
      } catch(\Exception) { }
      
      $ack = new Test();

      return 0;
    }
    
    return $this->handle($message, $ack);
}

private function handle(object $message, ?Acknowledger $ack)
{
    $uid = $message->getUuid();

    if (null === $ack) {
        $ack = new Acknowledger(get_debug_type($this));
        $this->jobs[$uid] = [$message, $ack];
        $this->flush(true);

        return $ack->getResult();
    }

    $this->jobs[$uid] = [$message, $ack];
    if (!$this->shouldFlush()) {
        return \count($this->jobs);
    }

    $this->flush(true);

    return 0;
}

Possible Solution 2

Comment out __destruct in Symfony\Component\Messenger\Handler\Acknowledger.php

Handler.php:

public function __invoke(SendPushMessage $message, Acknowledger $ack = null)
{
    $uid = $message->getUuid();
    
    if(isset($this->jobs[$uid])) {
      $this->flush(true);
    
      return 0;
    }
    
    return $this->handle($message, $ack);
}

private function handle(object $message, ?Acknowledger $ack)
{
    $uid = $message->getUuid();

    if (null === $ack) {
        $ack = new Acknowledger(get_debug_type($this));
        $this->jobs[$uid] = [$message, $ack];
        $this->flush(true);

        return $ack->getResult();
    }

    $this->jobs[$uid] = [$message, $ack];
    if (!$this->shouldFlush()) {
        return \count($this->jobs);
    }

    $this->flush(true);

    return 0;
}

About this issue

  • State: open
  • Created 3 years ago
  • Reactions: 1
  • Comments: 23 (15 by maintainers)

Most upvoted comments

@zip-fa since I configured Supervisor so that each process registers with Redis under a unique DSN, this issue seems to be resolved for me.

After translating this Russian blog post by Алексей Альшенецкий, I arrived at the following configuration:

# messenger.yaml
parameters:
    env(CONSUMER_ID): '0'

framework:
    messenger:
        transports:
            async: "redis://%env(REDIS_HOST)%:%env(REDIS_PORT)%/messages/symfony/consumer-%env(CONSUMER_ID)%?auto_setup=true"

# supervisor.conf
[program:messenger-consume]
command=php /var/www/html/bin/console messenger:consume async
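; Supervisor requires process_name to contain process_num when numprocs > 1
process_name=%(program_name)s_%(process_num)02d
numprocs=10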
autostart=true
autorestart=true
environment = CONSUMER_ID=%(process_num)d

Relevant details

  • To work with Redis, Symfony Messenger uses streams, a data type introduced in Redis 5.0.
  • All consumers that run in parallel must have unique names so that they do not conflict while reading messages.
  • In the example above, symfony is the name of the consumer group. The auto_setup flag lets Redis create it automatically.
  • Within this group, the consumer name is generated dynamically from the CONSUMER_ID environment variable, so each consumer gets a unique name (consumer-0, consumer-1, consumer-2, etc.).
  • The default value for CONSUMER_ID declared under parameters keeps the application from throwing an error while parsing messenger.yaml when the same application image is used outside of the messenger:consume process.
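To make the mapping between the DSN path and the stream, group and consumer names concrete, here is a minimal sketch. describeRedisDsn() is a hypothetical helper written for illustration, not Symfony's own DSN parser, and the host and port are placeholder values. It only shows that the three path segments of the DSN above read as stream, group and consumer, and that each Supervisor process ends up with its own consumer name.

```php
<?php

declare(strict_types=1);

/**
 * Illustration only (hypothetical helper, not part of Symfony):
 * splits the path of a Redis Messenger DSN into its three segments.
 *
 * @return array{stream: ?string, group: ?string, consumer: ?string}
 */
function describeRedisDsn(string $dsn): array
{
    $path = parse_url($dsn, PHP_URL_PATH);
    $trimmed = is_string($path) ? trim($path, '/') : '';
    $parts = '' !== $trimmed ? explode('/', $trimmed) : [];

    return [
        'stream' => $parts[0] ?? null,
        'group' => $parts[1] ?? null,
        'consumer' => $parts[2] ?? null,
    ];
}

// One DSN per Supervisor process, mirroring CONSUMER_ID=%(process_num)d.
foreach ([0, 1, 2] as $consumerId) {
    $dsn = sprintf(
        'redis://localhost:6379/messages/symfony/consumer-%d?auto_setup=true',
        $consumerId
    );
    $info = describeRedisDsn($dsn);
    printf("%s / %s / %s\n", $info['stream'], $info['group'], $info['consumer']);
}
```

Every process shares the messages stream and the symfony group, and only the last segment differs between processes.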

@lermontex, sure.

To anyone who tries to use this crutch: it is only part of the full solution. You also need to add a UUID to your messages to remove duplicates. The decorator lets you ack and delete messages that no longer exist in the Redis stream (and you will have to do this, because the Redis transport will try to feed your worker duplicate messages, i.e. messages that the worker has collected but not yet processed). Also take a look at my previous comment to check other issues that might affect you.

src/Messenger/Bridge/Redis/RedisTransportFactoryDecorator.php

<?php

declare(strict_types=1);

namespace App\Messenger\Bridge\Redis;

use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransport;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class RedisTransportFactoryDecorator implements TransportFactoryInterface
{
    private TransportFactoryInterface $decorated;
    private LoggerInterface $logger;

    public function __construct(TransportFactoryInterface $decorated, LoggerInterface $logger)
    {
        $this->decorated = $decorated;
        $this->logger    = $logger;
    }

    public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
    {
        unset($options['transport_name']);

        return new RedisTransportDecorator(
            new RedisTransport(Connection::fromDsn($dsn, $options), $serializer),
            $this->logger
        );
    }

    public function supports(string $dsn, array $options): bool
    {
        return $this->decorated->supports($dsn, $options);
    }
}

src/Messenger/Bridge/Redis/RedisTransportDecorator.php

<?php

declare(strict_types=1);

namespace App\Messenger\Bridge\Redis;

use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\TransportInterface;

class RedisTransportDecorator implements TransportInterface
{
    private TransportInterface $decorated;
    private LoggerInterface $logger;

    public function __construct(TransportInterface $decorated, LoggerInterface $logger)
    {
        $this->decorated  = $decorated;
        $this->logger     = $logger;
    }

    public function get(): iterable
    {
        return $this->decorated->get();
    }

    public function ack(Envelope $envelope): void
    {
        try {
            $this->decorated->ack($envelope);
        } catch (TransportException $exception) {
            if (strpos($exception->getMessage(), 'Could not acknowledge') === false) {
                throw $exception;
            } else {
                $this->logger->error($exception->getMessage(), [
                    'exception' => $exception
                ]);
            }
        }
    }

    public function reject(Envelope $envelope): void
    {
        try {
            $this->decorated->reject($envelope);
        } catch (TransportException $exception) {
            if (strpos($exception->getMessage(), 'Could not delete message') === false) {
                throw $exception;
            } else {
                $this->logger->error($exception->getMessage(), [
                    'exception' => $exception
                ]);
            }
        }
    }

    public function send(Envelope $envelope): Envelope
    {
        return $this->decorated->send($envelope);
    }
}

config/services.yaml

    App\Messenger\Bridge\Redis\RedisTransportFactoryDecorator:
        decorates: 'messenger.transport.redis.factory'

There is also another issue that can be confusing for someone who switches from AWS SQS, for example. It is described in https://github.com/symfony/symfony/issues/51604 and in https://github.com/symfony/symfony/issues/44400#issuecomment-1096806779. With multiple workers that do not have unique consumer names, the Could not acknowledge redis message error will happen even without batch handlers. It would be nice if consumer names could be generated automatically. Also note that, ideally, consumer groups should be garbage collected to prevent other issues: https://stackoverflow.com/a/70335802.

While working on the PR https://github.com/symfony/symfony/pull/49028 I realized that I can’t fully fix the issue for batch handlers without fixing this issue first. One solution that could be implemented is to store the IDs of the messages that the worker currently holds inside the Connection class.

Algorithm is the following:

  1. When a message is fetched its ID is stored inside Connection
  2. When Connection tries to fetch a pending message:
     2.1. Ask XPENDING to return count(IDs) + constant entries. It makes sense to hard-cap the number of messages at some arbitrary value such as 100 to reduce the load on Redis.
     2.2. If there is a suitable message (forgotten AND not in the IDs store), claim it if needed and fetch it with XRANGE.
     2.3. If not, call XREADGROUP >
  3. When a message is acked then it is removed from the IDs store in Connection.
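The bookkeeping part of this algorithm can be sketched without touching Redis at all. The class below is hypothetical (HeldMessageIds and its method names are not part of Symfony), and it assumes the caller has already obtained a list of pending IDs from an XPENDING-style query and filtered it down to "forgotten" entries. It only shows the ID store and the selection rule from step 2.2.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory store of the message IDs a worker currently holds.
 */
final class HeldMessageIds
{
    /** @var array<string, true> */
    private array $ids = [];

    // Step 1: remember the ID of every fetched message.
    public function hold(string $id): void
    {
        $this->ids[$id] = true;
    }

    // Step 3: forget the ID once the message is acked or rejected.
    public function release(string $id): void
    {
        unset($this->ids[$id]);
    }

    // Step 2.1: how many pending entries to request, with a hard cap.
    public function pendingQuerySize(int $extra = 10, int $hardCap = 100): int
    {
        return min(\count($this->ids) + $extra, $hardCap);
    }

    /**
     * Step 2.2: first pending ID that this worker does not already hold.
     * A null result means "nothing to claim, read new messages instead" (step 2.3).
     *
     * @param list<string> $pendingIds
     */
    public function firstForgotten(array $pendingIds): ?string
    {
        foreach ($pendingIds as $id) {
            if (!isset($this->ids[$id])) {
                return $id;
            }
        }

        return null;
    }
}

$held = new HeldMessageIds();
$held->hold('1638374074049-0');
$held->hold('1638374074050-0');

$pending = ['1638374074049-0', '1638374074050-0', '1638374074051-0'];
$candidate = $held->firstForgotten($pending);
echo $candidate ?? 'none', "\n";
```

In this usage the two held IDs are skipped, so the worker is never handed a message it is still batching. The default of 10 extra entries is an arbitrary illustrative value, just like the cap of 100.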

If (big if) it is guaranteed that a message is always acked or nacked, or that the worker dies otherwise, then this should work. But introducing state into Connection will complicate things a lot and could cause memory leaks or odd behavior if badly implemented.

@lermontex You have to add a UUID to every message to be able to use the Redis transport with batch handlers; take a look at @zip-fa’s solution for the implementation details. Basically, the Redis transport was not made with batch handlers in mind, which is why these ugly crutches are needed to make it work properly. At some point the transport tries to feed your handler a message that has already been handed to it but not yet acked. When that happens, you can do one of the following:

  • Flush the current batch as soon as your handler starts receiving duplicates (you need a UUID, or some other way, to recognize that a message is a duplicate).
  • Ack the duplicate messages until the batch is ready to be processed. Note that this deletes the message from the stream before the task is done. Again, you need a UUID or some other way to recognize that you are receiving the same messages.
  • Just collect duplicates until the batch is full, then dedupe the messages before processing them (and you might want to have UUIDs for that).
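As a rough sketch of the last option, the function below splits a collected batch into unique jobs and duplicates, keyed by UUID. splitDuplicateJobs() and the $uuidOf callback are hypothetical names introduced for illustration. The jobs are assumed to be [message, acknowledger] pairs like the ones BatchHandlerTrait collects. Duplicates are returned separately rather than dropped, because each one still carries its own acknowledger that has to be called.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: separates the first occurrence of each UUID from later duplicates.
 *
 * @param list<array{0: object, 1: mixed}> $jobs   [message, acknowledger] pairs
 * @param callable(object): string         $uuidOf extracts the UUID from a message
 *
 * @return array{0: list<array{0: object, 1: mixed}>, 1: list<array{0: object, 1: mixed}>}
 *         [unique jobs, duplicate jobs]
 */
function splitDuplicateJobs(array $jobs, callable $uuidOf): array
{
    $seen = [];
    $unique = [];
    $duplicates = [];

    foreach ($jobs as $job) {
        $uuid = $uuidOf($job[0]);

        if (isset($seen[$uuid])) {
            // Same message delivered again: keep it aside so its acknowledger can still be called.
            $duplicates[] = $job;
            continue;
        }

        $seen[$uuid] = true;
        $unique[] = $job;
    }

    return [$unique, $duplicates];
}

// Stand-in messages and acknowledgers, for illustration only.
$jobs = [
    [(object) ['uuid' => 'a'], 'ack-1'],
    [(object) ['uuid' => 'b'], 'ack-2'],
    [(object) ['uuid' => 'a'], 'ack-3'],
];

[$unique, $duplicates] = splitDuplicateJobs($jobs, static fn (object $m): string => $m->uuid);
printf("%d unique, %d duplicate\n", \count($unique), \count($duplicates));
```

In a real handler the callback would presumably call something like $message->getUuid(), and process() would send requests only for the unique jobs.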

After that, you need to ack every duplicate message, or you’ll get the error The acknowledger was not called by the "%s" batch handler. But if you try to do this, your worker will crash with a Could not acknowledge or Could not delete error, because the message was already deleted by another Acknowledger. To ignore these errors, you need the Redis transport decorator that I’ve provided.