kafka-python: KafkaConsumer stuck in infinite loop on connection error

It seems to be stuck in this loop https://github.com/dpkp/kafka-python/blob/34dc9dd2fe6b47f4542c5a131e0e0cbc1b00ed80/kafka/conn.py#L294

The consumer filled up ~1 TB of logs over the course of 3 days but never raised an exception. Example logs:

kafka.cluster INFO     Group coordinator for my-group is BrokerMetadata(nodeId=102, host=u'kafka-2-broker.example.com', port=9092, rack=None)
kafka.cluster INFO     Group coordinator for my-group is BrokerMetadata(nodeId=102, host=u'kafka-2-broker.example.com', port=9092, rack=None)
kafka.conn   ERROR    Unable to connect to any of the names for kafka-4-broker.example.com:9092
kafka.conn   WARNING  <BrokerConnection node_id=104 host=kafka-4-broker.example.com/kafka-4-broker.example.com port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-4-broker.example.com:9092
kafka.cluster INFO     Group coordinator for my-group is BrokerMetadata(nodeId=102, host=u'kafka-2-broker.example.com', port=9092, rack=None)
kafka.cluster INFO     Group coordinator for my-group is BrokerMetadata(nodeId=102, host=u'kafka-2-broker.example.com', port=9092, rack=None)
kafka.coordinator INFO     Discovered coordinator 102 for group my-group
kafka.conn   ERROR    Unable to connect to any of the names for kafka-1-broker.example.com:9092
kafka.coordinator INFO     Discovered coordinator 102 for group my-group
kafka.cluster INFO     Group coordinator for my-group is BrokerMetadata(nodeId=102, host=u'kafka-2-broker.example.com', port=9092, rack=None)
kafka.conn   ERROR    Unable to connect to any of the names for kafka-2-broker.example.com:9092
kafka.conn   WARNING  <BrokerConnection node_id=104 host=kafka-4-broker.example.com/kafka-4-broker.example.com port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-4-broker.example.com:9092
kafka.cluster INFO     Group coordinator for my-group is BrokerMetadata(nodeId=102, host=u'kafka-2-broker.example.com', port=9092, rack=None)
kafka.conn   WARNING  <BrokerConnection node_id=101 host=kafka-1-broker.example.com/kafka-1-broker.example.com port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-1-broker.example.com:9092
kafka.conn   ERROR    Unable to connect to any of the names for kafka-2-broker.example.com:9092
kafka.conn   ERROR    Unable to connect to any of the names for kafka-2-broker.example.com:9092
kafka.coordinator INFO     Discovered coordinator 102 for group my-group
kafka.conn   ERROR    Unable to connect to any of the names for kafka-3-broker.example.com:9092
kafka.coordinator INFO     Discovered coordinator 102 for group my-group
kafka.conn   WARNING  <BrokerConnection node_id=102 host=kafka-2-broker.example.com/kafka-2-broker.example.com port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-2-broker.example.com:9092
kafka.conn   WARNING  <BrokerConnection node_id=103 host=kafka-3-broker.example.com/kafka-3-broker.example.com port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-3-broker.example.com:9092
kafka.coordinator INFO     Discovered coordinator 102 for group my-group
kafka.coordinator INFO     Discovered coordinator 102 for group my-group
kafka.conn   WARNING  <BrokerConnection node_id=102 host=kafka-2-broker.example.com/kafka-2-broker.example.com port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-2-broker.example.com:9092
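
The loop itself needs a library fix, but the log volume can be contained on the application side. Below is a minimal sketch, assuming you control the logging setup. It uses a standard-library logging.Filter that lets each distinct message through at most once per time window. The DuplicateSuppressor class and its parameters are hypothetical and not part of kafka-python. Attach it to a handler rather than to the kafka.* loggers, because a filter on a logger is only consulted for records logged directly to that logger, not for records propagated from its children.

```python
import logging
import time


class DuplicateSuppressor(logging.Filter):
    """Drop a record if an identical message passed within the last window_s seconds.

    Hypothetical helper, not part of kafka-python: it only limits log volume
    and does not stop the underlying reconnect loop.
    """

    def __init__(self, window_s=60.0, clock=time.monotonic):
        super().__init__()
        self.window_s = window_s
        self.clock = clock  # injectable so the behaviour can be tested
        self._last_emitted = {}  # (logger name, level, message) -> time last let through

    def filter(self, record):
        key = (record.name, record.levelno, record.getMessage())
        now = self.clock()
        last = self._last_emitted.get(key)
        if last is not None and now - last < self.window_s:
            return False  # same message inside the window: suppress it
        self._last_emitted[key] = now
        return True
```

For example, create a logging.StreamHandler (or a file handler), call handler.addFilter(DuplicateSuppressor(window_s=60.0)), and pass it to logging.basicConfig(handlers=[handler]). A connection error that repeats many times per second then produces at most one line per minute per distinct message. The key set grows with the number of distinct messages, which stays small for repetitive output like the excerpt above.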

About this issue

  • State: closed
  • Created 7 years ago
  • Reactions: 3
  • Comments: 24 (3 by maintainers)

Most upvoted comments

@dpkp Was the fix for this bug released in version 1.4? I'm using version 1.4.6, and when Kafka is shut down and a worker calls KafkaConsumer, the infinite loop still happens.

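        from kafka import KafkaConsumer
        from kafka.errors import KafkaError

        try:
            cls._consumer = KafkaConsumer(
                ...
            )
        except KafkaError as err:
            # FailedToConnect is an application-defined exception
            raise FailedToConnect(f"{err}") from err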

Because of the infinite loop, the except KafkaError branch is never reached.

Some logs:

kafka.conn - INFO - <BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to kafka:9092 [('127.0.0.1', 9092) IPv4]
kafka.conn - ERROR - Connect attempt to <BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]> returned error 111. Disconnecting.
kafka.conn - INFO - <BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Closing connection. KafkaConnectionError: 111 ECONNREFUSED
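
Error 111 is ECONNREFUSED on Linux: nothing is accepting connections at kafka:9092. If the goal is to fail fast at startup instead of waiting for the consumer to give up, one option is to probe the bootstrap address with a deadline before constructing the consumer. Below is a minimal sketch using only the standard library. wait_for_broker and its parameters are hypothetical, and FailedToConnect is redefined here only to keep the example self-contained.

```python
import socket
import time


class FailedToConnect(Exception):
    """Stand-in for the application-defined exception used above."""


def wait_for_broker(host, port, timeout_s=30.0, retry_interval_s=1.0):
    """Return once host:port accepts a TCP connection; raise after timeout_s.

    This only checks that something is listening; it does not speak the
    Kafka protocol, so a successful probe does not guarantee the consumer
    will work (for example, advertised listeners may still be wrong).
    """
    deadline = time.monotonic() + timeout_s
    last_error = None
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            # Raises OSError subclasses: ConnectionRefusedError (errno 111
            # on Linux), socket.gaierror for DNS failures, or a timeout.
            with socket.create_connection((host, port), timeout=min(remaining, 5.0)):
                return None
        except OSError as err:
            last_error = err
        # Sleep before retrying, but never past the deadline.
        time.sleep(max(min(retry_interval_s, deadline - time.monotonic()), 0))
    raise FailedToConnect(f"{host}:{port} unreachable after {timeout_s}s: {last_error}")
```

For example, calling wait_for_broker("kafka", 9092, timeout_s=30) just before KafkaConsumer(...) turns an unreachable broker into a FailedToConnect after roughly 30 seconds. This covers startup only; it does not stop the reconnect loop if the broker goes away while the consumer is already running.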

I have observed the same issue. I am using kafka-python 1.4.6. Please reopen this issue.

Same error on kafka-python==2.0.2.

I hit the same problem with kafka-python==2.0.1. Is this a bug? Looking forward to your reply, thanks @dpkp.

I had a similar connection error when a Python Kafka app (the client) was running in Docker and the Kafka broker was running directly on my machine, not in Docker or any other container, listening on localhost:9092. If you inspect the broker's server config, you'll find this:

listeners=PLAINTEXT://localhost:9092

and nothing set for advertised.listeners. Because advertised.listeners is not set, the broker uses the value of listeners instead.

So once a client (consumer or producer) connects to the broker using the right hostname and port (host.docker.internal and 9092 in my case), the broker sends back metadata saying that all brokers are reachable at localhost:9092. But localhost inside a Docker container is not the same as localhost on your machine/workstation/laptop, because of how Docker's network abstraction works. The following broker config works:

listeners=PLAINTEXT://:9092,DOCKER://:19092
advertised.listeners=PLAINTEXT://localhost:9092,DOCKER://host.docker.internal:19092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,DOCKER:PLAINTEXT
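
The mechanism can be modelled in a few lines. Below is a simplified sketch, assuming each listener has a distinct port so that the port identifies the listener a client connected through. A real broker tracks the listener name, and this parser ignores IPv6 bracket syntax. parse_listeners and advertised_for are hypothetical helpers, not Kafka code. The sketch shows which address the broker hands back in metadata under the original and the fixed configuration.

```python
def parse_listeners(value):
    """Parse 'NAME://host:port,...' into {NAME: (host, port)}.

    An empty host (as in 'PLAINTEXT://:9092') means "bind all interfaces".
    """
    result = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        result[name] = (host, int(port))
    return result


def advertised_for(config, bound_port):
    """Address the broker advertises to a client that connected on bound_port."""
    listeners = parse_listeners(config["listeners"])
    # If advertised.listeners is unset, the broker falls back to listeners.
    advertised = parse_listeners(config.get("advertised.listeners", config["listeners"]))
    for name, (_, port) in listeners.items():
        if port == bound_port:
            # Metadata carries the advertised address of the same listener.
            return advertised[name]
    raise ValueError(f"no listener bound to port {bound_port}")
```

Under the original config, advertised_for({"listeners": "PLAINTEXT://localhost:9092"}, 9092) gives ("localhost", 9092), which inside a container points at the container itself. Under the fixed config, a client entering through port 19092 gets ("host.docker.internal", 19092), which it can reach, while local clients on port 9092 still get ("localhost", 9092).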

Once this config is in place and the broker has been restarted, you can connect to it from any Docker app using host.docker.internal:19092 (the DOCKER listener). An app running directly on your machine can connect using localhost:9092, 127.0.0.1:9092, or <YOUR_LOCAL_IP>:9092.

For a more detailed explanation, see this excellent blog post: https://www.confluent.io/blog/kafka-listeners-explained/

@darkprisco if you are hit by this bug, use 1.3.3 (or master); there is no release yet that contains this fix.