aiokafka: [QUESTION] Unable to request metadata from node with id 0: Unable to update metadata from [0]
Hello, I have a problem with the producer. I am using
aiokafka==0.6.0 and kafka-2.5.0 (Commit:66563e712b0b9f84).
First I connect and send data to Kafka on an AWS server successfully. Then I wait 10 minutes and get these logs:
[MainThread][ERROR][aiokafka ]: Unable to request metadata from node with id 0:
[MainThread][ERROR][aiokafka ]: Unable to update metadata from [0]
and trying to send the same data again fails with:
[MainThread][WARNING][aiokafka.producer.sender]: Got error produce response: [Error 7] RequestTimedOutError
My setup uses FastAPI, aiokafka and gunicorn (workers=4):
import asyncio
import ssl
import time

import orjson
from aiokafka import AIOKafkaProducer
from fastapi import FastAPI, Request

...  # cdata (CA certificate data), bootstrap_servers and logger are defined here

app = FastAPI()

ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cadata=cdata)

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(
    loop=loop,
    bootstrap_servers=bootstrap_servers,
    security_protocol="SSL",
    ssl_context=ssl_context,
    value_serializer=orjson.dumps,
)

@app.on_event("startup")
async def startup_event():
    logger.info('Startup')
    await producer.start()

@app.on_event("shutdown")
async def shutdown_event():
    logger.info('Shutdown')
    await producer.stop()
    logger.info('Producer Flushing ....')
    # producer.flush()

# inject the producer object into each request
@app.middleware("http")
async def kafka_middleware(request: Request, call_next):
    global producer
    request.state.producer = producer
    response = await call_next(request)
    return response

@app.get("/")
async def home(request: Request):
    data = {"Hello": "World", "time": time.time()}
    producer = request.state.producer
    f = await producer.send('my-topic', data)
    return data
I could initialize/connect an AIOKafkaProducer on each request, but I don't think that is the right way (see the sketch below).
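For reference, this is a minimal sketch of that per-request alternative, reusing the ssl_context, bootstrap_servers and orjson serializer from the setup above; the route path and handler name are only placeholders for illustration:

# Sketch of the per-request alternative: create, use and stop a producer
# inside the request handler itself. Not what I currently do.
@app.get("/per-request")
async def home_per_request(request: Request):
    producer = AIOKafkaProducer(
        bootstrap_servers=bootstrap_servers,
        security_protocol="SSL",
        ssl_context=ssl_context,
        value_serializer=orjson.dumps,
    )
    await producer.start()
    try:
        data = {"Hello": "World", "time": time.time()}
        # wait for the broker acknowledgement before returning
        await producer.send_and_wait('my-topic', data)
    finally:
        await producer.stop()
    return data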
Kafka server.properties file
############################# Server Basics #############################
broker.id=0
listeners=SSL_INTERNAL://:9094,SSL_EXTERNAL://:9093
advertised.listeners=SSL_INTERNAL://:9094,SSL_EXTERNAL://ec2-ip-address.eu-central-1.compute.amazonaws.com:9093
listener.security.protocol.map=SSL_INTERNAL:SSL,SSL_EXTERNAL:SSL
inter.broker.listener.name=SSL_INTERNAL
ssl.keystore.location=/path_to/keystore/kafka.keystore.jks
ssl.keystore.password=securepass
ssl.key.password=securepass
ssl.truststore.location=/path_to/truststore/kafka.truststore.jks
ssl.truststore.password=securepass
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=
############################# Log Basics #############################
log.dirs=/var/log/kafka-logs
num.partitions=4
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=4
transaction.state.log.replication.factor=4
transaction.state.log.min.isr=4
All other Kafka and ZooKeeper configs are defaults.
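On the client side, here is a sketch of the same producer with the timeout and idle-connection parameters written out explicitly (request_timeout_ms, metadata_max_age_ms and connections_max_idle_ms are existing AIOKafkaProducer parameters; the values shown are the library defaults and are only for illustration, not a confirmed fix):

# Sketch only: same producer as above, with the client-side settings that
# govern request timeouts, metadata refresh and idle connections spelled out.
producer = AIOKafkaProducer(
    bootstrap_servers=bootstrap_servers,
    security_protocol="SSL",
    ssl_context=ssl_context,
    value_serializer=orjson.dumps,
    request_timeout_ms=40000,        # how long to wait for a produce/metadata response
    metadata_max_age_ms=300000,      # force a metadata refresh at least every 5 minutes
    connections_max_idle_ms=540000,  # close broker connections idle for more than 9 minutes
)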
About this issue
- Original URL
- State: open
- Created 4 years ago
- Comments: 15 (5 by maintainers)
I am using Azure Event Hubs as a managed service. I am also getting the same error:
2022-01-11 05:44:59,604 ERROR – Unable to request metadata from node with id 0: TimeoutError()
2022-01-11 05:50:39,610 ERROR – Unable to request metadata from node with id 0: TimeoutError()
2022-01-11 05:50:39,610 ERROR – Unable to update metadata from [0]
2022-01-11 05:50:39,610 ERROR – Unable to update metadata from [0]
I am using FastAPI and polling the consumer every second, fetching one message at a time with getOne() (see the sketch below).
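For context, a minimal sketch of that polling pattern using a plain AIOKafkaConsumer; the topic name, bootstrap servers and the one-second interval are placeholders:

# Sketch of polling a consumer roughly once per second with getone().
# Topic name and connection settings are placeholders.
import asyncio
from aiokafka import AIOKafkaConsumer

async def poll_loop():
    consumer = AIOKafkaConsumer('my-topic', bootstrap_servers='...')
    await consumer.start()
    try:
        while True:
            msg = await consumer.getone()   # wait for the next message
            print(msg.topic, msg.partition, msg.offset, msg.value)
            await asyncio.sleep(1)          # poll again after one second
    finally:
        await consumer.stop()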