symfony: [Messenger] Consumers not showing in RabbitMQ admin?!

Symfony version(s) affected: 4.2.0 - 4.2.3

Description
When running the messenger:consume-messages command, the registered consumer(s) will not show in admin area of RabbitMQ, although the messages get consumed.

That’s the current state: Actual

And this is how it should look like: Expectation

Is this an actual bug, is it by design or am I doing something wrong?

About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Comments: 24 (10 by maintainers)

Commits related to this issue

Most upvoted comments

The transport does not rely on \AmqpQueue::consume() because it is blocking. Having a blocking receiver makes the --time-limit/--memory-limit options of the messenger:consume command as well as the messenger:stop-workers command inefficient, as they all rely on the fact that the receiver returns immediately no matter if it finds a message or not. The consume worker is responsible for iterating until it receives a message to handle and/or until one of the stop conditions is reached. Thus, the worker’s stop logic cannot be reached if it is stuck in a blocking call.

So yes, that’s by design.

Would it possible to add an option in AMQP transport to use consume mod instead?

--time-limit and --memory-limit could then behave differently for this transport?

Hello @chalasr

The problem of consuming messages with the get method. If the consumer loses the connection, the message will remain permanently unacked. With the consume method, the message will return to the queue if the connection is lost after timeout.

There is a possibility of data loss and i don’t think --time-limit/–memory-limit features are a enough good reason to allow that. The documentation should include a disclaimer about this.

@chalasr Then it’s not true consumer, that not visible in rabbit admin panel.

You should design another command, that do what it should do – report that it’s consumer, be visible in admin panel but without those useless --time-limit/--memory-limit options.

Hi @gquemener

I have created a bundle to implement this solution and i use it in production. I was planning on pushing a PR but got new priorities. It is still on my TODO list because I am very interested that this functionality be maintained over the long term but … no ETA

@garex The hearthbeat configuration should be accessible, in any case I saw a heartbeat parameter somewhere.

The bundle https://github.com/Cafeine42/amqp-messenger

AFAIU, the problem is that consume() would block current thread until a message is consumed, thus preventing to perform checks like “memory” (memory usage should stay stable as long as no message is consumed, shouldn’t it?) or “time” limit in the context of a low throughput queue.

In regards to heartbeat, from amqp-ext documentation:

heartbeats are limited to blocking calls only, so if there are no any operations on a connection or no active consumer set, connection may be closed by the broker as dead.

This means that, when using this php extension, and when heartbeat has been configured in the amqp connection, then heartbeat frame will only be sent (if necessary) when calling methods like get and consume. This is hardly a good strategy as hearbeat frame would be tightly coupled to the consumer process duration or the throughput on the queue (which would most likely result in undesired connection closing). A more reasonable strategy would be not use heartbeat with amqp-ext or use a non blocking io strategy to periodically send the frames.

Is it correct?

If so, I’m wondering if one should not prefer using a non-blocking IO solution (like this) to consume message through the basic.consume strategy (and thus limit the symfony/amqp-messenger to basic.get, as it is today). I’m not sure I have seen such limitation notice in the documentation. If not, I can help to underline this fact.

Disclaimer: I am no expert in amqp protocol, nor in non blocking IO solution, so feel free to adjust my statements and take part in the conversation if I’m talking non-sense here! Plus, the issue is closed, so I am not sure it will ever reach any audience.

You should design another command, that do what it should do

@garex Those “useless” options is the best we could provide with all concerns taken into account around the design of the component (which has been experimental for 3 minor releases before being considered stable and proven useful).

Please keep in mind that nothing is owed here. Anyone can contribute and propose changes, you included. Our process for feature requests starts by opening a ticket describing the suggested feature and wait for feedback from the community. if you want something to change, I suggest you to follow that process.

Hi @Guikingone,

thank you for your reply. We’ve debugged this a little bit further now and the culprit lies in Symfony\Component\Messenger\Transport\AmqpExt\Connection::ack() resp. Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver::receive() in Line 62.

So - the messenger library doesn’t actually consume messages the “RabbitMQ-way”. Hence the Connection should provide a consume-Method, where the \AMQPQueue::consume()-method gets called. In this way we would be able to actually see the subscribed message consumers (as mocked up in my second screenshot).