aiokafka: Topic leaders change/broker crash causes aiokafka to stop producing messages

Version information:

  • Kafka cluster: 0.10.2.1
  • aiokafka: 0.2.2 (kafka-python 1.3.1)
  • Python: 3.5.2

Producer script:

import asyncio
from aiokafka import AIOKafkaProducer

@asyncio.coroutine
def produce(loop):
    while True:
        resp = yield from producer.send_and_wait('my-topic', value=b'some_message_bytes')
        print("Message produced: partition {}; offset {}".format(resp.partition, resp.offset))
        yield from asyncio.sleep(1)

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers='bootstrap-server1:9092,bootstrap-server2:9092,bootstrap-server3:9092')
loop.run_until_complete(producer.start())
loop.run_until_complete(produce(loop))
loop.run_until_complete(producer.stop())
loop.close()

Steps to reproduce:

  1. Start the producer script; it sends one message per second.
  2. Stop one of the Kafka brokers (partition leaders will change).
  3. The script stops producing messages and hangs, with no logs or other output.

If you send a kill signal to the script, you will see this exception:

Task exception was never retrieved
future: <Task finished coro=<AIOKafkaProducer._send_produce_req() done, defined at lib/python3.5/site-packages/aiokafka/producer.py:331> exception=ValueError(<class 'struct.error'>,)>
Traceback (most recent call last):
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 17, in _unpack
    (value,) = unpack(f, data)
struct.error: unpack requires a bytes object of length 4

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "lib/python3.5/site-packages/aiokafka/producer.py", line 365, in _send_produce_req
    response = yield from self.client.send(node_id, request)
  File "lib/python3.5/site-packages/aiokafka/client.py", line 375, in send
    request, expect_response=expect_response)
  File "lib/python3.5/site-packages/aiokafka/conn.py", line 141, in send
    message = header.encode() + request.encode()
  File "lib/python3.5/site-packages/kafka/protocol/struct.py", line 34, in _encode_self
    [self.__dict__[name] for name in self.SCHEMA.names]
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in encode
    for i, field in enumerate(self.fields)
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in <listcomp>
    for i, field in enumerate(self.fields)
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in encode
    [self.array_of.encode(item) for item in items]
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in <listcomp>
    [self.array_of.encode(item) for item in items]
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in encode
    for i, field in enumerate(self.fields)
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in <listcomp>
    for i, field in enumerate(self.fields)
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in encode
    [self.array_of.encode(item) for item in items]
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in <listcomp>
    [self.array_of.encode(item) for item in items]
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in encode
    for i, field in enumerate(self.fields)
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in <listcomp>
    for i, field in enumerate(self.fields)
  File "lib/python3.5/site-packages/kafka/protocol/message.py", line 154, in encode
    size = Int32.decode(items)
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 50, in decode
    return _unpack('>i', data.read(4))
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 20, in _unpack
    raise ValueError(error)
ValueError: <class 'struct.error'>
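
The "Task exception was never retrieved" line means the failure happened inside a background task whose result nobody inspected, which is consistent with the script hanging without any logs. As a rough illustration only (standard-library asyncio, modern `async`/`await` syntax, no aiokafka involved), a done-callback can surface such an exception as soon as the task finishes. `failing_request` and `report_failure` are hypothetical names invented for this sketch and are not part of aiokafka:

```python
import asyncio


async def failing_request():
    # Hypothetical stand-in for a background request that fails while encoding.
    raise ValueError("encode failed")


def report_failure(task, sink):
    """Done-callback: record the task's exception as soon as the task finishes."""
    if not task.cancelled() and task.exception() is not None:
        sink.append(repr(task.exception()))


async def main():
    errors = []
    task = asyncio.ensure_future(failing_request())
    task.add_done_callback(lambda t: report_failure(t, errors))
    # Give the event loop a moment to run the task and its callback.
    await asyncio.sleep(0.01)
    return errors
```

Calling `task.exception()` in the callback also marks the exception as retrieved, so asyncio no longer emits the warning when the task is destroyed.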

We are looking into this, but feel free to comment.

About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 20 (12 by maintainers)

Most upvoted comments

@tvoinarovskyi OK, so the answer is to let the user decide what they want to do in this case. I think it would be good to mention this in the docs/examples. Thank you for the explanation.
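
One way a caller might "decide" is to bound each send with a timeout and handle the failure itself. The following is a minimal sketch under stated assumptions, not the fix that was applied to aiokafka: `FakeProducer` is a hypothetical stand-in that only mimics the `send_and_wait` call used in the script above so the example runs without a broker, and `send_with_timeout` is an invented helper name. Whether cancelling a real in-flight send through `asyncio.wait_for` leaves the producer in a clean state is an assumption that would need checking against the aiokafka version in use.

```python
import asyncio


class FakeProducer:
    """Hypothetical stand-in for a producer; not the aiokafka class."""

    def __init__(self, hang=False):
        self.hang = hang

    async def send_and_wait(self, topic, value=None):
        if self.hang:
            # Simulate the hang described in this issue: never complete.
            await asyncio.Event().wait()
        return (topic, value)


async def send_with_timeout(producer, topic, value, timeout=5.0):
    """Return the send result, or None if nothing arrived within `timeout` seconds."""
    try:
        return await asyncio.wait_for(
            producer.send_and_wait(topic, value=value), timeout
        )
    except asyncio.TimeoutError:
        # The caller decides what happens next: retry, log, or shut down.
        return None


def demo():
    ok = asyncio.run(
        send_with_timeout(FakeProducer(), "my-topic", b"x", timeout=1.0)
    )
    hung = asyncio.run(
        send_with_timeout(FakeProducer(hang=True), "my-topic", b"x", timeout=0.05)
    )
    return ok, hung
```

Returning `None` on timeout keeps the sketch short; a real caller might prefer to re-raise or count consecutive failures before giving up.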

I’m closing this issue. @tvoinarovskyi thanks again for the quick response and fix.