node-red: MQTT node doesn't exit cleanly

Current Behavior

A Node-RED instance whose only flow contains a single mqtt-in node and its associated broker node cannot exit cleanly.

Expected Behavior

Calling RED.stop() should cleanly terminate all flows and allow the Node.js process to exit.

The issue seems to be that the mqtt code (10-mqtt.js) doesn’t call client.end() in its disconnect handler when the node is already closing.

https://github.com/node-red/node-red/blob/329008bf6d4325e2ed47437c4824f91202b7ca3f/packages/node_modules/%40node-red/nodes/core/network/10-mqtt.js#L831

Changing that line to the following fixes the issue:

if(node.closing) { node.client.end(); return _callback(); }
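
To illustrate why the extra end() call matters, here is a minimal sketch. FakeClient is a hypothetical stand-in, not the real mqtt.js client, and disconnect() models only the closing branch of the handler. It assumes, as the stack trace in this report suggests, that the client arms a reconnect interval on 'close', and that ending the client clears it.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for the mqtt client; NOT the real mqtt.js implementation.
class FakeClient extends EventEmitter {
    constructor() {
        super();
        this.reconnectTimer = null;
        // Assumption: a 'close' event arms a reconnect interval.
        this.on('close', () => this._setupReconnect());
    }
    _setupReconnect() {
        if (this.reconnectTimer === null) {
            this.reconnectTimer = setInterval(() => { /* would reconnect here */ }, 1000);
        }
    }
    end() {
        // Clearing the interval removes the handle that keeps the event loop alive.
        if (this.reconnectTimer !== null) {
            clearInterval(this.reconnectTimer);
            this.reconnectTimer = null;
        }
    }
}

// Simplified model of the broker node's disconnect handler (closing branch only).
function disconnect(node, _callback) {
    if (node.closing) {
        node.client.end();   // the proposed addition
        return _callback();
    }
    // Other branches of the real handler are omitted in this sketch.
    _callback();
}

const demoClient = new FakeClient();
demoClient.emit('close');                // reconnect timer is now armed
disconnect({ closing: true, client: demoClient }, () => console.log('closed'));
console.log(demoClient.reconnectTimer);  // null
```

Without the end() call in the closing branch, the interval in this model would stay armed and the process would not exit on its own.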

Steps To Reproduce

The following code reproduces the problem.

const http = require('http');
const express = require("express");
const RED = require("node-red");
const fs = require('fs');
const os = require('os');
const path = require('path');
const whyIsNodeRunning = require('why-is-node-running')

const mqttFlow = {
    label: 'mqtt',
    disabled: false,
    info: '',
    nodes: [
        {
            type: 'mqtt in',
            name: 'mqtt',
            id: 'b021cb01-cbd5-11ec-8a3d-47d2cdb2f658',
            topic: 'myTopic',
            broker: 'b021f210-cbd5-11ec-8a3d-47d2cdb2f658'
        },
        {
            type: 'mqtt-broker',
            name: '',
            id: 'b021f210-cbd5-11ec-8a3d-47d2cdb2f658',
            keepalive: 60,
            port: '1883',
            broker: 'localhost'
        },
    ]
}

process.once('SIGINT', function () {
    console.log('SIGINT received...');
    close();
});

process.once('SIGTERM', function () {
    console.log('SIGTERM received...');
    close();
});

var server;
const userDir = fs.mkdtempSync(path.join(os.tmpdir(),'test-'));


async function start () {
    let app = express();
    server = http.createServer(app);
    
    var settings = {
        httpAdminRoot:"/red",
        httpNodeRoot: "/api",
        userDir: userDir,
    };

    RED.init(server, settings);

    app.use(settings.httpAdminRoot,RED.httpAdmin);
    app.use(settings.httpNodeRoot,RED.httpNode);

    // Start Node Red
    await RED.start()

    RED.runtime.events.once('flows:started', () => {
        RED.runtime.flows.addFlow({flow: mqttFlow});
    })

    server.listen(2000);
    console.log(`Node Red server running on port 2000`)
}


async function close() {
    await RED.stop();
    await server.close();
    fs.rmSync(userDir, {force: true, recursive: true})
    setTimeout(() => {whyIsNodeRunning(console)}, 1000)
}

start();

Running this from the command line and stopping it with Ctrl-C leaves the node process hanging, and why-is-node-running shows that the mqtt reconnect timer is still active.

 # Timeout
node:internal/async_hooks:203                           
node:internal/async_hooks:474                           
node:internal/timers:162                                
node:internal/timers:196                                
/home/phil/CT/NRBug/node_modules/mqtt/lib/client.js:1123 - that.reconnectTimer = setInterval(function () {
/home/phil/CT/NRBug/node_modules/mqtt/lib/client.js:390  - this._setupReconnect()
/home/phil/CT/NRBug/node_modules/mqtt/lib/client.js:477  - that.emit('close')

nrBug.tar.gz

Example flow

[{"id":"075db3f8c5ea6d56","type":"tab","label":"mqtt","disabled":false,"info":""},{"id":"b021cb01-cbd5-11ec-8a3d-47d2cdb2f658","type":"mqtt in","z":"075db3f8c5ea6d56","name":"mqtt","topic":"myTopic","broker":"b021f210-cbd5-11ec-8a3d-47d2cdb2f658","inputs":0,"x":0,"y":0,"wires":[[]]},{"id":"b021f210-cbd5-11ec-8a3d-47d2cdb2f658","type":"mqtt-broker","z":"075db3f8c5ea6d56","name":"","broker":"localhost","port":"1883","keepalive":60}]

Environment

  • Node-RED version: 2.2.2
  • Node.js version: v16.4.0
  • npm version: 7.18.1
  • Platform/OS: Ubuntu 20.04.1
  • Browser: Version 98.0.4758.102 (Official Build) (64-bit)

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 15 (15 by maintainers)

Most upvoted comments

Thanks - I will review now. I would like to get this merged today if possible. I am certain it will be fine, but are you on hand in, say, 10 minutes in case any slight tweaks are required?

Hi Steve,

Yes, tracking the event handlers and removing them explicitly is a solid fix for the issues I was seeing…

Like you, I was a bit surprised that the mqtt code shares its EventEmitter with the user like that; having its own internal one would have been cleaner, but I guess the Node.js docs do warn of the risks of using removeAllListeners().

Your comment that the callback passed to end() is only called, and only clears the listeners, after the mqtt client has finished getting the acks made me look again at the code. The issue is that the node.disconnect() method gets called twice.

The first call is in response to the ‘close’ event of the mqtt node, which calls brokerConn.deregister(). The broker detects that this was the last client connected and calls its own disconnect() method, which calls the client’s end() method with the callback function.

But there is then a second call to the broker node disconnect() method which is in response to its own ‘close’ event, which hits this line which invokes the _callback() directly and clears the event handlers - and this happens before the client has finished processing the acks, hence its internal event doesn’t get through.

https://github.com/node-red/node-red/blob/9ed96de2373a8c07d6c9711e3da53c88bbe0af50/packages/node_modules/%40node-red/nodes/core/network/10-mqtt.js#L831
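
The ordering described above can be modelled roughly as follows. This is a simplified sketch of the sequence as I understand it, not the actual 10-mqtt.js code: FakeClient, makeBroker and the 'internal:acks-done' event name are all made up for illustration, and the cleanup is assumed to use removeAllListeners().

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in client: end(cb) only completes once pending acks are flushed.
class FakeClient extends EventEmitter {
    end(cb) { this.once('internal:acks-done', cb); }  // made-up internal event
    flushAcks() { this.emit('internal:acks-done'); }
}

// Simplified model of the broker node; 'log' records the order of events.
function makeBroker(client, log) {
    const node = { client, closing: false };
    node.disconnect = function (callback) {
        const _callback = () => {
            // Assumed cleanup: wipes every listener, including the client's internal one.
            node.client.removeAllListeners();
            log.push('cleanup');
            callback();
        };
        if (node.closing) { return _callback(); }  // the second call takes this shortcut
        node.closing = true;
        node.client.end(() => { log.push('end finished'); _callback(); });
    };
    return node;
}

const log = [];
const broker = makeBroker(new FakeClient(), log);
broker.disconnect(() => log.push('first done'));   // triggered via deregister(); waits for acks
broker.disconnect(() => log.push('second done'));  // triggered by the broker's own 'close'
broker.client.flushAcks();                         // the internal listener is already gone
console.log(log);  // [ 'cleanup', 'second done' ]
```

In this model 'end finished' and 'first done' never appear, because the second call removed the client's internal listener before the acks were processed.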

I guess the code could be modified to not make that second call, but to be honest, now that we know the event emitter is shared with the mqtt module, it feels to me that trying to call removeAllListeners() at exactly the right time is going to be pretty fragile compared to explicitly managing the event handlers.
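
For reference, explicitly managing the handlers could look roughly like the sketch below. TrackedListeners is a hypothetical helper, not something taken from the Node-RED code base, and a plain EventEmitter stands in for the shared mqtt client.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: remembers every listener it attaches so that
// only those listeners are removed later.
class TrackedListeners {
    constructor(emitter) {
        this.emitter = emitter;
        this.entries = [];
    }
    on(event, handler) {
        this.emitter.on(event, handler);
        this.entries.push({ event, handler });
    }
    removeAll() {
        for (const { event, handler } of this.entries) {
            this.emitter.removeListener(event, handler);
        }
        this.entries = [];
    }
}

// Stand-in for the shared client emitter.
const client = new EventEmitter();
client.on('close', function internalHandler() {});  // the library's own listener

const tracked = new TrackedListeners(client);
tracked.on('close', () => {});
tracked.on('message', () => {});

tracked.removeAll();
console.log(client.listenerCount('close'));    // 1 (the internal listener survives)
console.log(client.listenerCount('message'));  // 0
```

Unlike removeAllListeners(), this leaves the library's own listeners untouched no matter when the cleanup runs.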