nest: [RabbitMQ Microservice] "PRECONDITION_FAILED - reply consumer already set"

Bug Report

Current behavior

Sometimes I’m getting these errors and after that microcervice client stops working and no way to catch these errors and I have to restart application to work properly again.

Issue is similar to #2323 and #2337

AssertionError [ERR_ASSERTION]: "0 == 1"
    at Mux._readIncoming (app\\node_modules\\amqplib\\lib\\mux.js:92:12)
    at Immediate._onImmediate (app\\node_modules\\amqplib\\lib\\mux.js:112:12)
    at runCallback (timers.js:810:20)
    at tryOnImmediate (timers.js:768:5)
    at processImmediate [as _immediateCallback] (timers.js:745:5)
	
Error: Operation failed: BasicConsume; 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - reply consumer already set"
    at reply (app\\node_modules\\amqplib\\lib\\channel.js:134:29)
    at ConfirmChannel.C.accept (app\\node_modules\\amqplib\\lib\\channel.js:417:7)
	at Connection.mainAccept [as accept] (app\\node_modules\\amqplib\\lib\\connection.js:64:33)
    at Socket.go (app\\node_modules\\amqplib\\lib\\connection.js:478:48)
    at emitNone (events.js:106:13)
    at Socket.emit (events.js:208:7)
    at emitReadable_ (_stream_readable.js:513:10)
    at emitReadable (_stream_readable.js:507:7)
    at addChunk (_stream_readable.js:274:7)
    at readableAddChunk (_stream_readable.js:250:11)
    at Socket.Readable.push (_stream_readable.js:208:10)
    at TCP.onread (net.js:601:20)
	From previous event:
    at ConfirmChannel.C.consume (app\\node_modules\\amqplib\\lib\\channel_model.js:170:18)
    at ChannelWrapper.channel.addSetup (app\\node_modules\\@nestjs\\microservices\\client\\client-rmq.js:37:52)
    at app\\node_modules\\promise-breaker\\index.js:265:46
    at <anonymous>
    at process._tickCallback (internal/process/next_tick.js:189:7)

Input Code

To get this error every time, you need to wait some time while processing message for ex.: long running object serializer.

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'RMQ_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: [`amqp://localhost:5672`],
          queue: "log_queue",
          queueOptions: { durable: true },
          serializer: {
            serialize: (value) => {
              const dt = new Date();
              while (new Date().getTime() - dt.getTime() <= 15000) { /* Do nothing */ }

              return value;
            },
          },
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
  exports: [AppService],
})
export class AppModule implements NestModule {
  configure(consumer: MiddlewareConsumer) {    
  }
}

@Injectable()
export class AppService {
  constructor(@Inject('RMQ_SERVICE') private readonly client: ClientProxy ) {  }

  async getRMQServiceStatus(): Promise<string> {
    const pattern = { cmd: 'status' };
    const payload = 1;
    return this.client.send<string>(pattern, payload).toPromise();
  }

  async logAction(model: SomeModel): Promise<void> {
    const pattern ='log_action'
    await this.client.emit<void>(pattern, model);
  }
}

Expected behavior

Client should catch this error and throw to application and should reconnect and start work again.

Possible Solution

Environment


Nest version: 6.8.2

 
For Tooling issues:
- Node version: v8.16.0  
- Platform:  Linux, Windows

Others:

About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Comments: 18 (2 by maintainers)

Commits related to this issue

Most upvoted comments

Fixed in 6.10.9. Thanks @sergey-telpuk!