kafka-python: Client endlessly spams "Unable to send to wakeup socket" warnings

kafka-python version: 2.0.1
python version: 3.7.7

This issue is a duplicate of #1842, but that issue is closed and I cannot reopen it, so I have opened a new one.

When I close a spider, kafka-python starts spamming the "Unable to send to wakeup socket" warning and does not stop. See the attached spider.log file.

I went into the sources and added one line that "fixes" this issue. Here is the original code:

    # Source: kafka/client_async.py
    # Class: KafkaClient

    def wakeup(self):
        with self._wake_lock:
            try:
                self._wake_w.sendall(b'x')
            except socket.timeout:
                log.warning('Timeout to send to wakeup socket!')
                raise Errors.KafkaTimeoutError()
            except socket.error:
                log.warning('Unable to send to wakeup socket!')
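
For context, the wakeup socket is the writing end of a socket pair that the client uses to interrupt a blocking select() in its I/O loop. The sketch below illustrates that general pattern (it is not the library's actual code) and shows why every wakeup attempt fails the same way once the sockets are closed:

    import select
    import socket

    # One end is written to ("wakeup"); the other end is watched by select().
    wake_r, wake_w = socket.socketpair()

    def wait_for_wakeup(timeout=5.0):
        # Blocks until someone writes to wake_w or the timeout expires.
        readable, _, _ = select.select([wake_r], [], [], timeout)
        if readable:
            wake_r.recv(1024)  # drain the wakeup byte(s)

    def wakeup():
        try:
            wake_w.sendall(b'x')
        except OSError:  # socket.error is an alias for OSError on Python 3
            print('Unable to send to wakeup socket!')

    # Once the pair is closed, every wakeup() call fails the same way, so any
    # loop that keeps calling wakeup() will log the warning over and over.
    wake_r.close()
    wake_w.close()
    wakeup()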

Here is the fixed version:

    def wakeup(self):
        with self._wake_lock:
            try:
                self._wake_w.sendall(b'x')
            except socket.timeout:
                log.warning('Timeout to send to wakeup socket!')
                raise Errors.KafkaTimeoutError()
            except socket.error as e:
                log.warning('Unable to send to wakeup socket!')
                raise e

I do not know what causes the problem, nor why re-raising the exception stops the spam.

About this issue

  • State: open
  • Created 4 years ago
  • Reactions: 15
  • Comments: 15

Most upvoted comments

Same here: this happens as soon as I try to gracefully shut down a consumer. As it stands, I have to forcefully kill the process to get it to stop.
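
For reference, a shutdown path that closes the consumer exactly once, from the consuming thread, might look like the following sketch; the topic name and broker address are placeholders:

    from kafka import KafkaConsumer

    # Placeholder topic/broker; adjust for your environment.
    consumer = KafkaConsumer('events', bootstrap_servers='localhost:9092')
    try:
        for message in consumer:
            print(message.value)  # processing goes here
    except KeyboardInterrupt:
        # Ctrl+C: fall through to the single close() below.
        pass
    finally:
        consumer.close()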

@adityaraj-28

We have a custom context manager for setting up and closing a consumer, and I was using the consumer after it had been closed. The fix is simply not to do that. The actual code is more complex, but simplified it boils down to this:

    with create_consumer() as consumer:
        # Use consumer here.
        ...

    # If you use it here, outside the managed context, it's closed
    # and you get a bad file descriptor error.
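
For reference, a minimal version of such a context manager might look like this sketch; create_consumer, the topic, and the broker address are placeholders rather than the project's actual code:

    from contextlib import contextmanager

    from kafka import KafkaConsumer

    @contextmanager
    def create_consumer(topic='events', servers='localhost:9092'):
        consumer = KafkaConsumer(topic, bootstrap_servers=servers)
        try:
            yield consumer
        finally:
            # After close() the consumer's sockets (including the client's
            # wakeup socket) are gone, so any later use hits a bad file
            # descriptor error.
            consumer.close()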

I found another way to work around this problem without modifying the kafka-python source code.

    def run(self) -> Iterator[Event]:
        self._consumer = KafkaConsumer(
            self._kafka_topic,
            bootstrap_servers=self._kafka_server,
            value_deserializer=self.data_deserializer,
            # Iteration gives up after 16 s without messages, so the outer
            # loop gets a chance to re-check the stop flag.
            consumer_timeout_ms=16000)

        while not self._stop_receiver:
            try:
                for msg in self._consumer:
                    if isinstance(msg.value, Event):
                        yield msg.value
            except StopIteration:
                continue

    def close(self):
        if self._consumer is not None:
            # Ask run() to stop iterating before touching the consumer,
            # then give it a moment to leave the consuming loop.
            self._stop_receiver = True
            time.sleep(1)
            self._consumer.unsubscribe()
            self._consumer.close()
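
Another option, assuming close() may be called from a different thread, is to poll() with a timeout and re-check a stop flag, so only the consuming thread ever closes the consumer. The class and names below are illustrative, not part of kafka-python or the code above:

    import threading
    from typing import Any, Iterator

    from kafka import KafkaConsumer

    class Receiver:
        def __init__(self, topic: str, servers: str) -> None:
            self._stop = threading.Event()
            self._consumer = KafkaConsumer(topic, bootstrap_servers=servers)

        def run(self) -> Iterator[Any]:
            try:
                while not self._stop.is_set():
                    # poll() returns {} on timeout, so the stop flag is
                    # re-checked at least once per second.
                    for records in self._consumer.poll(timeout_ms=1000).values():
                        for record in records:
                            yield record.value
            finally:
                # Only this thread ever touches the consumer, so nothing can
                # use it (or its wakeup socket) after it has been closed.
                self._consumer.close()

        def stop(self) -> None:
            self._stop.set()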

I am seeing the same behavior with nameko_kafka, which uses this library internally.