kafkajs: producer.connect() - KafkaJSError: The producer is disconnected
Describe the bug There seems to be a problem in version 1.14.0. I connect the producer and then produce a message, but sometimes I get this error:
(node:78713) UnhandledPromiseRejectionWarning: KafkaJSError: The producer is disconnected
at validateConnectionStatus (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:30:15)
at sendBatch (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:81:5)
at Object.send (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:153:12)
at EventHubClient.produce (/Users/g.karabinosova/ares/node_packages/eventhub/src/EventHubClient.js:111:25)
at processTicksAndRejections (internal/process/next_tick.js:81:5)
Also producer.event.CONNECTED is not emitted.
I tried to produce 2 messages at once and to log the KafkaJS connectionStatus to the console. The result is:
2020-10-05T16:29:20.854Z level=info label=eventhub-client message="Registered handler for all events."
CONNECTING................. (Output from producer.connect method )
CONNECTING.................
CONNECTED.................
2020-10-05T16:29:20.879Z level=info label=eventhub-client message="Producing 1 message(s)=[{'key':'partition-key1','value':'{\'id\':\'739ec863-be56-43b1-99bd-f4ef2184081c\',\'time\':\'2020-10-05T16:29:20.855Z\',\'specversion\':\'0.2\',\'type\':\'event1\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar1\'}}','headers':{}}] test"
STATUS....................... disconnected (Output from validateConnectionStatus in sendBatch method )
(node:78713) UnhandledPromiseRejectionWarning: KafkaJSError: The producer is disconnected
at validateConnectionStatus (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:30:15)
at sendBatch (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:81:5)
at Object.send (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:153:12)
at EventHubClient.produce (/Users/g.karabinosova/ares/node_packages/eventhub/src/EventHubClient.js:111:25)
at processTicksAndRejections (internal/process/next_tick.js:81:5)
(node:78713) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 2)
(node:78713) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
CONNECTED.................
2020-10-05T16:29:20.882Z level=info label=eventhub-client message="Producing 2 message(s)=[{'key':'partition-key2','value':'{\'id\':\'996b7604-e715-4a31-97ce-d058abac721d\',\'time\':\'2020-10-05T16:29:20.858Z\',\'specversion\':\'0.2\',\'type\':\'event2\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar2\'}}','headers':{}},{'key':'partition-key3','value':'{\'id\':\'f1fcc9e2-1905-417d-a54e-b624e5b299b6\',\'time\':\'2020-10-05T16:29:20.858Z\',\'specversion\':\'0.2\',\'type\':\'event3\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar3\'}}','headers':{}}] test2"
STATUS....................... connected
Code sample:
if (!this.producerConnected) {
  this.producer = this.kafka.producer();
  try {
    await this.producer.connect();
    this.producerConnected = true;
  } catch (error) {
    throw new Error(`Couldn't connect producer.`);
  }
}
await this.producer.send({ topic, messages });
Expected behavior connect() resolves, and then the messages are produced.
Observed behavior Even though connect() has resolved, producing the first message fails and connectionStatus is not updated. The other messages are produced.
Environment:
- OS: Mac OS 18.7.0
- KafkaJS version 1.14.0
- Kafka version 5.1.2
- NodeJS version 11.10.0
About this issue
- State: open
- Created 4 years ago
- Reactions: 9
- Comments: 25 (4 by maintainers)
You should just connect at startup and disconnect at shutdown.
There seems to be a bug introduced with the latest version related to reconnects, but regardless, nothing is changing about the intended usage.
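A minimal sketch of the "connect at startup, disconnect at shutdown" usage described above. The names createService, start, handleRequest and stop are hypothetical, and producer is assumed to be any object with kafkajs-style connect(), send() and disconnect() methods (for example the result of kafka.producer()):

```javascript
// Sketch only: `createService` and its method names are hypothetical.
// `producer` is assumed to expose kafkajs-style connect(), send()
// and disconnect() methods, e.g. `new Kafka({ ... }).producer()`.
function createService(producer) {
  return {
    // Call once when the process starts, before accepting any work.
    async start() {
      await producer.connect();
    },
    // Call for each unit of work; assumes start() has already resolved.
    async handleRequest(topic, messages) {
      return producer.send({ topic, messages });
    },
    // Call once when the process is shutting down.
    async stop() {
      await producer.disconnect();
    },
  };
}
```

In a real service, stop() would typically be wired to the process shutdown path (for example a SIGTERM handler) after the server has stopped accepting new requests.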
This probably happens because you send a message before the connection has been established. It shows up when sends run in parallel, but not when they run serially. So just make sure a connection has been successfully initialized before sending messages. You can also enable debug logging to inspect the connection state when sending messages in parallel, which is what triggers this issue. Good luck
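One way to make sure a single connection is established before any send, even when sends run in parallel, is to share one connect promise between all callers. This is a sketch under assumptions, not part of kafkajs: LazyProducer and createProducer are hypothetical names, and createProducer is assumed to return an object with kafkajs-style connect() and send() methods.

```javascript
// Sketch only: `LazyProducer` is not a kafkajs class, and `createProducer`
// is a hypothetical factory such as `() => kafka.producer()`.
class LazyProducer {
  constructor(createProducer) {
    this.createProducer = createProducer;
    this.connecting = null; // promise shared by all concurrent callers
  }

  connected() {
    if (!this.connecting) {
      const producer = this.createProducer();
      // Store the promise synchronously, before any await, so that a second
      // caller arriving while connect() is still pending reuses it instead
      // of creating (and sending on) a second, unconnected producer.
      this.connecting = producer.connect().then(() => producer);
    }
    return this.connecting;
  }

  async send(topic, messages) {
    const producer = await this.connected();
    return producer.send({ topic, messages });
  }
}
```

With this shape, two send() calls started at the same time both wait for the same connect() to resolve. A failed connect stays cached in this sketch; resetting it for a retry is left out for brevity.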
On Thu, Jul 7, 2022 at 04:46 Manish Sharma @.***> wrote:
@crobinson42 try 1.15, but the cause is very likely that you are not connecting the producer. This was always a requirement, but it somehow worked on 1.12. Give 1.15 a try; otherwise we will need more context to investigate.
What happens when you call send after that? A producer is not meant to keep a constantly open connection. If the connection is idle, it should be closed. Then when you try to send, it should reconnect. If reconnecting doesn’t work, then there’s a problem.
producer.connect() will reject. If Kafka is not a requirement for your service to operate, then you’ll need to design your application such that it tries to connect when needed instead of connecting at startup.
I was looking into this issue and we definitely changed the behavior on 1.14, but it’s for the better. Without the changes, you would never be able to stop the producer without side-effects since it would always reconnect even if your server was going down. When you call connect or disconnect you express your intent and the producer will operate on that, so a disconnect will prevent the producer from reconnecting or producing new messages since this is signaling that the service or producer is going down.
If you have an API that is producing messages and you are shutting it down it usually means that the server will block new requests and start draining the remaining connections, which is also the case for the producer now.
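For the case where Kafka is optional and the application connects when needed rather than at startup, a rough sketch could look like the following. makeOptionalSender and sendIfAvailable are hypothetical names, and producer is assumed to be any object with kafkajs-style connect() and send() methods. A rejected connect() is reported to the caller instead of crashing the service, and the next call tries again.

```javascript
// Sketch only: `makeOptionalSender` and `sendIfAvailable` are hypothetical
// helpers; `producer` is assumed to expose kafkajs-style connect() and send().
function makeOptionalSender(producer) {
  let pending = null; // in-flight or completed connect attempt

  return async function sendIfAvailable(topic, messages) {
    if (!pending) {
      pending = producer.connect().catch((error) => {
        pending = null; // forget the failure so the next call retries
        throw error;
      });
    }
    try {
      await pending;
    } catch (error) {
      // Kafka is optional in this scenario: report the failure, don't crash.
      return { sent: false, error };
    }
    await producer.send({ topic, messages });
    return { sent: true };
  };
}
```

Errors from send() itself still propagate to the caller in this sketch; only the connection attempt is treated as optional.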
This has always been the case. We have never supported sending messages without first connecting the producer. It’s possible that a change in the network layer caused a change in behavior, but even if it somehow worked before, it was never supported. Otherwise there would be no point in having a connect method.
Could you share a more complete example of what you are doing? I tried to reproduce it myself using the simple producer example from examples/producer.js and was not able to.
Specifically, there’s clearly something fishy going on in /Users/g.karabinosova/ares/node_packages/eventhub/src/EventHubClient.js:111:25 since you’re having an unhandled promise rejection, meaning you’re not awaiting some promise.
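To illustrate the point about unhandled rejections: every promise returned by send() needs to be awaited or given a rejection handler. The sketch below is not taken from EventHubClient.js; produceAll is a hypothetical helper, and producer is assumed to be any object with a kafkajs-style send() method. It waits for all sends and collects failures instead of leaving a rejected promise unobserved.

```javascript
// Sketch only: `produceAll` is a hypothetical helper; `producer` is assumed
// to expose a kafkajs-style send() method that returns a promise.
async function produceAll(producer, topic, batches) {
  // Promise.allSettled waits for every send and observes every rejection,
  // unlike calling send() without await or .catch().
  const outcomes = await Promise.allSettled(
    batches.map((messages) => producer.send({ topic, messages }))
  );
  const failures = outcomes
    .filter((outcome) => outcome.status === 'rejected')
    .map((outcome) => outcome.reason);
  return { ok: outcomes.length - failures.length, failures };
}
```

The caller can then decide what to do with failures (log, retry, or rethrow) instead of getting an UnhandledPromiseRejectionWarning. Promise.allSettled requires Node.js 12.9 or later.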