symfony: [Messenger] Could not acknowledge redis message with BatchHandlerInterface
Symfony version(s) affected
5.4.0, 6.0.0
How to reproduce
I have MessengerHandler which makes http requests (with only one parallel consumer):
messenger.yaml:
transports:
test:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%/test'
options:
delete_after_ack: true
consumer: '%env(MESSENGER_CONSUMER_NAME)%'
retry_strategy:
max_retries: 0
routing:
'TestMessage': test
TestMessageHandler.php:
<?php
class TestMessageHandler implements MessageHandlerInterface, BatchHandlerInterface
{
use BatchHandlerTrait;
public function __construct(
private HttpClientInterface $client
) { }
public function __invoke(TestMessage $message, Acknowledger &$ack = null)
{
return $this->handle($message, $ack);
}
private function shouldFlush(): bool
{
return 5 <= \count($this->jobs);
}
private function process(array $jobs): void
{
$responses = [];
foreach ($jobs as [$job, $ack]) {
try {
[$headers, $content] = prepareRequest();
$responses[] = $this->client->request('POST', $job->getEndpoint(), [
'headers' => $headers,
'body' => $content
]);
$ack->ack($job);
} catch (\Exception $e) {
$ack->nack($e);
}
}
if(0 === count($responses)) {
return;
}
foreach ($this->client->stream($responses) as $response => $chunk) {
if ($chunk->isFirst()) {
var_dump($response->getStatusCode());
} else if ($chunk->isLast()) {
}
}
}
}
cli:
MESSENGER_CONSUMER_NAME=test php bin/console messenger:consume test
Code works okay, but when i hit ctrl+c before script finished and start it again, it throws me an error:
In Connection.php line 441:
Could not acknowledge redis message "1638374074049-0".
I noticed that after consumer crashes or i hit ctrl+c, same message comes to __invoke a few times. When removing BatchHandlerInterface it starts to work as excepected without any issue.
Possible Solution 1 (a bad one)
- Add uuid to every Message
- Destroy Acknowledger and create replacement for it (HandleMessageMiddleware.php:88)
Test.php:
class Test
{
public function isAcknowledged()
{
return false;
}
}
TestMessageHandler.php:
public function __invoke(SendPushMessage $message, Acknowledger &$ack = null)
{
$uid = $message->getUuid();
if(isset($this->jobs[$uid])) {
$this->flush(true);
try {
$ack = null;
unset($ack);
} catch(\Exception) { }
$ack = new Test();
return 0;
}
return $this->handle($message, $ack);
}
private function handle(object $message, ?Acknowledger $ack)
{
$uid = $message->getUuid();
if (null === $ack) {
$ack = new Acknowledger(get_debug_type($this));
$this->jobs[$uid] = [$message, $ack];
$this->flush(true);
return $ack->getResult();
}
$this->jobs[$uid] = [$message, $ack];
if (!$this->shouldFlush()) {
return \count($this->jobs);
}
$this->flush(true);
return 0;
}
Possible Solution 2
Comment __destruct in Symfony\Component\Messenger\Handler\Acknowledger.php
Handler.php:
public function __invoke(SendPushMessage $message, Acknowledger $ack = null)
{
$uid = $message->getUuid();
if(isset($this->jobs[$uid])) {
$this->flush(true);
return 0;
}
return $this->handle($message, $ack);
}
private function handle(object $message, ?Acknowledger $ack)
{
$uid = $message->getUuid();
if (null === $ack) {
$ack = new Acknowledger(get_debug_type($this));
$this->jobs[$uid] = [$message, $ack];
$this->flush(true);
return $ack->getResult();
}
$this->jobs[$uid] = [$message, $ack];
if (!$this->shouldFlush()) {
return \count($this->jobs);
}
$this->flush(true);
return 0;
}
Additional Context
No response
About this issue
- Original URL
- State: open
- Created 3 years ago
- Reactions: 1
- Comments: 23 (15 by maintainers)
Commits related to this issue
- [Messenger] fixes for Redis transport fixes #49023, partially fixes #44400 — committed to AdamKatzDev/symfony by AdamKatzDev 6 months ago
@zip-fa since I’ve properly configured the Supervisor application to register to Redis on a unique DSN this issue seems resolved for me.
After translating this Russian blogpost by Алексей Альшенецкий I’ve came to the following configuration:
Relevant details
@lermontex, sure.
To anyone that tries to use this crutch, this is a part of the full solution, you need to add UUID to your messages to remove duplicates. The decorator allows to ack and delete messages that don’t exist in the redis stream anymore (and you will have to do this, redis transport will try to feed your worker duplicate messages, i.e. the messages that are collected but not processed by the worker at the moment). Also take a look at my previous comment to check other issues that might affect you.
src/Messenger/Bridge/Redis/RedisTransportFactoryDecorator.php
src/Messenger/Bridge/Redis/RedisTransportFactoryDecorator.php
config/services.yaml
There is also another issue that can be confusing for someone who switch from AWS SQS, for example, and described in https://github.com/symfony/symfony/issues/51604 and in https://github.com/symfony/symfony/issues/44400#issuecomment-1096806779. Without unique consumer names with multiple workers the
Could not acknowledge redis messageerror will happen even without batch handlers. Would be nice if consumer name generation could be done automatically. Also note that ideally consumer groups should be garbage collected to prevent other issues https://stackoverflow.com/a/70335802.While working on this https://github.com/symfony/symfony/pull/49028 PR I’ve realized that I can’t fix the issue fully for batch handlers without fixing this issue first. A solution that could be implemented is to store IDs of the messages that worker currently holds inside the
Connectionclass .Algorithm is the following:
ConnectionConnection.If (big if) it is guaranteed that a message is always acked/noacked or workers dies if not, then this should work. But introducing a state in
Connectionwill complicate things a lot, could cause memory leaks or some weird behavior if badly implemented.@lermontex You have to add UUID to every message to be able to use Redis transport with batch handlers, take a look at @zip-fa’s solution for the implementation details. Basically Redis transport is not made with batch handlers in mind, that’s why these ugly crutches are needed to make it work properly. The transport at some point tries to feed your handlers the same message that was already handled but not yet acked. When that happens you can do the following:
After that you need to ack every duplicate message or you’ll get the
The acknowledger was not called by the "%s" batch handler.error. But if you try to do this your worker will crash withCould not acknowledge/Could not deleteerror because the message was already deleted by another Acknowledger. To ignore these errors you need the Redis transport decorator that I’ve provided.