kafka-node: Producer does not reconnect automatically

Questions?

I’m using very simple code, but it looks like if I shut down Kafka and start it again, the client never reconnects. Is that normal?

Environment

  • Node version: 10.14.0
  • Kafka-node version: 4.1.0
  • Kafka version: 2.2.0

For specific cases also provide

  • Number of Brokers: 1
  • Number partitions for topic: 1

Sample

const kafka = require('kafka-node');

const kafkaClient = new kafka.KafkaClient();
const kafkaProducer = new kafka.Producer(kafkaClient);
kafkaProducer.on('ready', () => {
  console.log('ready');
}).on('error', (err) => {
  console.error(err);
});

let i = 0;
setInterval(() => {
  i += 1;
  kafkaProducer.send([{
    topic: 'abcd',
    messages: [`abcd ${i}`],
  }], (error, data) => {
    console.log(error, data);
  });
}, 1000);

If you stop and restart Kafka while this script is running, every send fails until you call kafkaClient.connect().
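
One way to apply that workaround is to trigger the reconnect from the send callback. This is only a minimal sketch, not a confirmed fix: sendWithReconnect is a hypothetical helper (not part of kafka-node), and it assumes that calling connect() again after a failed send is harmless.

```javascript
// Hypothetical helper, not part of kafka-node.
// Sends the payloads and, if the send fails, asks the client to reconnect
// so that a later send has a chance to succeed.
function sendWithReconnect(client, producer, payloads, callback) {
  producer.send(payloads, (error, data) => {
    if (error) {
      // Assumption: calling connect() again here is safe. The report above
      // only says that sends keep failing until connect() is called.
      client.connect();
    }
    callback(error, data);
  });
}

// Usage with the sample above (kafkaClient and kafkaProducer come from it):
// sendWithReconnect(kafkaClient, kafkaProducer,
//   [{ topic: 'abcd', messages: ['abcd 1'] }],
//   (error, data) => console.log(error, data));
```

The failed message is not retried in this sketch; the caller still sees the error and decides whether to send again.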

About this issue

  • State: closed
  • Created 5 years ago
  • Reactions: 2
  • Comments: 23

Most upvoted comments

Just published 4.1.3 which should have this.

I may be running into something similar.

I’m testing against a local single-broker Kafka setup with a server-side connection idle timeout of 30 seconds (for testing), using 127.0.0.1 as the connection address (not localhost).

I set the client idleConnection timeout to 15 seconds (lower than the server’s timeout).

Then I create a HighLevelProducer and let it sit idle (never producing anything).

After the 15-second client timeout period, the connection closes and I see the following:

[2019-04-29T20:02:42.691Z][level:DEBUG] kafka-node-producer socket closed 127.0.0.1:9092 (hadError: false)
[2019-04-29T20:02:43.690Z][level:DEBUG] kafka-node-producer is not reconnecting to 127.0.0.1:9092 invalid broker

All subsequent produce requests error out and the connection is never reestablished.

I believe the problem is that isValidBroker checks whether a host and port combination exists in brokerMetadata, but the port in the metadata is a number while the port passed in here is a string. I added some logging to the function:

KafkaClient.prototype.isValidBroker = function ({ host, port }) {
  console.log({host, port})
  console.log(_(this.brokerMetadata)
    .values().value())

  return (
    this.connecting ||
    _(this.brokerMetadata)
      .values()
      .some({ host, port })
  );
};

And right before the problem happens, this is the log output:

{ host: '127.0.0.1', port: '9092' }
[ { nodeId: 0, host: '127.0.0.1', port: 9092 } ]
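
The mismatch can be reproduced without kafka-node or lodash. The sketch below copies the two values from the log output above and compares them with strict equality, which is assumed here to behave like the property match that some({ host, port }) performs (no type coercion). The variable names are illustrative only.

```javascript
// Values copied from the log output above.
const requested = { host: '127.0.0.1', port: '9092' };                 // port is a string
const brokerMetadata = [{ nodeId: 0, host: '127.0.0.1', port: 9092 }]; // port is a number

// Strict comparison: '9092' !== 9092, so the broker is treated as unknown.
const strictMatch = brokerMetadata.some(
  (b) => b.host === requested.host && b.port === requested.port
);

// Comparing both ports as strings makes the same broker match.
const stringMatch = brokerMetadata.some(
  (b) => b.host === requested.host && String(b.port) === String(requested.port)
);

console.log(strictMatch); // false
console.log(stringMatch); // true
```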

The following modification allows the reconnection to happen successfully:

KafkaClient.prototype.isValidBroker = function ({ host, port }) {
  return (
    this.connecting ||
    _(this.brokerMetadata)
      .values()
      .some(({host: mHost, port: mPort}) => `${host}:${port}` === `${mHost}:${mPort}`)
  );
};

However, it would probably be better to make sure the port has the same type upstream, but I don’t know enough about how this all works to figure out how to ensure that.
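
As a rough illustration of what normalizing the type could look like, the sketch below coerces the port to a number before comparing. normalizeBroker and isKnownBroker are hypothetical names, not kafka-node functions, and where such normalization would belong inside the library is not settled here.

```javascript
// Hypothetical helper: return a copy of the broker address with a numeric port.
function normalizeBroker({ host, port }) {
  return { host, port: Number(port) };
}

// Hypothetical stand-in for the validity check: both sides are compared
// with numeric ports, so '9092' and 9092 refer to the same broker.
function isKnownBroker(brokerMetadata, broker) {
  const { host, port } = normalizeBroker(broker);
  return Object.values(brokerMetadata).some(
    (m) => m.host === host && Number(m.port) === port
  );
}

// Metadata shaped like the log output above, keyed by node id.
const metadata = { 0: { nodeId: 0, host: '127.0.0.1', port: 9092 } };

console.log(isKnownBroker(metadata, { host: '127.0.0.1', port: '9092' })); // true
console.log(isKnownBroker(metadata, { host: '127.0.0.1', port: '9093' })); // false
```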

I also got a similar problem; I kept getting a “Broker not available (sendRequest)” error.

I have a similar issue; the producer does not reconnect: kafka-node-client is not reconnecting to localhost:9092 invalid broker