kafkajs: Connection error to kafka hosted on confluent cloud with v1.14

Describe the bug Bumping kafkajs from 1.12.0 to 1.14.0 breaks with a kafka hosted on confluent cloud.

** Details ** Application is a simple producer, but fail when connecting to the broker.

Producer is connecting like this :

  const conf = {
    clientId: 'backend',
    brokers: KAFKA.bootstrapServers,
    ssl: true,
    sasl: {
      mechanism: 'plain' as SASLMechanism,
      username: ENV.CONFLUENT_KAFKA_API_KEY,
      password: ENV.CONFLUENT_KAFKA_API_SECRET,
    },
  };
  const kafka = new Kafka(conf);
  const producer = kafka.producer({
    createPartitioner: Partitioners.JavaCompatiblePartitioner,
  });
  await producer.connect();

And fails with:

2020-10-20T09:36:22.149Z 792bad59-e2e3-43de-a089-46b2b98bf354 ERROR {"level":"ERROR","timestamp":"2020-10-20T09:36:22.149Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"pkc-lq8v7.eu-central-1.aws.confluent.cloud:9092","clientId":"backend"} 
2020-10-20T09:36:22.150Z 792bad59-e2e3-43de-a089-46b2b98bf354 ERROR {"level":"ERROR","timestamp":"2020-10-20T09:36:22.150Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":196} 

// etc ... until max retry

Environment:

  • env: lambda in AWS (amz linux)
  • KafkaJS 1.14.0
  • Kafka 2.6.0 (confluent v6.0.0)
  • NodeJS 12.13.1

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 24 (2 by maintainers)

Most upvoted comments

Hey there, have you tried to increase the timeout on the broker config?

new kafkajs.Kafka({
    connectionTimeout: 10_000,
    authenticationTimeout: 10_000,
    ....
});

Here on my tests it worked, a bit shaky but it works.

I’m going to close this issue, since this appears to not be an issue with KafkaJS, but feel free to keep adding updates in case there is some information from Confluent regarding this issue.

Added some more debug output by running with NODE_DEBUG=tls as well as a custom socket factory with tracing enabled on the socket.

debug: ┏ [Connection] Connecting
debug: ┃ [0] {
debug: ┃ [1]   "timestamp": "2020-10-21T09:02:11.239Z",
debug: ┃ [2]   "logger": "kafkajs",
debug: ┃ [3]   "broker": "pkc-lq8v7.eu-central-1.aws.confluent.cloud:9092",
debug: ┃ [4]   "clientId": "test-admin-id",
debug: ┃ [5]   "ssl": true,
debug: ┃ [6]   "sasl": true
debug: ┗ [7] }
TLS 49146: client _init handle? true
TLS 49146: client initRead handle? true buffered? false
TLS 49146: client _start handle? true connecting? false requestOCSP? false
Sent Record
Header:
  Version = TLS 1.0 (0x301)
  Content Type = Handshake (22)
  Length = 261
    ClientHello, Length=257
      client_version=0x303 (TLS 1.2)
      Random:
        gmt_unix_time=0xEC4DD66B
        random_bytes (len=28): 0101D49BD07D6A0A417A07C681927EA9FC1E34366AC7EE60AD82195B
      session_id (len=32): 1C92B4BDF83CCFD0BFF901614333ABE6FCB870E4892116A0F7FA1C86FC486A5D
      cipher_suites (len=8)
        {0x13, 0x02} TLS_AES_256_GCM_SHA384
        {0x13, 0x03} TLS_CHACHA20_POLY1305_SHA256
        {0x13, 0x01} TLS_AES_128_GCM_SHA256
        {0x00, 0xFF} TLS_EMPTY_RENEGOTIATION_INFO_SCSV
      compression_methods (len=1)
        No Compression (0x00)
      extensions, length = 176
        extension_type=server_name(0), length=47
          0000 - 00 2d 00 00 2a 70 6b 63-2d 6c 71 38 76 37 2e   .-..*pkc-lq8v7.
          000f - 65 75 2d 63 65 6e 74 72-61 6c 2d 31 2e 61 77   eu-central-1.aw
          001e - 73 2e 63 6f 6e 66 6c 75-65 6e 74 2e 63 6c 6f   s.confluent.clo
          002d - 75 64                                          ud
        extension_type=ec_point_formats(11), length=4
          uncompressed (0)
          ansiX962_compressed_prime (1)
          ansiX962_compressed_char2 (2)
        extension_type=supported_groups(10), length=12
          ecdh_x25519 (29)
          secp256r1 (P-256) (23)
          ecdh_x448 (30)
          secp521r1 (P-521) (25)
          secp384r1 (P-384) (24)
        extension_type=session_ticket(35), length=0
        extension_type=encrypt_then_mac(22), length=0
        extension_type=extended_master_secret(23), length=0
        extension_type=signature_algorithms(13), length=30
          ecdsa_secp256r1_sha256 (0x0403)
          ecdsa_secp384r1_sha384 (0x0503)
          ecdsa_secp521r1_sha512 (0x0603)
          ed25519 (0x0807)
          ed448 (0x0808)
          rsa_pss_pss_sha256 (0x0809)
          rsa_pss_pss_sha384 (0x080a)
          rsa_pss_pss_sha512 (0x080b)
          rsa_pss_rsae_sha256 (0x0804)
          rsa_pss_rsae_sha384 (0x0805)
          rsa_pss_rsae_sha512 (0x0806)
          rsa_pkcs1_sha256 (0x0401)
          rsa_pkcs1_sha384 (0x0501)
          rsa_pkcs1_sha512 (0x0601)
        extension_type=supported_versions(43), length=3
          TLS 1.3 (772)
        extension_type=psk_key_exchange_modes(45), length=2
          psk_dhe_ke (1)
        extension_type=key_share(51), length=38
            NamedGroup: ecdh_x25519 (29)
            key_exchange:  (len=32): 71CEDCA193D9784DEA541D114F2BD03C7FB301E2CF33244E1925031435D7AC4F

Received Record
Header:
  Version = TLS 1.2 (0x303)
  Content Type = Alert (21)
  Length = 2
    Level=fatal(2), description=protocol version(70)

debug: ┏ [Connection] disconnecting...
debug: ┃ [0] {
debug: ┃ [1]   "timestamp": "2020-10-21T09:02:11.300Z",
debug: ┃ [2]   "logger": "kafkajs",
debug: ┃ [3]   "broker": "pkc-lq8v7.eu-central-1.aws.confluent.cloud:9092",
debug: ┃ [4]   "clientId": "test-admin-id"
debug: ┗ [5] }
error: ┏ [Connection] Connection error: Client network socket disconnected before secure TLS connection was established
error: ┃ [0] {
error: ┃ [1]   "timestamp": "2020-10-21T09:02:11.301Z",
error: ┃ [2]   "logger": "kafkajs",
error: ┃ [3]   "broker": "pkc-lq8v7.eu-central-1.aws.confluent.cloud:9092",
error: ┃ [4]   "clientId": "test-admin-id",
error: ┃ [5]   "stack": "Error: Client network socket disconnected before secure TLS connection was established\n    at connResetException (internal/errors.js:613:14)\n    at TLSSocket.onConnectEnd (_tls_wrap.js:1545:19)\n    at TLSSocket.emit (events.js:326:22)\n    at endReadableNT (_stream_readable.js:1226:12)\n    at processTicksAndRejections (internal/process/task_queues.js:80:21)"
error: ┗ [6] }
debug: ┏ [Connection] disconnecting...
debug: ┃ [0] {
debug: ┃ [1]   "timestamp": "2020-10-21T09:02:11.301Z",
debug: ┃ [2]   "logger": "kafkajs",
debug: ┃ [3]   "broker": "pkc-lq8v7.eu-central-1.aws.confluent.cloud:9092",
debug: ┃ [4]   "clientId": "test-admin-id"
debug: ┗ [5] }
debug: ┏ [Connection] disconnected
debug: ┃ [0] {
debug: ┃ [1]   "timestamp": "2020-10-21T09:02:11.301Z",
debug: ┃ [2]   "logger": "kafkajs",
debug: ┃ [3]   "broker": "pkc-lq8v7.eu-central-1.aws.confluent.cloud:9092",
debug: ┃ [4]   "clientId": "test-admin-id"
debug: ┗ [5] }
debug: ┏ [Connection] disconnected
debug: ┃ [0] {
debug: ┃ [1]   "timestamp": "2020-10-21T09:02:11.302Z",
debug: ┃ [2]   "logger": "kafkajs",
debug: ┃ [3]   "broker": "pkc-lq8v7.eu-central-1.aws.confluent.cloud:9092",
debug: ┃ [4]   "clientId": "test-admin-id"
debug: ┗ [5] }
TLS 49146: client onerror [Error: 4499246528:error:1409442E:SSL routines:ssl3_read_bytes:tlsv1 alert protocol version:../deps/openssl/openssl/ssl/record/rec_layer_s3.c:1544:SSL alert number 70
] {
  library: 'SSL routines',
  function: 'ssl3_read_bytes',
  reason: 'tlsv1 alert protocol version',
  code: 'ERR_SSL_TLSV1_ALERT_PROTOCOL_VERSION'
} had? true
^C

“alert number 70” from openssl is: “The protocol version the client attempted to negotiate is recognized, but not supported. For example, old protocol versions might be avoided for security reasons. This message is always fatal.”

If I’m reading the output right, it looks like the node process is saying that it wants to use TLS 1.2, which according to Confluent Clouds’ documentation is supported. https://docs.confluent.io/current/cloud/faq.html#:~:text=Security Controls whitepaper.-,What version of TLS is supported on Confluent Cloud,TLS version 1.2 is supported.&text=Effective March 15%2C 2020%2C connections,1.1 are no longer supported.

Doesn’t this look like kafkajs is trying to connect with TLS 1.0, and the response indicates that only 1.2 is supported…?

I’m not entirely sure. The output format is from openssl’s trace format, but it needs to be compiled with enable-ssl-trace for it to work, so I’m looking to see if there’s a pre-compiled binary with that that I can try with to see what the output looks like from there. EDIT: Now I’m compiling openssl myself. What a fun day this turned out to be.

My admittedly very basic understanding is that the initial handshake is done with TLS 1.0 but containing the supported TLS versions on the client, which the server and client then use to negotiate which protocol version to use:

ClientHello, Length=257
      client_version=0x303 (TLS 1.2)

We got the same issue since our migration to confluent cloud, we are facing troubles in many clients which return kafka timeout error. We have never got these errors before the migration. The errors appear randomly sometime at the beginning or while the service is running Below example of errors:

{"level":"ERROR","timestamp":"2023-10-05T12:20:35.333Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":5,"retryTime":11374}

[Connection] Connection error: connect ETIMEDOUT 34.79.12.25:9092
[Consumer] Crash: KafkaJSConnectionError: Connection timeout
[Producer] Failed to send messages: Connection timeout

Just increase timeout didn’t fix on our side. Need clear up-to-date