aiokafka: [QUESTION] Unable to request metadata from node with id 0: Unable to update metadata from [0]

Hello, I have a problem with the producer. I am using aiokafka==0.6.0 and kafka-2.5.0 (Commit:66563e712b0b9f84).

First I connect and send data to Kafka on an AWS server successfully. Then I wait 10 minutes and get these logs:

[MainThread][ERROR][aiokafka       ]: Unable to request metadata from node with id 0: 
[MainThread][ERROR][aiokafka       ]: Unable to update metadata from [0]

When I then try to send the same data again, it fails with:

[MainThread][WARNING][aiokafka.producer.sender]: Got error produce response: [Error 7] RequestTimedOutError

My setup uses FastAPI, aiokafka and gunicorn (workers=4):

...

app = FastAPI()

ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cadata=cdata)
loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(
    loop=loop,
    bootstrap_servers=bootstrap_servers,
    security_protocol="SSL",
    ssl_context=ssl_context,
    value_serializer=orjson.dumps,
)

@app.on_event("startup")
async def startup_event():
    logger.info('Startup')
    await producer.start()

@app.on_event("shutdown")
async def shutdown_event():
    logger.info('Shutdown')
    await producer.stop()
    logger.info('Producer Flushing ....')
    # producer.flush()

# inject producer object to each request
@app.middleware("http")
async def kafka_middleware(request: Request, call_next):
    global producer 
    request.state.producer = producer
    response = await call_next(request)
    return response

@app.get("/")
async def home(request: Request):
    data = {"Hello": "World", "time": time.time()}
    producer = request.state.producer
    f = await producer.send('my-topic', data)
    return data

I could initialize and connect AIOKafkaProducer on each request, but I think that is not the right way.
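
One alternative to reconnecting on every request is to keep a single producer per worker process and rebuild it only after a send times out. The sketch below is an illustration under assumptions, not a verified fix: `ProducerHolder`, `make_producer`, and `FakeProducer` are hypothetical names, and `FakeProducer` only mimics `start()`, `stop()`, and `send()` so the example runs without a broker. Whether restarting the producer actually clears the metadata errors reported here has not been confirmed.

```python
import asyncio


class FakeProducer:
    """Hypothetical stand-in exposing start/stop/send like a Kafka producer."""

    def __init__(self, fail_first_send=False):
        self.started = False
        self.fail_first_send = fail_first_send
        self.sent = []

    async def start(self):
        self.started = True

    async def stop(self):
        self.started = False

    async def send(self, topic, value):
        if self.fail_first_send:
            self.fail_first_send = False
            raise asyncio.TimeoutError()
        self.sent.append((topic, value))


class ProducerHolder:
    """Keeps one started producer and rebuilds it after a timed-out send."""

    def __init__(self, make_producer, send_timeout=5.0):
        self._make_producer = make_producer  # zero-argument factory
        self._send_timeout = send_timeout
        self._producer = None
        self.restarts = 0

    async def _get(self):
        # Create the producer lazily, inside the running event loop.
        if self._producer is None:
            self._producer = self._make_producer()
            await self._producer.start()
        return self._producer

    async def send(self, topic, value):
        producer = await self._get()
        try:
            await asyncio.wait_for(producer.send(topic, value), self._send_timeout)
        except asyncio.TimeoutError:
            # Drop the stale producer, build a fresh one, and retry once.
            await producer.stop()
            self._producer = None
            self.restarts += 1
            producer = await self._get()
            await asyncio.wait_for(producer.send(topic, value), self._send_timeout)

    async def close(self):
        if self._producer is not None:
            await self._producer.stop()
            self._producer = None


async def demo():
    created = []

    def make_producer():
        # The first producer fails its first send; the replacement works.
        p = FakeProducer(fail_first_send=not created)
        created.append(p)
        return p

    holder = ProducerHolder(make_producer)
    await holder.send("my-topic", {"Hello": "World"})
    await holder.close()
    return holder.restarts, [p.sent for p in created]


result = asyncio.run(demo())
```

In the FastAPI app, such a holder would be created in the startup handler and closed in the shutdown handler, with `make_producer` returning a configured AIOKafkaProducer instead of the fake.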

Kafka server.properties file

############################# Server Basics #############################
broker.id=0

listeners=SSL_INTERNAL://:9094,SSL_EXTERNAL://:9093
advertised.listeners=SSL_INTERNAL://:9094,SSL_EXTERNAL://ec2-ip-address.eu-central-1.compute.amazonaws.com:9093

listener.security.protocol.map=SSL_INTERNAL:SSL,SSL_EXTERNAL:SSL
inter.broker.listener.name=SSL_INTERNAL

ssl.keystore.location=/path_to/keystore/kafka.keystore.jks
ssl.keystore.password=securepass
ssl.key.password=securepass
ssl.truststore.location=/path_to/truststore/kafka.truststore.jks
ssl.truststore.password=securepass
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=

############################# Log Basics #############################
log.dirs=/var/log/kafka-logs
num.partitions=4

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=4
transaction.state.log.replication.factor=4
transaction.state.log.min.isr=4

All other Kafka and ZooKeeper settings are left at their defaults.
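
Because a client first contacts the bootstrap address and then reconnects to whatever host and port the broker advertises in its metadata response, it can help to check what `advertised.listeners` resolves to. The following is a rough sketch using only the standard library; `parse_listeners` is a hypothetical helper, and the property value is copied from the configuration above.

```python
def parse_listeners(value):
    """Split a Kafka listeners string into {name: (host, port)}."""
    result = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        result[name] = (host, int(port))
    return result


advertised = parse_listeners(
    "SSL_INTERNAL://:9094,"
    "SSL_EXTERNAL://ec2-ip-address.eu-central-1.compute.amazonaws.com:9093"
)

for name, (host, port) in advertised.items():
    # An empty host is left for the broker itself to resolve.
    shown = host or "<resolved by broker>"
    print(f"{name}: {shown}:{port}")
```

The client's `bootstrap_servers` value should reach the SSL_EXTERNAL entry, and that advertised host name must be resolvable from wherever the producer runs.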

About this issue

  • State: open
  • Created 4 years ago
  • Comments: 15 (5 by maintainers)

Most upvoted comments

I am using Azure Event Hubs as a managed service, and I am getting the same error.

AEH_PRODUCER_CONFIG = dict(
    bootstrap_servers=os.getenv('ENVIRONMENT_VARIABLE_AEH_SERVER_PORT'),
    sasl_plain_username=os.getenv('ENVIRONMENT_VARIABLE_AEH_USERNAME'),
    sasl_plain_password=get_secrets().sasl_pw,
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    client_id='python-publisher-client',
    acks=1)

AEH_CONSUMER_CONFIG = dict(
    bootstrap_servers=os.getenv('ENVIRONMENT_VARIABLE_AEH_SERVER_PORT'),
    group_id=os.getenv('ENVIRONMENT_VARIABLE_AEH_GROUP_ID'),
    sasl_plain_username=os.getenv('ENVIRONMENT_VARIABLE_AEH_USERNAME'),
    sasl_plain_password=get_secrets().sasl_pw,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    sasl_mechanism='PLAIN',
    security_protocol='SASL_SSL')

2022-01-11 05:44:59,604 ERROR – Unable to request metadata from node with id 0: TimeoutError()
2022-01-11 05:50:39,610 ERROR – Unable to request metadata from node with id 0: TimeoutError()
2022-01-11 05:50:39,610 ERROR – Unable to update metadata from [0]
2022-01-11 05:50:39,610 ERROR – Unable to update metadata from [0]

I am using FastAPI and polling the consumers every second by calling getone() for a message.
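
The spacing between the two "Unable to request metadata" errors in the log above can be worked out directly. This is only an arithmetic sketch with the standard library. The timestamps are copied from the log; the values for `metadata_max_age_ms` and `request_timeout_ms` are assumed to be the aiokafka defaults and should be checked against the installed version. Any match is an observation, not a confirmed diagnosis.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S,%f"

first = datetime.strptime("2022-01-11 05:44:59,604", FMT)
second = datetime.strptime("2022-01-11 05:50:39,610", FMT)

gap_seconds = (second - first).total_seconds()

# Assumed defaults (verify against your aiokafka version).
metadata_max_age_ms = 300_000
request_timeout_ms = 40_000
expected_seconds = (metadata_max_age_ms + request_timeout_ms) / 1000

print(f"gap between errors: {gap_seconds:.3f} s")
print(f"refresh interval + request timeout: {expected_seconds:.0f} s")
```

If the two numbers line up, it would suggest that each periodic metadata refresh is timing out, which is consistent with a connection that has gone stale while idle.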