kafkajs: producer.connect() - KafkaJSError: The producer is disconnected

Describe the bug There seems to be a problem in version 1.14.0. I connect the producer and then produce a message, but sometimes I get this error:

(node:78713) UnhandledPromiseRejectionWarning: KafkaJSError: The producer is disconnected
    at validateConnectionStatus (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:30:15)
    at sendBatch (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:81:5)
    at Object.send (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:153:12)
    at EventHubClient.produce (/Users/g.karabinosova/ares/node_packages/eventhub/src/EventHubClient.js:111:25)
    at processTicksAndRejections (internal/process/next_tick.js:81:5)

Also, the producer.events.CONNECT event is not emitted.

I tried to produce 2 messages at once and to log KafkaJS connectionStatus into console and result is:

2020-10-05T16:29:20.854Z level=info label=eventhub-client message="Registered handler for all events."
CONNECTING.................  (Output from producer.connect method )
CONNECTING.................
CONNECTED.................
2020-10-05T16:29:20.879Z level=info label=eventhub-client message="Producing 1 message(s)=[{'key':'partition-key1','value':'{\'id\':\'739ec863-be56-43b1-99bd-f4ef2184081c\',\'time\':\'2020-10-05T16:29:20.855Z\',\'specversion\':\'0.2\',\'type\':\'event1\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar1\'}}','headers':{}}] test"
STATUS....................... disconnected (Output from validateConnectionStatus in the sendBatch method)
(node:78713) UnhandledPromiseRejectionWarning: KafkaJSError: The producer is disconnected
    at validateConnectionStatus (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:30:15)
    at sendBatch (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:81:5)
    at Object.send (/Users/g.karabinosova/ares/node_packages/eventhub/node_modules/kafkajs/src/producer/messageProducer.js:153:12)
    at EventHubClient.produce (/Users/g.karabinosova/ares/node_packages/eventhub/src/EventHubClient.js:111:25)
    at processTicksAndRejections (internal/process/next_tick.js:81:5)
(node:78713) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 2)
(node:78713) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
CONNECTED.................
2020-10-05T16:29:20.882Z level=info label=eventhub-client message="Producing 2 message(s)=[{'key':'partition-key2','value':'{\'id\':\'996b7604-e715-4a31-97ce-d058abac721d\',\'time\':\'2020-10-05T16:29:20.858Z\',\'specversion\':\'0.2\',\'type\':\'event2\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar2\'}}','headers':{}},{'key':'partition-key3','value':'{\'id\':\'f1fcc9e2-1905-417d-a54e-b624e5b299b6\',\'time\':\'2020-10-05T16:29:20.858Z\',\'specversion\':\'0.2\',\'type\':\'event3\',\'source\':\'/test\',\'contenttype\':\'application/json\',\'data\':{\'foo\':\'bar3\'}}','headers':{}}] test2"
STATUS....................... connected

Code sample:

if (!this.producerConnected) {
  this.producer = this.kafka.producer();
  try {
    await this.producer.connect();
    this.producerConnected = true;
  } catch (error) {
    throw new Error(`Couldn't connect producer.`);
  }
}
await this.producer.send({ topic, messages });

Expected behavior connect() resolves, then the messages are produced.

Observed behavior Even after connect() resolves, producing the first message fails and connectionStatus is not updated; the other messages are produced.

Environment:

  • OS: Mac OS 18.7.0
  • KafkaJS version 1.14.0
  • Kafka version 5.1.2
  • NodeJS version 11.10.0

About this issue

  • State: open
  • Created 4 years ago
  • Reactions: 9
  • Comments: 25 (4 by maintainers)

Most upvoted comments

You should just connect at startup and disconnect at shutdown.
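
A minimal sketch of that usage, assuming `kafka` is an already configured KafkaJS client. The `startService` function and its `proc` parameter are hypothetical names used only for illustration; `producer()`, `connect()` and `disconnect()` are the KafkaJS calls discussed in this thread.

```javascript
// Hypothetical bootstrap helper (not part of KafkaJS).
// `kafka` is assumed to be a configured KafkaJS client instance.
async function startService(kafka, proc = process) {
  const producer = kafka.producer();
  await producer.connect(); // connect once, before any send()

  // Disconnect once, when the service is going down.
  const shutdown = () => producer.disconnect();
  const onSignal = () => {
    shutdown().catch((error) => console.error('Disconnect failed:', error));
  };
  proc.once('SIGTERM', onSignal);
  proc.once('SIGINT', onSignal);

  return { producer, shutdown };
}
```

The rest of the application would then reuse the returned `producer` for every `send()` instead of creating and connecting a new one per message.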

There seems to be a bug introduced with the latest version related to reconnects, but regardless, nothing is changing about the intended usage.

This probably happens because a message is submitted before the connection has been established. It shows up when calls run in parallel but not when they run serially, so make sure the connection has been successfully initialized before sending any message. You can also enable debug logging to inspect the connection state when sending messages in parallel, which is what triggers this issue. Good luck.
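
One possible reading of the log in the report, offered as an assumption rather than a confirmed diagnosis: two overlapping calls both pass the `producerConnected` check, the second call replaces `this.producer` while the first is still awaiting `connect()`, and the first call then sends on the second, not yet connected producer. A minimal sketch of one way to avoid that is to create the producer once and let every caller await the same `connect()` promise. `SharedProducer` and `ensureConnected` are hypothetical names; `kafka` is assumed to be a configured KafkaJS client.

```javascript
// Hypothetical wrapper (not part of KafkaJS).
// `kafka` is assumed to be a configured KafkaJS client instance.
class SharedProducer {
  constructor(kafka) {
    this.producer = kafka.producer(); // created once, never replaced
    this.connecting = null;           // pending or settled connect() promise
  }

  // Every caller awaits the same promise, so send() never runs on a
  // producer whose connect() is still pending.
  ensureConnected() {
    if (!this.connecting) {
      this.connecting = this.producer.connect().catch((error) => {
        this.connecting = null; // let a later call retry the connection
        throw error;
      });
    }
    return this.connecting;
  }

  async produce(topic, messages) {
    await this.ensureConnected();
    return this.producer.send({ topic, messages });
  }
}
```

With this shape, calling `produce()` twice at once results in a single `connect()` call, and both sends wait for it.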

On Thu, Jul 7, 2022 at 04:46 Manish Sharma @.***> wrote:

Hi all, I have the same issue, Is there any resolution for this?

@crobinson42 try 1.15, but the cause is very likely that you are not connecting the producer. This was always a requirement but somehow worked on 1.12. Give 1.15 a try; otherwise we will need more context to investigate.

What happens when you call send after that? A producer is not meant to keep a constantly open connection. If the connection is idle, it should be closed. Then when you try to send, it should reconnect. If reconnecting doesn’t work, then there’s a problem.

What happens if no broker is available at startup?

producer.connect() will reject. If Kafka is not a requirement for your service to operate, then you’ll need to design your application such that it tries to connect when needed instead of connecting at startup.

I was looking into this issue and we definitely changed the behavior in 1.14, but it’s for the better. Without the changes, you would never be able to stop the producer without side effects, since it would always reconnect even if your server was going down. When you call connect or disconnect you express your intent and the producer will operate on that, so a disconnect will prevent the producer from reconnecting or producing new messages, since this signals that the service or producer is going down.

If you have an API that is producing messages and you are shutting it down it usually means that the server will block new requests and start draining the remaining connections, which is also the case for the producer now.
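
The same drain-then-stop idea can be sketched on the application side. This is only an illustration under stated assumptions: `DrainingProducer` is a hypothetical wrapper, not something KafkaJS provides, and `producer` is assumed to be an already connected KafkaJS producer.

```javascript
// Hypothetical wrapper (not part of KafkaJS).
// `producer` is assumed to be an already connected KafkaJS producer.
class DrainingProducer {
  constructor(producer) {
    this.producer = producer;
    this.inFlight = new Set(); // sends that have not settled yet
    this.closing = false;
  }

  send(record) {
    if (this.closing) {
      // Block new work once shutdown has started.
      return Promise.reject(new Error('Producer is shutting down'));
    }
    const pending = this.producer.send(record);
    this.inFlight.add(pending);
    const forget = () => this.inFlight.delete(pending);
    pending.then(forget, forget); // untrack on success or failure
    return pending;
  }

  async close() {
    this.closing = true;
    await Promise.allSettled([...this.inFlight]); // drain remaining sends
    await this.producer.disconnect();
  }
}
```

Calling `close()` from the service's shutdown path lets in-flight sends finish before `disconnect()` is called, while later `send()` calls are rejected immediately.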

It seems like there was an unannounced breaking change that requires producer.connect to be called before sending?

This has always been the case. We have never supported sending messages without first connecting the producer. It’s possible that a change in the network layer caused a change in behavior, but even if it somehow worked before, it was never supported. Otherwise there would be no point in having a connect method.

I tried to produce 2 messages at once and to log KafkaJS connectionStatus into console and result is:

Could you share a more complete example of what you are doing? I tried to reproduce it myself using the simple producer example from examples/producer.js and was not able to:

node examples/producer.js
info:  ┏ Sending 870 messages #0...
info:  ┃ [0] {
info:  ┃ [1]   "timestamp": "2020-10-06T06:52:26.007Z",
info:  ┃ [2]   "logger": "kafkajs"
info:  ┗ [3] }
info:  ┏ Sending 602 messages #1...
info:  ┃ [0] {
info:  ┃ [1]   "timestamp": "2020-10-06T06:52:26.013Z",
info:  ┃ [2]   "logger": "kafkajs"
info:  ┗ [3] }
info:  ┏ Messages sent #0
info:  ┃ [ 0] {
info:  ┃ [ 1]   "timestamp": "2020-10-06T06:52:27.093Z",
info:  ┃ [ 2]   "logger": "kafkajs",
info:  ┃ [ 3]   "response": [
info:  ┃ [ 4]     {
info:  ┃ [ 5]       "topicName": "topic-test",
info:  ┃ [ 6]       "partition": 0,
info:  ┃ [ 7]       "errorCode": 0,
info:  ┃ [ 8]       "baseOffset": "0",
info:  ┃ [ 9]       "logAppendTime": "-1",
info:  ┃ [10]       "logStartOffset": "0"
info:  ┃ [11]     }
info:  ┃ [12]   ],
info:  ┃ [13]   "msgNumber": 1472
info:  ┗ [14] }
info:  ┏ Messages sent #1
info:  ┃ [ 0] {
info:  ┃ [ 1]   "timestamp": "2020-10-06T06:52:27.105Z",
info:  ┃ [ 2]   "logger": "kafkajs",
info:  ┃ [ 3]   "response": [
info:  ┃ [ 4]     {
info:  ┃ [ 5]       "topicName": "topic-test",
info:  ┃ [ 6]       "partition": 0,
info:  ┃ [ 7]       "errorCode": 0,
info:  ┃ [ 8]       "baseOffset": "870",
info:  ┃ [ 9]       "logAppendTime": "-1",
info:  ┃ [10]       "logStartOffset": "0"
info:  ┃ [11]     }
info:  ┃ [12]   ],
info:  ┃ [13]   "msgNumber": 1472
info:  ┗ [14] }

Specifically, there’s clearly something fishy going on in /Users/g.karabinosova/ares/node_packages/eventhub/src/EventHubClient.js:111:25 since you’re having an unhandled promise rejection, meaning you’re not awaiting some promise.
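
As an illustration of what awaiting that promise could look like at the call site, here is a minimal sketch. `produceSafely` is a hypothetical helper, and `client.produce` is assumed to return a promise, as the `EventHubClient.produce` method in the stack trace appears to do.

```javascript
// Hypothetical caller-side helper (not part of KafkaJS or the reporter's code).
// `client.produce` is assumed to return a promise.
async function produceSafely(client, topic, messages) {
  try {
    // Awaiting inside try/catch means a rejection is handled here
    // instead of surfacing as an UnhandledPromiseRejectionWarning.
    return await client.produce(topic, messages);
  } catch (error) {
    console.error('Failed to produce:', error.message);
    return null; // the caller decides whether to retry or give up
  }
}
```

Whether to swallow, rethrow, or retry the error is an application decision; the point is only that the promise returned by `produce` is awaited and its rejection handled somewhere.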