symfony: [Messenger] MessageDecodingFailedException should not delete message from queue

Symfony version(s) affected

All

Description

MessageDecodingFailedException should not delete a message from the queue.

    try {
        $envelope = $this->serializer->decode([
            'body' => $sqsEnvelope['body'],
            'headers' => $sqsEnvelope['headers'],
        ]);
    } catch (MessageDecodingFailedException $exception) {
        $this->connection->delete($sqsEnvelope['id']);

        throw $exception;
    }
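A minimal sketch of the behaviour this issue asks for: the decoding failure still propagates, but the raw message is left in the queue. `InMemoryQueue`, `DecodingFailed`, and `receive()` are hypothetical stand-ins invented for illustration, not Symfony classes, and a real transport would also need a redelivery or dead-letter policy to avoid looping on the same message.

```php
<?php
// Hypothetical stand-ins, NOT Symfony classes: just enough to show the control flow.
final class DecodingFailed extends RuntimeException {}

final class InMemoryQueue
{
    /** @var array<string, string> message id => raw body */
    public array $messages = [];

    public function delete(string $id): void
    {
        unset($this->messages[$id]);
    }
}

/**
 * Decodes a raw message. On failure the message is left in the queue
 * and the exception is rethrown, so the caller decides what happens next.
 */
function receive(InMemoryQueue $queue, string $id, callable $decode): mixed
{
    try {
        return $decode($queue->messages[$id]);
    } catch (DecodingFailed $exception) {
        // Deliberately no $queue->delete($id) here: the raw message stays available.
        throw $exception;
    }
}

$queue = new InMemoryQueue();
$queue->messages['m1'] = '{not valid json';

// Assumed decoder for the sketch: plain JSON into an array.
$decode = static function (string $body): array {
    $data = json_decode($body, true);
    if (!is_array($data)) {
        throw new DecodingFailed('Could not decode message body.');
    }

    return $data;
};

try {
    receive($queue, 'm1', $decode);
} catch (DecodingFailed $exception) {
    echo 'still queued: ', isset($queue->messages['m1']) ? 'yes' : 'no', PHP_EOL;
}
```

With the message still in the queue, a later fix to the serializer or the deployment can make it decodable again, which is the point of the request.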

Issues it causes

  • Decoding is not the exact reverse of encoding for arrays (int vs. string keys), so legitimate messages end up being dropped.
  • Any other kind of DevOps error can temporarily break decoding, and the messages received during that window are lost.
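One plausible reading of the "int vs string keys" point, shown with plain `json_encode`/`json_decode` rather than the Messenger serializer (an assumption, since the issue does not name the exact code path): PHP casts integer-like string keys to `int`, so a map that was meant to have string keys does not come back with them.

```php
<?php
// PHP casts integer-like string keys to int as soon as the array is built.
$original = ['123' => 'first', 'abc' => 'second'];

// Round trip through JSON, decoding objects as associative arrays.
$decoded = json_decode(json_encode($original), true);

var_dump(is_int(array_key_first($original)));       // bool(true): '123' became 123
var_dump(is_int(array_key_first($decoded)));        // bool(true): still an int after decoding
var_dump(array_keys($decoded) === ['123', 'abc']);  // bool(false): key types differ
```

A decoder that strictly expects string keys would reject such a payload even though the producer sent a legitimate message.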

About this issue

  • State: open
  • Created 3 years ago
  • Comments: 26 (20 by maintainers)

Most upvoted comments

@julienfalque I abandoned this, sorry; it is no longer a priority for me 😕

I implemented my idea as a new transport, but it’s hacky. Here is the code of the custom AmqpReceiver:

    private function getEnvelope(string $queueName): iterable
    {
        try {
            $amqpEnvelope = $this->connection->get($queueName);
        } catch (\AMQPException $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }

        if (null === $amqpEnvelope) {
            return;
        }

        $body = $amqpEnvelope->getBody();

        try {
            $envelope = $this->serializer->decode([
                'body'    => $body,
                'headers' => $amqpEnvelope->getHeaders(),
            ]);
        } catch (\Throwable $exception) {
            $this->logger->warning(
                'Rejected message because decoding failed. It will be sent to the default failure transport.',
                ['exception' => $exception]
            );

            $message = new \stdClass();
            $message->rawBody = $body;
            $message->rawHeaders = $amqpEnvelope->getHeaders();
            $envelope = new Envelope($message, [
                new MessageDecodingFailedStamp(),
                ErrorDetailsStamp::create($exception),
                new SentToFailureTransportStamp($queueName),
                new DelayStamp(0),
                new RedeliveryStamp(0),
            ]);

            $this->defaultFailureSender->send($envelope);

            // The raw payload has been forwarded to the failure transport, so reject the original AMQP message.
            $this->rejectAmqpEnvelope($amqpEnvelope, $queueName);

            return;
        }

        yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
    }

In Symfony itself, I guess this could be implemented with a listener, which would be cleaner.

I have the same problem with AMQP.

Normally, the failed message would be routed to the dead-letter queue, but because of the unhandled exception this doesn’t happen. I’d love to be able to route problem messages to a dead-letter queue. Perhaps we could wrap the decoding call in a try…catch and dispatch a MessageHandlingFailed event so that users can handle these edge cases as they see fit?
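The try…catch-plus-event idea could look roughly like the sketch below. Everything in it is hypothetical: `DecodingFailedEvent` and `decodeOrDispatch()` are names invented here, listeners are plain callables instead of Symfony's event dispatcher, and the "dead-letter store" is just an array.

```php
<?php
// Hypothetical event, not part of Symfony: carries the raw payload and the error.
final class DecodingFailedEvent
{
    public function __construct(
        public readonly string $rawBody,
        public readonly array $rawHeaders,
        public readonly Throwable $error,
    ) {
    }
}

/**
 * Tries to decode; on failure, hands the raw message to each listener
 * instead of letting the exception escape. Returns null when decoding failed.
 *
 * @param callable(string, array): object           $decode
 * @param list<callable(DecodingFailedEvent): void> $listeners
 */
function decodeOrDispatch(string $body, array $headers, callable $decode, array $listeners): ?object
{
    try {
        return $decode($body, $headers);
    } catch (Throwable $error) {
        $event = new DecodingFailedEvent($body, $headers, $error);
        foreach ($listeners as $listener) {
            $listener($event);
        }

        return null;
    }
}

// A user-supplied listener that parks the raw payload in a dead-letter store.
$deadLetters = [];
$listeners = [
    static function (DecodingFailedEvent $event) use (&$deadLetters): void {
        $deadLetters[] = ['body' => $event->rawBody, 'reason' => $event->error->getMessage()];
    },
];

// Assumed decoder for the sketch: JSON that throws on malformed input.
$decode = static function (string $body, array $headers): object {
    return json_decode($body, false, 512, JSON_THROW_ON_ERROR);
};

$result = decodeOrDispatch('{broken', ['type' => 'App\\Message\\Foo'], $decode, $listeners);
```

Here `$result` is `null` and the raw body ends up in `$deadLetters`, so nothing is lost and the application decides how to handle the undecodable message.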

It would also be beneficial to expose the failure transports somehow; right now you can’t really get them in a service/listener…