node-rdkafka: Unknown Broker error when using seek
Using your docker-compose.yml file:
I run this producer to create this topic:
const kafka = require('node-rdkafka');

const producer = new kafka.Producer({
  'metadata.broker.list': 'localhost:9092',
  'broker.version.fallback': '0.10.0', // If kafka node doesn't have API, use this instead
  'api.version.request': true // Request the api version of Kafka node
});

producer.connect();

producer.on('ready', () => {
  console.log('producer is ready');
  try {
    producer.produce('TEST_TOPIC', null, Buffer.from(JSON.stringify({ test: 'test' })));
  } catch (e) {
    console.error(e);
  }
});
I then boot up my consumer:
const kafka = require('node-rdkafka');

const consumer = new kafka.KafkaConsumer({
  'group.id': 'consumer',
  'metadata.broker.list': 'localhost:9092',
  'broker.version.fallback': '0.10.0', // If kafka node doesn't have API, use this instead
  'api.version.request': true // Request the api version of Kafka node
});

consumer.connect();

consumer
  .on('ready', () => {
    console.log('consumer is ready');
    consumer.subscribe(['TEST_TOPIC']);
    consumer.seek(
      {
        topic: 'TEST_TOPIC',
        partition: 0,
        offset: 0
      },
      0,
      (err) => {
        console.log('seek', err);
        consumer.consume();
      }
    );
  })
  .on('data', (data) => {
    console.log('DATA RECEIVED', data);
    const dataMsg = JSON.parse(data.value.toString());
    console.log('parsed data', dataMsg);
  });
When I do this, I expect to process the first message that the producer created. Instead, I get an “Unknown broker error”.
Is this not how seek is meant to be used?
About this issue
- State: closed
- Created 7 years ago
- Comments: 18 (6 by maintainers)
I was also getting the Local: Erroneous state error. I realized that with an implicit assign (subscribe), you have to wait for rebalance_cb to fire before calling seek. With an explicit assign, you have to specify the offset in your topicPartition object. Here is the sample solution I came up with:
Implicit call to assign:
In an explicit call to assign, you need to specify the offset and then consume, or use seek to change the offset (if you omit the offset from the topicPartition object, calling seek still fails with an error):
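The two cases above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the original sample: makeRebalanceHandler and withStartOffset are hypothetical helper names, and instead of calling seek once the rebalance fires, the handler pins the starting offset on each partition at assign time, which is the same mechanism the explicit-assign case relies on. It also assumes node-rdkafka invokes a rebalance_cb function with the consumer bound to this, as the library's README example does.

```javascript
// Hypothetical helper: copy an assignment and pin every partition to `offset`.
function withStartOffset(assignment, offset) {
  return assignment.map(({ topic, partition }) => ({ topic, partition, offset }));
}

// Hypothetical helper: build a function suitable for the 'rebalance_cb' option.
// `codes` is expected to be require('node-rdkafka').CODES.ERRORS.
// Assumption: node-rdkafka calls the handler with `this` set to the consumer.
function makeRebalanceHandler(codes, startOffset) {
  return function onRebalance(err, assignment) {
    if (err.code === codes.ERR__ASSIGN_PARTITIONS) {
      // Implicit assign (subscribe): partitions are only known here,
      // so this is the earliest point at which an offset can be chosen.
      this.assign(withStartOffset(assignment, startOffset));
    } else if (err.code === codes.ERR__REVOKE_PARTITIONS) {
      this.unassign();
    } else {
      console.error('rebalance error', err);
    }
  };
}

// Usage against a real broker (shown as comments, not executed here):
//
// const kafka = require('node-rdkafka');
// const consumer = new kafka.KafkaConsumer({
//   'group.id': 'consumer',
//   'metadata.broker.list': 'localhost:9092',
//   'rebalance_cb': makeRebalanceHandler(kafka.CODES.ERRORS, 0)
// });
// consumer.connect();
// consumer.on('ready', () => {
//   consumer.subscribe(['TEST_TOPIC']);
//   consumer.consume();
// });
// consumer.on('data', (data) => console.log(data.value.toString()));
//
// Explicit assign, used instead of subscribe inside the 'ready' handler:
//
//   consumer.assign(withStartOffset([{ topic: 'TEST_TOPIC', partition: 0 }], 0));
//   consumer.consume();
```

Note that a handler like this rewinds to the chosen offset on every rebalance, so it is better suited to replaying or debugging a topic than to normal consumer-group processing.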
I hope this helps.
Thanks