laravel-queue-rabbitmq: Queue listener enters an endless loop error state when dispatching a delayed job

  • Laravel/Lumen version: 6.3.3
  • RabbitMQ version: 3.8.6 (I think)
  • Package version: 10.2.2

Describe the bug

The connection or channel seems to get closed when dispatching a delayed job, leading to a cascade of issues that result in a listener running into an endless loop of error messages.

Steps To Reproduce

A running job dispatches another job with a delay.

Current behavior

First I get an ACCESS_REFUSED exception, then an endless loop of AMQPChannelClosedException exceptions. The job and listener do not exit, but stay stuck in this endless loop.

Expected behavior

A job should be dispatched.

Additional context

The lost connection or channel, which results in the ACCESS_REFUSED error, can be reproduced in artisan tinker. With some class names changed, it looks like this:

>>> $j = new App\Jobs\TestJob()
=> App\Jobs\TestJob {#3155
     +connection: null,
     +queue: null,
     +chainConnection: null,
     +chainQueue: null,
     +delay: null,
     +middleware: [],
     +chained: [],
   }
>>> dispatch($j->delay(Carbon\Carbon::now()->addSeconds(50)))
=> Laravel\Lumen\Bus\PendingDispatch {#3156}
>>> DB::commit()
=> null

Then the same again in the same tinker session:

>>> $j = new App\Jobs\TestJob()
=> App\Jobs\TestJob {#3198
     +connection: null,
     +queue: null,
     +chainConnection: null,
     +chainQueue: null,
     +delay: null,
     +middleware: [],
     +chained: [],
   }
>>> dispatch($j->delay(Carbon\Carbon::now()->addSeconds(50)))
=> Laravel\Lumen\Bus\PendingDispatch {#3199}
>>> DB::commit()
PhpAmqpLib/Exception/AMQPProtocolChannelException with message 'ACCESS_REFUSED - access to exchange 'amq.default' in vhost 'myhost' refused for user 'myuser''

Notes:

  • We build all the queues in advance.
  • Our default queue is jobs.default.
  • In the first instance above a queue is correctly created called jobs.default.delay.48000.
  • I do NOT see the job added to the delay queue - no jobs appear to go through it.
  • The user myuser does not have permission to use exchange amq.default and I don’t know why it is trying to do so.
  • The error appears to come from the creation of the delay queue the second time (it does get created the first time, but the second run through does not seem to see it).
  • With debug turned on, I see the connection created right at the start, but do not see the connection close being logged until we get to the ACCESS_REFUSED point. I think you have an option authentication_failure_close=1 set for the connection, so I can see why it all goes wrong after that.
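
On the amq.default note above: as far as I know, RabbitMQ uses the name amq.default for the nameless default exchange when it checks permissions, and a publish needs the user's write pattern to match the exchange name. Below is a minimal sketch of that check, assuming the patterns are plain unanchored regular expressions. canWrite() and both sample patterns are hypothetical; this is not the broker's code and not the real permissions of myuser.

```php
<?php

// Toy model of a RabbitMQ permission check: a resource is allowed when the
// user's pattern (a regular expression) matches the resource name.
// canWrite() is a hypothetical helper, not part of any library.
function canWrite(string $writePattern, string $resourceName): bool
{
    // RabbitMQ treats an empty pattern like '^$' (matches only the empty string).
    $pattern = $writePattern === '' ? '^$' : $writePattern;

    // '~' is used as the delimiter so dots and slashes need no extra escaping.
    return preg_match('~' . $pattern . '~', $resourceName) === 1;
}

// Hypothetical write patterns for 'myuser'.
var_dump(canWrite('^jobs\..*', 'amq.default'));                 // bool(false)
var_dump(canWrite('^(jobs\..*|amq\.default)$', 'amq.default')); // bool(true)
```

If the first pattern resembles what myuser has, any publish through the default exchange would be refused even though the jobs.* queues themselves are accessible.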

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 23

Most upvoted comments

Oh, and when delaying messages, the messages are put into a queue “directly” via the default direct exchange (amq.default). The name of this delay queue is based on the queue you're publishing to, and it dead-letters to the queue in the connection config or the queue set in the job class.

So the queue must exist for the message to be routed correctly by the DLX.

Example:

Suppose your job is dispatched to the queue orders on the connection rabbitmq with a delay of 30 seconds.

A delay queue is created with the name orders.x.delay.30000 and with a DLX set to orders.

The message must arrive in the delay queue (under the hood, via the default direct exchange).

After 30000 milliseconds the message is dead-lettered to the orders queue, but only if that queue exists (this is done by RabbitMQ). So for the message to be picked up when it dead-letters, the orders queue must exist. Finally, your message is picked up by the worker.
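
To make the example concrete, here is a rough sketch of how the delay queue's name and arguments could be derived. The naming scheme is copied from the example above and may differ between package versions. The function name delayQueueSpec() is hypothetical, and the argument set is an assumption built from standard RabbitMQ queue arguments (x-message-ttl, x-dead-letter-exchange, x-dead-letter-routing-key), not the package's confirmed implementation.

```php
<?php

// Hypothetical helper: describe the delay queue for a target queue.
function delayQueueSpec(string $targetQueue, int $delaySeconds): array
{
    // RabbitMQ expresses TTLs in milliseconds.
    $ttlMs = $delaySeconds * 1000;

    return [
        // Naming scheme assumed from the example above: <queue>.x.delay.<ttl>
        'name' => sprintf('%s.x.delay.%d', $targetQueue, $ttlMs),
        'arguments' => [
            // '' is the default exchange, which routes by queue name.
            'x-dead-letter-exchange' => '',
            // Expired messages are re-routed to the target queue.
            'x-dead-letter-routing-key' => $targetQueue,
            // Messages expire (and dead-letter) after this many milliseconds.
            'x-message-ttl' => $ttlMs,
        ],
    ];
}

$spec = delayQueueSpec('orders', 30);
echo $spec['name'], PHP_EOL; // orders.x.delay.30000
```

Because dead-lettering goes back through the default exchange with the target queue's name as routing key, the target queue has to exist for the expired message to land anywhere.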

Important: when your process starts off by delaying a message, the orders queue may be missing. So you have to pre-declare this queue if it's not present.
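
The effect of a missing target queue can be illustrated with a toy in-memory model. This is only a sketch of the routing rule described above (the default exchange routes by queue name, and a message with no matching queue is dropped). ToyBroker and its methods are hypothetical and are not the RabbitMQ or package API.

```php
<?php

// Toy in-memory model of the delay flow; not a real broker client.
final class ToyBroker
{
    /** @var array<string, array{dlxKey: ?string, messages: string[]}> */
    private $queues = [];

    public function declareQueue(string $name, ?string $deadLetterRoutingKey = null): void
    {
        // Re-declaring an existing queue is a no-op in this model.
        if (!isset($this->queues[$name])) {
            $this->queues[$name] = ['dlxKey' => $deadLetterRoutingKey, 'messages' => []];
        }
    }

    // Default-exchange routing: the routing key is the queue name.
    // Returns false when no such queue exists (the message is dropped).
    public function publish(string $routingKey, string $message): bool
    {
        if (!isset($this->queues[$routingKey])) {
            return false;
        }
        $this->queues[$routingKey]['messages'][] = $message;
        return true;
    }

    // Pretend the TTL elapsed: dead-letter every message held in $name.
    // Returns how many messages reached their target queue.
    public function expireAll(string $name): int
    {
        $queue = $this->queues[$name] ?? null;
        if ($queue === null) {
            return 0;
        }
        $this->queues[$name]['messages'] = [];
        $delivered = 0;
        foreach ($queue['messages'] as $message) {
            if ($queue['dlxKey'] !== null && $this->publish($queue['dlxKey'], $message)) {
                $delivered++;
            }
        }
        return $delivered;
    }
}

$broker = new ToyBroker();
$broker->declareQueue('orders.x.delay.30000', 'orders');
$broker->publish('orders.x.delay.30000', 'job-1');
echo $broker->expireAll('orders.x.delay.30000'), PHP_EOL; // 0: 'orders' is missing, message lost

$broker->declareQueue('orders'); // pre-declare the target queue
$broker->publish('orders.x.delay.30000', 'job-2');
echo $broker->expireAll('orders.x.delay.30000'), PHP_EOL; // 1: delivered to 'orders'
```

In the first run the delayed job silently disappears. Once orders has been declared up front, the same flow delivers the job.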