kafka-node: Does KafkaClient tolerate broker failures and try to connect to other brokers?

Questions?

Does KafkaClient tolerate broker failures and try to connect to other brokers?

Bug Report

Hi team, I am using KafkaClient as the client parameter of Producer, with kafkaHost set to "host1:port1,host2:port2,host3:port3". Sending a message works. Then I kill the broker at host1:port1 and send another message using the cached producer. The send is stuck until the request times out, and the error message is TimeoutError: Request timed out after 10000ms.

With debug mode on, the kafka-node log always displays kafka-node:KafkaClient kafka-node-client reconnecting to host1:port1… Will it try to connect to host2:port2 or host3:port3, which I also provided?

Also, when I debug in client.js, there is a method called getBrokers, and it seems that it doesn’t include all the brokers I provided: sometimes only host1:port1, but sometimes [host1:port1, host2:port2].

Environment

  • Node version: v6.5.9
  • Kafka-node version: 2.3.0
  • Kafka version: 1.0.0
  • Scala version: 2.11

For specific cases also provide

  • Number of Brokers: 3
  • Number partitions for topic: 1

Include Sample Code to reproduce behavior

var kafka = require('kafka-node');

var client = new kafka.KafkaClient({
    kafkaHost: "host1:port1,host2:port2,host3:port3",
    connectTimeout: 10000,
    requestTimeout: 10000
});
var producer = new kafka.Producer(client);

Include output with Debug turned on

kafka-node:KafkaClient checking payload topic/partitions has leaders
kafka-node:KafkaClient found leaders for all
kafka-node:KafkaClient sending request
kafka-node:KafkaClient grouped requests by 1 brokers ["0"]
kafka-node:KafkaClient missing apiSupport waiting until broker is ready...
kafka-node:KafkaClient kafka-node-client reconnecting to host1:port1
kafka-node:KafkaClient kafka-node-client reconnecting to host1:port1
kafka-node:KafkaClient kafka-node-client reconnecting to host1:port1
kafka-node:KafkaClient kafka-node-client reconnecting to host1:port1
…
TimeoutError: Request timed out after 10000ms

Best Regards, Thanks!

About this issue

  • State: closed
  • Created 6 years ago
  • Comments: 15 (1 by maintainers)

Most upvoted comments

We leave it up to the user of the module to determine when they should retry requests or give up. It’s difficult for the module to do this because it could make a wrong decision which could lead to messages being produced in the wrong order. Order can be very important for certain use cases.
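As a rough illustration of what user-side retry could look like, here is a minimal sketch. The helper name `sendWithRetry` and its options are hypothetical and not part of kafka-node. It assumes the timeout error can be recognised by `err.name === 'TimeoutError'`, as suggested by the error text in the report. The send function is passed in, so the helper does not depend on any particular client version. Attempts run one at a time, so a retry cannot overtake an earlier attempt of the same call.

```javascript
// Hypothetical helper, not part of kafka-node.
// `send` is any callback-style function with the shape send(payloads, cb).
function sendWithRetry(send, payloads, options, callback) {
  var maxAttempts = options.maxAttempts || 3;
  var delayMs = options.delayMs || 0;
  var attempt = 0;

  function tryOnce() {
    attempt += 1;
    send(payloads, function (err, result) {
      if (!err) {
        return callback(null, result);
      }
      // Assumption: only timeouts are worth retrying; other errors are
      // handed straight back to the caller.
      var retriable = err.name === 'TimeoutError';
      if (!retriable || attempt >= maxAttempts) {
        return callback(err);
      }
      // Wait before the next attempt if a delay was requested.
      if (delayMs > 0) {
        setTimeout(tryOnce, delayMs);
      } else {
        tryOnce();
      }
    });
  }

  tryOnce();
}
```

With a kafka-node producer this might be called as `sendWithRetry(producer.send.bind(producer), payloads, { maxAttempts: 3, delayMs: 1000 }, callback)`. Calls that need strict ordering relative to each other would still have to be issued one after another by the caller.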

Oh I see, so it will time out if the leader broker of the topic is down. I have now added my own retry logic to send again on timeout. I have a suggestion: could KafkaClient add an option to retry on timeout when the timeout is caused by a failed leader broker (a new leader has been elected and new topic/broker metadata can be fetched)? Then no exception would be raised, my process would not be terminated, and I would not need to add my own retry feature. If I get time, I will try to implement it.

Thanks!

@hyperlink I think there might actually be a problem reconnecting on version 2.6.1.

Description

I created a TestTopic with 1 partition and replication factor 3.

The producer produces a message at a regular interval. When I kill the leader, the producer isn’t able to recover, even for subsequent messages. From what I observed, the client didn’t round-robin through the initial broker list to reconnect.

In the test consumer, when I kill two of the three brokers, it is unable to use the third broker to connect.

My Setup

Producer Debug Output

Producing message, to topic TestTopic
  kafka-node:KafkaClient compressing messages if needed +10s
  kafka-node:KafkaClient checking payload topic/partitions has leaders +1ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["1"] +1ms
  kafka-node:KafkaClient has apiSupport broker is ready +1ms
  kafka-node:KafkaClient Using V2 of produce +36ms
message written back to Kafka {"TestTopic":{"0":126}}
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +8s
  kafka-node:KafkaClient Sending versions request to kb1.local.thsp.tech:19092 +4ms
  kafka-node:KafkaClient ApiVersions failed with unexpected error { BrokerNotAvailableError: Broker not available
    at new BrokerNotAvailableError (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
    at Socket.<anonymous> (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/kafkaClient.js:630:43)
    at emitOne (events.js:96:13)
    at Socket.emit (events.js:188:7)
    at TCP._handle.close [as _onclose] (net.js:497:12) message: 'Broker not available' } +2ms
  kafka-node:KafkaClient error initialize broker after connect { BrokerNotAvailableError: Broker not available
    at new BrokerNotAvailableError (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
    at Socket.<anonymous> (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/kafkaClient.js:630:43)
    at emitOne (events.js:96:13)
    at Socket.emit (events.js:188:7)
    at TCP._handle.close [as _onclose] (net.js:497:12) message: 'Broker not available' } +7ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +993ms
Producing message, to topic TestTopic
  kafka-node:KafkaClient compressing messages if needed +840ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +1ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["1"] +0ms
  kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +181ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
Producing message, to topic TestTopic
  kafka-node:KafkaClient compressing messages if needed +691ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +1ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["1"] +0ms
  kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +317ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s

Consumer Debug Output

  kafka-node:KafkaClient longpolling socket [BrokerWrapper kb2.local.thsp.tech:29092 (connected: true) (ready: true) (idle: false)] is waiting +0ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +2ms
  kafka-node:KafkaClient Sending versions request to kb2.local.thsp.tech:29092 +13ms
  kafka-node:KafkaClient Sending versions request to kb2.local.thsp.tech:29092 +2ms
  kafka-node:KafkaClient ApiVersions failed with unexpected error { BrokerNotAvailableError: Broker not available
    at new BrokerNotAvailableError (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
    at Socket.<anonymous> (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/kafkaClient.js:630:43)
    at emitOne (events.js:96:13)
    at Socket.emit (events.js:188:7)
    at TCP._handle.close [as _onclose] (net.js:497:12) message: 'Broker not available' } +1ms
  kafka-node:KafkaClient error initialize broker after connect { BrokerNotAvailableError: Broker not available
    at new BrokerNotAvailableError (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
    at Socket.<anonymous> (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/kafkaClient.js:630:43)
    at emitOne (events.js:96:13)
    at Socket.emit (events.js:188:7)
    at TCP._handle.close [as _onclose] (net.js:497:12) message: 'Broker not available' } +2ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +351ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +2ms
  kafka-node:KafkaClient getApiVersions request timedout probably less than 0.10 using base support +145ms
  kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":1,"usable":1},"fetch":{"min":0,"max":1,"usable":1},"offset":{"min":0,"max":0,"usable":0},"metadata":{"min":0,"max":0,"usable":0},"leader":null,"stopReplica":null,"updateMetadata":null,"controlledShutdown":null,"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":1,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":0,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":0},"listGroups":{"min":0,"max":0,"usable":0},"saslHandshake":null,"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":null,"deleteTopics":null} +0ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +499ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +43ms
  kafka-node:KafkaClient Sending versions request to kb2.local.thsp.tech:29092 +2ms
  kafka-node:KafkaClient ApiVersions failed with unexpected error { BrokerNotAvailableError: Broker not available
    at new BrokerNotAvailableError (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
    at Socket.<anonymous> (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/kafkaClient.js:630:43)
    at emitOne (events.js:96:13)
    at Socket.emit (events.js:188:7)
    at TCP._handle.close [as _onclose] (net.js:497:12) message: 'Broker not available' } +2ms
  kafka-node:KafkaClient error initialize broker after connect { BrokerNotAvailableError: Broker not available
    at new BrokerNotAvailableError (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
    at Socket.<anonymous> (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/kafkaClient.js:630:43)
    at emitOne (events.js:96:13)
    at Socket.emit (events.js:188:7)
    at TCP._handle.close [as _onclose] (net.js:497:12) message: 'Broker not available' } +2ms
  kafka-node:KafkaClient Sending versions request to kb2.local.thsp.tech:29092 +8ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +304ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +13ms
  kafka-node:KafkaClient getApiVersions request timedout probably less than 0.10 using base support +188ms
  kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":1,"usable":1},"fetch":{"min":0,"max":1,"usable":1},"offset":{"min":0,"max":0,"usable":0},"metadata":{"min":0,"max":0,"usable":0},"leader":null,"stopReplica":null,"updateMetadata":null,"controlledShutdown":null,"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":1,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":0,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":0},"listGroups":{"min":0,"max":0,"usable":0},"saslHandshake":null,"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":null,"deleteTopics":null} +0ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +485ms
  kafka-node:KafkaClient Sending versions request to kb2.local.thsp.tech:29092 +10ms
  kafka-node:KafkaClient ApiVersions failed with unexpected error { Error: read ECONNRESET
    at exports._errnoException (util.js:1020:11)
    at TCP.onread (net.js:568:26) code: 'ECONNRESET', errno: 'ECONNRESET', syscall: 'read' } +3ms
  kafka-node:KafkaClient error initialize broker after connect { Error: read ECONNRESET
    at exports._errnoException (util.js:1020:11)
    at TCP.onread (net.js:568:26) code: 'ECONNRESET', errno: 'ECONNRESET', syscall: 'read' } +3ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +1ms
  kafka-node:KafkaClient Sending versions request to kb2.local.thsp.tech:29092 +11ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +306ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +3ms
  kafka-node:KafkaClient getApiVersions request timedout probably less than 0.10 using base support +192ms
  kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":1,"usable":1},"fetch":{"min":0,"max":1,"usable":1},"offset":{"min":0,"max":0,"usable":0},"metadata":{"min":0,"max":0,"usable":0},"leader":null,"stopReplica":null,"updateMetadata":null,"controlledShutdown":null,"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":1,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":0,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":0},"listGroups":{"min":0,"max":0,"usable":0},"saslHandshake":null,"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":null,"deleteTopics":null} +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +489ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +13ms

How did the client get the new broker metadata before sending the second request? KafkaClient only requests metadata if the connection to the broker is broken or if it doesn’t have metadata about a topic/partition it’s trying to fetch, which shouldn’t be the case here since the first request went through. If you have more details, please attach them.

In this case you will need to send the second request again. The issue here is that the client’s idea of who is the leader of that topic/partition has changed, so that second request is no longer valid. Once metadata has been updated, the client is able to send the produce request to the correct broker. Regarding the timeout: TCP sockets try to be very forgiving and keep the connection going for a long time before a signal is sent to Node. We implemented our own timeout for this case, and you can configure a lower timeout if you’d like.
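The "refresh metadata, then send again" sequence described above could be sketched as follows. The helper `resendAfterRefresh` is hypothetical. Both `send` and `refreshMetadata` are injected callback-style functions, on the assumption that the client in use exposes something of the shape `refreshMetadata(topics, cb)`; check that against your kafka-node version before relying on it.

```javascript
// Hypothetical helper, not part of kafka-node.
// send(payloads, cb) and refreshMetadata(topics, cb) are supplied by the caller.
function resendAfterRefresh(send, refreshMetadata, payloads, callback) {
  send(payloads, function (err, result) {
    if (!err) {
      return callback(null, result);
    }
    // Collect the distinct topic names used by the failed payloads.
    var topics = payloads
      .map(function (payload) { return payload.topic; })
      .filter(function (topic, index, all) { return all.indexOf(topic) === index; });

    refreshMetadata(topics, function (refreshErr) {
      if (refreshErr) {
        return callback(refreshErr);
      }
      // Resend exactly once now that the leader information may be current.
      send(payloads, callback);
    });
  });
}
```

Note that a request that timed out may still have been written by the broker, so resending can produce a duplicate message. Whether that is acceptable depends on the use case.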

Hi, I debugged again and found something.

If I kill the leader broker of a topic, the producer gets stuck, and KafkaClient does NOT try to refresh metadata. If I kill a broker that is not the leader of this topic, it still works. Then I dived into the code, in KafkaClient.js, lines 789 to 801:

KafkaClient.prototype.leaderLessPayloads = function (payloads) {
  return _.filter(payloads, payload => !this.hasMetadata(payload.topic, payload.partition));
};

  const leaderLessPayloads = this.leaderLessPayloads(payloads);

  if (leaderLessPayloads.length === 0) {
    logger.debug('found leaders for all');
    return callback(null);
  }

and in Client.js, lines 598 to 603:

Client.prototype.hasMetadata = function (topic, partition) {
  var brokerMetadata = this.brokerMetadata;
  var leader = this.leaderByPartition(topic, partition);

  return leader !== undefined && brokerMetadata[leader];
};

If I kill the leader of a topic, the leader broker still exists in brokerMetadata in Client.js, so leaderLessPayloads.length === 0 is always satisfied, and the client keeps reconnecting to the dead leader broker… I think Kafka will elect a new leader for this topic if the old leader is killed, won’t it? But why is the broker metadata not refreshed when the brokers change?
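The behaviour described here can be reproduced in isolation with a small simulation. This is only a sketch: the `cache` object is a simplified stand-in for the client's cached metadata, and the host names, ports and broker ids are made up. The point it illustrates is that a check of this shape reads only the cache, so killing a broker does not change its result until something refreshes the cache.

```javascript
// Simplified stand-in for the client's cached metadata. Host names, ports and
// broker ids are made up; the real cache in kafka-node holds more fields.
var cache = {
  brokerMetadata: {
    '1': { nodeId: 1, host: 'host1', port: 9092 },
    '2': { nodeId: 2, host: 'host2', port: 9092 }
  },
  topicMetadata: { TestTopic: { '0': { leader: 1 } } }
};

function leaderByPartition(topic, partition) {
  var partitions = cache.topicMetadata[topic];
  var entry = partitions && partitions[partition];
  return entry ? entry.leader : undefined;
}

// Mirrors the hasMetadata check quoted above: it consults the cache only.
function hasMetadata(topic, partition) {
  var leader = leaderByPartition(topic, partition);
  return leader !== undefined && Boolean(cache.brokerMetadata[leader]);
}

// Broker 1 has been killed, but nothing has touched the cache yet.
var beforeRefresh = {
  leader: leaderByPartition('TestTopic', 0),   // 1 (the dead broker)
  hasMetadata: hasMetadata('TestTopic', 0)     // true, so no refresh is triggered
};

// Simulate a metadata refresh that reports broker 2 as the new leader.
delete cache.brokerMetadata['1'];
cache.topicMetadata.TestTopic['0'].leader = 2;

var afterRefresh = {
  leader: leaderByPartition('TestTopic', 0),   // 2
  hasMetadata: hasMetadata('TestTopic', 0)     // true
};
```

In both states the check returns true. Only the refresh step changes which broker the payload would be routed to.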

Thanks!