kafkajs: UnhandledPromiseRejectionWarning is shown with exception KafkaJSNumberOfRetriesExceeded for connect()
This only happens when multiple consumers try to connect at the same time; the warning is not shown when there is only one consumer.
It also only occurs on the first connect() call, while Kafka is not available.
{"level":"ERROR","timestamp":"2019-07-06T17:27:49.140Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED 127.0.0.1:9092","retryCount":5,"retryTime":9178}
(node:13288) UnhandledPromiseRejectionWarning: KafkaJSNumberOfRetriesExceeded
Caused by: KafkaJSConnectionError: Connection error: connect ECONNREFUSED 127.0.0.1:9092
at Socket.onError (/Users/psilva/development/projects/manhattanportal/node_modules/kafkajs/src/network/connection.js:136:23)
at Socket.emit (events.js:194:15)
at emitErrorNT (internal/streams/destroy.js:82:8)
at emitErrorAndCloseNT (internal/streams/destroy.js:50:3)
at process._tickCallback (internal/process/next_tick.js:63:19)
(node:13288) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:13288) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
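To see which rejections escape without a handler, a process-level listener can help while debugging. The following is a minimal sketch using Node's standard `unhandledRejection` process event; the helper name `trackUnhandledRejections` and its return shape are hypothetical and not part of kafkajs. It only observes the rejection and does not fix the underlying cause.

```javascript
// Hypothetical debugging helper (not part of kafkajs).
// Records every promise rejection that reaches the process without a handler.
function trackUnhandledRejections(log = console.error) {
  const seen = [];
  const handler = (reason) => {
    seen.push(reason);
    // Error objects carry a name such as "KafkaJSNumberOfRetriesExceeded".
    const name = reason && reason.name ? reason.name : typeof reason;
    log(`unhandled rejection: ${name}`, reason);
  };
  process.on('unhandledRejection', handler);
  return {
    seen,
    // Call stop() to detach the listener again.
    stop: () => process.removeListener('unhandledRejection', handler),
  };
}
```

Calling `trackUnhandledRejections()` once at startup, before any consumer connects, should be enough to log the error name for each stray rejection.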
Code for connecting:
    this._consumer.connect()
        .then(r =>
            this._consumer.subscribe({
                fromBeginning: false,
                topic: this._topic
            }))
        .then(r => {
            return this._consumer.run({
                autoCommit: true,
                autoCommitInterval: global.kafkaAutoCommitInterval,
                autoCommitThreshold: global.kafkaAutoCommitThreshold,
                eachMessage: (p) => {
                    try {
                        this.processMessage(p);
                    }
                    catch(e) {
                        // log error
                    }
                    return Promise.resolve();
                }
            })
        })
        .catch(e => {
            if(e.retriable === false) {
                // consumer failed to connect and won't keep retrying
                // perform shutdown and reconnect
                setTimeoutPromise(global.kafkaConnectInterval).then(() => this._consumer.disconnect()).then(() => this.connect());
            }
            this.emit("error", e);
        });
}
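Note that the reconnect chain inside the `.catch` above has no handler of its own, so a failing `disconnect()` there would also surface as an unhandled rejection. As an illustration only, here is a sketch of a reconnect loop in which every promise is awaited inside a `try`/`catch`. The helper `connectWithRetry` and its options (`attempts`, `delayMs`, `onError`) are hypothetical names, not kafkajs API; the only assumption about the client is that it exposes `connect()` and `disconnect()` methods returning promises, as a kafkajs consumer does.

```javascript
// Hypothetical helper (not part of kafkajs). Works with any object that has
// promise-returning connect() and disconnect() methods.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function connectWithRetry(client, options = {}) {
  const { attempts = 3, delayMs = 1000, onError = () => {} } = options;
  let lastError;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      await client.connect();
      return attempt; // number of attempts that were needed
    } catch (err) {
      lastError = err;
      onError(err, attempt);
      // Best-effort cleanup: a failing disconnect() is swallowed here so it
      // cannot turn into an unhandled rejection.
      await Promise.resolve()
        .then(() => client.disconnect())
        .catch(() => {});
      if (attempt < attempts) {
        await sleep(delayMs);
      }
    }
  }
  // All attempts failed: hand the last error to the caller, who must catch it.
  throw lastError;
}
```

A caller would still need its own handler, for example `connectWithRetry(consumer, { onError: (e) => emitter.emit("error", e) }).catch(...)`. This pattern only covers rejections that travel through the promise returned by `connect()`; it would not suppress a rejection raised on a detached promise inside the library itself.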
About this issue
- State: closed
- Created 5 years ago
- Reactions: 1
- Comments: 21 (5 by maintainers)
The reasoning behind our release strategy is documented here: https://kafka.js.org/docs/pre-releases
As of yesterday, we publish a pre-release version on every push to master, which you can install via the beta tag.
The broker is correct; it should propagate the error. The fix is probably higher up.