symfony: [Messenger] Consumers not showing in RabbitMQ admin?!
Symfony version(s) affected: 4.2.0 - 4.2.3
Description
When running the messenger:consume-messages command, the registered consumer(s) will not show in admin area of RabbitMQ, although the messages get consumed.
That’s the current state:

And this is how it should look like:

Is this an actual bug, is it by design or am I doing something wrong?
About this issue
- Original URL
- State: closed
- Created 5 years ago
- Comments: 24 (10 by maintainers)
The transport does not rely on
\AmqpQueue::consume()because it is blocking. Having a blocking receiver makes the--time-limit/--memory-limitoptions of themessenger:consumecommand as well as themessenger:stop-workerscommand inefficient, as they all rely on the fact that the receiver returns immediately no matter if it finds a message or not. Theconsumeworker is responsible for iterating until it receives a message to handle and/or until one of the stop conditions is reached. Thus, the worker’s stop logic cannot be reached if it is stuck in a blocking call.So yes, that’s by design.
Would it possible to add an option in AMQP transport to use consume mod instead?
--time-limitand--memory-limitcould then behave differently for this transport?Hello @chalasr
The problem of consuming messages with the get method. If the consumer loses the connection, the message will remain permanently unacked. With the consume method, the message will return to the queue if the connection is lost after timeout.
There is a possibility of data loss and i don’t think --time-limit/–memory-limit features are a enough good reason to allow that. The documentation should include a disclaimer about this.
@chalasr Then it’s not true consumer, that not visible in rabbit admin panel.
You should design another command, that do what it should do – report that it’s consumer, be visible in admin panel but without those useless
--time-limit/--memory-limitoptions.Hi @gquemener
I have created a bundle to implement this solution and i use it in production. I was planning on pushing a PR but got new priorities. It is still on my TODO list because I am very interested that this functionality be maintained over the long term but … no ETA
@garex The hearthbeat configuration should be accessible, in any case I saw a heartbeat parameter somewhere.
The bundle https://github.com/Cafeine42/amqp-messenger
AFAIU, the problem is that
consume()would block current thread until a message is consumed, thus preventing to perform checks like “memory” (memory usage should stay stable as long as no message is consumed, shouldn’t it?) or “time” limit in the context of a low throughput queue.In regards to heartbeat, from amqp-ext documentation:
This means that, when using this php extension, and when heartbeat has been configured in the amqp connection, then heartbeat frame will only be sent (if necessary) when calling methods like
getandconsume. This is hardly a good strategy as hearbeat frame would be tightly coupled to the consumer process duration or the throughput on the queue (which would most likely result in undesired connection closing). A more reasonable strategy would be not use heartbeat with amqp-ext or use a non blocking io strategy to periodically send the frames.Is it correct?
If so, I’m wondering if one should not prefer using a non-blocking IO solution (like this) to consume message through the
basic.consumestrategy (and thus limit the symfony/amqp-messenger tobasic.get, as it is today). I’m not sure I have seen such limitation notice in the documentation. If not, I can help to underline this fact.Disclaimer: I am no expert in amqp protocol, nor in non blocking IO solution, so feel free to adjust my statements and take part in the conversation if I’m talking non-sense here! Plus, the issue is closed, so I am not sure it will ever reach any audience.
@garex Those “useless” options is the best we could provide with all concerns taken into account around the design of the component (which has been experimental for 3 minor releases before being considered stable and proven useful).
Please keep in mind that nothing is owed here. Anyone can contribute and propose changes, you included. Our process for feature requests starts by opening a ticket describing the suggested feature and wait for feedback from the community. if you want something to change, I suggest you to follow that process.
Hi @Guikingone,
thank you for your reply. We’ve debugged this a little bit further now and the culprit lies in
Symfony\Component\Messenger\Transport\AmqpExt\Connection::ack()resp.Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver::receive()in Line 62.So - the messenger library doesn’t actually consume messages the “RabbitMQ-way”. Hence the
Connectionshould provide aconsume-Method, where the\AMQPQueue::consume()-method gets called. In this way we would be able to actually see the subscribed message consumers (as mocked up in my second screenshot).