node-rdkafka: Unknown Broker error when using seek

Using your docker-compose.yml file:

I run this producer to create this topic:

const kafka = require('node-rdkafka');

// Connection settings for the test producer.
const producerConfig = {
  'metadata.broker.list': 'localhost:9092',
  // Version to fall back to if the Kafka node doesn't expose the API version.
  'broker.version.fallback': '0.10.0',
  // Ask the Kafka node for its API version.
  'api.version.request': true
};

const producer = new kafka.Producer(producerConfig);

/**
 * 'ready' handler: publishes one JSON test message to TEST_TOPIC.
 * Anything thrown while producing is logged rather than rethrown.
 */
function sendTestMessage() {
  console.log('producer is ready');
  try {
    const payload = Buffer.from(JSON.stringify({ test: 'test' }));
    producer.produce('TEST_TOPIC', null, payload);
  } catch (e) {
    console.error(e);
  }
}

producer.connect();
producer.on('ready', sendTestMessage);

I then boot up my consumer:

const kafka = require('node-rdkafka');

// Connection settings for the test consumer.
const consumerConfig = {
  'group.id': 'consumer',
  'metadata.broker.list': 'localhost:9092',
  // Version to fall back to if the Kafka node doesn't expose the API version.
  'broker.version.fallback': '0.10.0',
  // Ask the Kafka node for its API version.
  'api.version.request': true
};

const consumer = new kafka.KafkaConsumer(consumerConfig);

/**
 * 'ready' handler: subscribes to TEST_TOPIC and then immediately seeks
 * partition 0 to offset 0; consumption starts from the seek callback.
 */
function onReady() {
  console.log('consumer is ready');
  consumer.subscribe(['TEST_TOPIC']);

  const firstMessage = { topic: 'TEST_TOPIC', partition: 0, offset: 0 };
  // seek() is issued right after subscribe(), without waiting for any
  // partition assignment. The second argument (0) is presumably the
  // timeout in ms -- confirm against the node-rdkafka docs.
  consumer.seek(firstMessage, 0, (err) => {
    console.log('seek', err);
    consumer.consume();
  });
}

/**
 * 'data' handler: logs the raw message, then its value parsed as JSON.
 */
function onData(data) {
  console.log('DATA RECEIVED', data);
  const rawText = data.value.toString();
  const dataMsg = JSON.parse(rawText);
  console.log('parsed data', dataMsg);
}

consumer.connect();

consumer.on('ready', onReady);
consumer.on('data', onData);

When I do this, I expect to process the first message that the producer created first. However, I instead get an “Unknown broker error”.

Is this not how seek is meant to be used?

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Comments: 18 (6 by maintainers)

Most upvoted comments

I was also getting the “Local: Erroneous state” error. I realized that with an implicit assign (subscribe), you have to wait for rebalance_cb to fire and only then call seek. With an explicit assign, you have to specify the offset in your topicPartition object. Here is the sample solution I came up with. Implicit call to assign:

const topics = ['test'];
const consumer = new Kafka.KafkaConsumer({
    'group.id': 'kafka',
    'enable.auto.commit': false,
    'enable.auto.offset.store': false,
    'auto.offset.reset': 'earliest',
    'metadata.broker.list': 'localhost:9092',
    // Custom rebalance handler: applies or drops the assignment and, after an
    // assignment has been applied, emits 'rebalanced' so seek() can be called.
    'rebalance_cb': function(err, assignments) {
        if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
            // stuff you wanna do before assignment
            this.assign(assignments);
            this.emit('rebalanced'); // signal that partitions are now assigned
        } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
            // Note: this can throw if we are disconnected.
            this.unassign();
        } else {
            // We had a real error.
            console.error(err);
        }
    }
}, {});

consumer.connect();
consumer.on('ready', function() {
    // Register the message handler exactly once. Attaching it inside the
    // 'rebalanced' handler would add one more listener on every rebalance,
    // so each message would be logged multiple times.
    consumer.on('data', (data) => {
        console.log(`${data.topic}  :  ${data.offset}`);
    });

    // Seek only after the rebalance callback has applied an assignment.
    consumer.on('rebalanced', function () {
        consumer.seek({topic: 'test', partition: 0, offset: 100}, 10, function (err) {
            console.log(err);
        });
    });

    consumer.subscribe(topics);
    consumer.consume();
});

With an explicit call to assign, you need to specify the offset and then consume, or use seek to change the offset afterwards. (If you do not specify the offset in the topicPartition object, you will still get an error when you call seek.)

// Consumer settings: manual commits/offset storage, start from the earliest
// offset when none is available.
const consumerSettings = {
    'group.id': 'kafka',
    'enable.auto.commit': false,
    'enable.auto.offset.store': false,
    'auto.offset.reset': 'earliest',
    'metadata.broker.list': 'localhost:9092',
};

const consumer = new Kafka.KafkaConsumer(consumerSettings, {});

// Logs the topic and offset of every message received.
function logTopicAndOffset(data) {
    console.log(`${data.topic}  :  ${data.offset}`);
}

consumer.connect();
consumer.on('ready', function() {
    // Explicit assignment that carries the starting offset.
    const startingPoint = {topic: 'test', partition: 0, offset: 100};
    consumer.assign([startingPoint]);

    // A seek is unnecessary here because the assignment already carries the
    // offset, but it can still be called without error:
    //
    //   consumer.seek({topic: 'test', partition: 0, offset: 100}, 10, function (err) {
    //       console.log(err);
    //   });

    consumer.consume();
    consumer.on('data', logTopicAndOffset);
});

I hope this helps.

Thanks