aiokafka: Topic leaders change/broker crash causes aiokafka to stop producing messages
Version information:
- Kafka cluster: 0.10.2.1
- aiokafka: 0.2.2 (kafka-python 1.3.1)
- Python: 3.5.2
Producer script:
import asyncio
from aiokafka import AIOKafkaProducer

@asyncio.coroutine
def produce(loop):
    while True:
        resp = yield from producer.send_and_wait('my-topic', value=b'some_message_bytes')
        print("Message produced: partition {}; offset {}".format(resp.partition, resp.offset))
        yield from asyncio.sleep(1)

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(
    loop=loop,
    bootstrap_servers='bootstrap-server1:9092,bootstrap-server2:9092,bootstrap-server3:9092')
loop.run_until_complete(producer.start())
loop.run_until_complete(produce(loop))
loop.run_until_complete(producer.stop())
loop.close()
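As a hedged workaround sketch (not part of the original report): bounding each awaited send with `asyncio.wait_for` turns a silent hang into a `TimeoutError` the application can observe and handle. The example below uses modern `async`/`await` syntax and a hypothetical `stalled_send` stub in place of `producer.send_and_wait`, simulating the stall described above.

```python
import asyncio

async def stalled_send():
    # Hypothetical stub standing in for producer.send_and_wait;
    # it never completes, simulating the hang after a broker goes down.
    await asyncio.sleep(3600)

async def produce_with_timeout():
    try:
        # Bound the send so a stalled broker surfaces as TimeoutError
        # instead of hanging the event loop silently.
        await asyncio.wait_for(stalled_send(), timeout=0.1)
        return "sent"
    except asyncio.TimeoutError:
        return "timed out"

result = asyncio.run(produce_with_timeout())
print(result)
```

With the real producer, the timeout value would need to exceed a normal request round-trip so that only genuine stalls are reported.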
Bug reproduce flow:
- Start the producer script; it sends a message every second.
- Stop one of the Kafka brokers (partition leaders will change).
- The script stops producing messages and simply hangs, with no logs or errors.
If you send a kill signal to the script, you will see this exception:
Task exception was never retrieved
future: <Task finished coro=<AIOKafkaProducer._send_produce_req() done, defined at lib/python3.5/site-packages/aiokafka/producer.py:331> exception=ValueError(<class 'struct.error'>,)>
Traceback (most recent call last):
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 17, in _unpack
(value,) = unpack(f, data)
struct.error: unpack requires a bytes object of length 4
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "lib/python3.5/site-packages/aiokafka/producer.py", line 365, in _send_produce_req
response = yield from self.client.send(node_id, request)
File "lib/python3.5/site-packages/aiokafka/client.py", line 375, in send
request, expect_response=expect_response)
File "lib/python3.5/site-packages/aiokafka/conn.py", line 141, in send
message = header.encode() + request.encode()
File "lib/python3.5/site-packages/kafka/protocol/struct.py", line 34, in _encode_self
[self.__dict__[name] for name in self.SCHEMA.names]
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in encode
for i, field in enumerate(self.fields)
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in <listcomp>
for i, field in enumerate(self.fields)
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in encode
[self.array_of.encode(item) for item in items]
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in <listcomp>
[self.array_of.encode(item) for item in items]
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in encode
for i, field in enumerate(self.fields)
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in <listcomp>
for i, field in enumerate(self.fields)
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in encode
[self.array_of.encode(item) for item in items]
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in <listcomp>
[self.array_of.encode(item) for item in items]
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in encode
for i, field in enumerate(self.fields)
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in <listcomp>
for i, field in enumerate(self.fields)
File "lib/python3.5/site-packages/kafka/protocol/message.py", line 154, in encode
size = Int32.decode(items)
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 50, in decode
return _unpack('>i', data.read(4))
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 20, in _unpack
raise ValueError(error)
ValueError: <class 'struct.error'>
We are looking into this, but feel free to comment.
About this issue
- Original URL
- State: closed
- Created 7 years ago
- Comments: 20 (12 by maintainers)
@tvoinarovskyi Ok, so the answer is to let the user decide what to do in this case. I think it would be good to mention this in the docs/examples. Thank you for the explanation.
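Following that resolution (the application decides how to react to a failed send), a minimal retry-with-backoff sketch. `flaky_send` and `RequestTimedOutError` are hypothetical stubs standing in for `producer.send_and_wait` and whatever error aiokafka raises once the fix makes failures visible; the stub fails twice and then succeeds, imitating a leader election settling.

```python
import asyncio

class RequestTimedOutError(Exception):
    # Hypothetical stand-in for the error aiokafka raises on a failed request.
    pass

attempts = {"n": 0}

async def flaky_send(value):
    # Stub for producer.send_and_wait: fails twice, then succeeds,
    # simulating a partition leader election settling.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RequestTimedOutError()
    return "offset-%d" % attempts["n"]

async def send_with_retry(value, retries=5, backoff=0.01):
    # The application decides: retry a few times with growing backoff,
    # then give up and surface the failure.
    for attempt in range(retries):
        try:
            return await flaky_send(value)
        except RequestTimedOutError:
            await asyncio.sleep(backoff * (attempt + 1))
    raise RuntimeError("giving up after %d retries" % retries)

msg = asyncio.run(send_with_retry(b'some_message_bytes'))
print(msg)
```

Whether to retry, drop the message, or crash is exactly the per-application choice the maintainers describe; this sketch only shows one option.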
I’m closing this issue. @tvoinarovskyi thanks again for the quick response and fix.