node-rdkafka: Consumer stops fetching messages

Hi,

The consumer stops fetching after getting the following error: { Error: Local: Broker transport failure at Error (native) origin: 'local', message: 'broker transport failure', code: -1, errno: -1, stack: 'Error: Local: Broker transport failure\n at Error (native)' }

All Kafka servers are healthy, yet I am still getting the transport failure. After this error, the consumer stops fetching further messages.

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Comments: 24 (7 by maintainers)

Most upvoted comments

Hi @webmakersteve, we are facing the same issue in our environment. Thanks

Hi @webmakersteve, we are facing the same problem here 😦 Is there another open issue for this, or an ETA for its resolution? Thanks in advance!

Thanks, I’ll set that up. In my case, I am consuming messages for a time before it gives up. What I have noticed is that occasionally I will see this error:

{ Error: Local: Broker transport failure origin: 'local', message: 'broker transport failure', code: -1, errno: -1, stack: 'Error: Local: Broker transport failure' }

Most of the time the client recovers and will still consume messages, but at some point this error gets thrown and the consumer stops. I have been able to work around the issue by coding up this:

// Workaround from this report: treat every 'event.error' as fatal for the
// connection and force a full disconnect/connect cycle.

// Once the consumer reports ready, subscribe to the topic and start consuming.
consumer.on('ready', function() {
    console.log("Consumer Ready");
    consumer.subscribe([topic]);
    consumer.consume();
});

// Log the error, then tear the connection down.
consumer.on('event.error', function(err) {
    console.log(err);
    consumer.disconnect();
});

// After the disconnect completes, connect again. Per the surrounding report,
// the client then reconnects to the broker and resumes consuming.
consumer.on('disconnected', function(data) {
    console.log("Disconnected. Reconnecting...");
    consumer.connect();
});

When I do this, the client reconnects to the broker and starts consuming the messages again.

Hi again. Attempting a re-consume on the error didn’t work for me. It has to be a full disconnect.

Hi

@webmakersteve I’ve experienced the same problem. The consumer stops receiving messages from the topic at some point, and the only thing I see in the log is:

2018-05-23T13:29:10.303Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:33:55.362Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:39:10.401Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:43:55.442Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:53:55.537Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:54:35.542Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}

node v9.11.1 node-rdkafka@2.3.2

My consumer

        // Create the consumer. The first object holds the consumer configuration
        // (including the rebalance and offset-commit callbacks); the second,
        // empty object is passed as the second constructor argument.
        this.consumer = new Kafka.KafkaConsumer({
            'debug':                    'all',
            'group.id':                 config.kafka.consumer.group_id,
            'metadata.broker.list':     config.kafka.consumer.brokers,
            // Must stay a `function` (not an arrow function): the body calls
            // this.assign() / this.unassign(), so it relies on the `this` supplied
            // by whoever invokes the callback, not on the enclosing scope's `this`.
            'rebalance_cb': function(err, assignment) {
                log("+OK - rebalance cb");

                if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
                    log("+OK - rebalance cb. assign: %j", assignment);
                    // Note: this can throw when you are disconnected. Take care and wrap it in
                    // a try catch if that matters to you

                    try {
                        this.assign(assignment).committed(10000, (commitErr, data) => {
                            // Report a failed lookup instead of silently logging
                            // undefined data as a success.
                            if (commitErr) {
                                error("+ERR - rebalance_cb. committed err: ", commitErr);
                            } else {
                                log("+OK - rebalance_cb. committed data: %j", data);
                            }
                        });
                    } catch(assignErr) {
                        error("+ERR - rebalance_cb. can not assign assignments err: %j", assignErr);
                    }

                } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
                    log("+OK - rebalance cb. unassign: ", assignment);

                    try {
                        // Same as above: this can throw when disconnected
                        this.unassign();
                    } catch(unassignErr) {
                        error("+ERR - rebalance_cb. can not unassign assignments err: %j", unassignErr);
                    }
                } else {
                    // We had a real error
                    error("+ERR - rebalance cb. err: ", err);
                }

            },
            // Logs the outcome of each offset commit.
            'offset_commit_cb': (err, topicPartitions) => {

                if (err) {
                    // There was an error committing
                    error("+ERR - offset commit cb. err: ", err);
                } else {
                    // Commit went through. Let's log the topic partitions
                    log("+OK - offset commit cb. topicPartitions: ", topicPartitions);
                }
            }
        }, {});

        // Debug-log hook. The handler is registered but currently a no-op; the
        // log call is left commented out.
        this.consumer.on('event.log', ev => {
            // log("+OK - initServer.event.log ", ev);
        });

        // Log every error event. Note that this handler only logs; it takes no
        // recovery action.
        this.consumer.on('event.error', err => {
            error("+ERR - initServer.event.error err: %j", err);
        });

        // Once the consumer is ready: subscribe to the configured topics, start
        // consuming, and notify the caller through the optional `cb`.
        this.consumer.on('ready', (info, metadata) => {
            log('+OK - initServer.ready. info %j metadata: %j', info, metadata);

            this.consumer.subscribe(config.kafka.consumer.topics);

            //start consuming messages
            this.consumer.consume();

            if(cb) cb();
        });

        // Handle each consumed message; the payload is parsed as JSON.
        this.consumer.on('data', m => {

            let parsed_message;

            try {
                parsed_message = JSON.parse(m.value.toString());
            } catch(e) {
                // Do not swallow the failure silently: a payload that cannot be
                // parsed is reported. parsed_message stays undefined in that case.
                error("+ERR - initServer.data. can not parse message err: ", e);
            }

            // Do some stuff with message
        });


        // Log disconnects.
        this.consumer.on('disconnected', arg => {
            log('+OK - initServer.disconnected. %j', arg);
        });

        //starting the consumer
        this.consumer.connect();

I don’t see any "rebalance" or "disconnect" calls in the logs, and according to Kafka-Tool the consumer is still connected and subscribed to the topic, but it doesn’t receive any messages.