kafka-node: Wrong offset is being committed, messages are missing

Hi there, we are currently having problems running “kafka-node” in our production system.

Info:

  • Kafka-Broker Version 0.10.1
  • Node.js Version 7.4.0
  • kafka-node Version: 1.5.0
  • using ConsumerGroups
  • using a ConsumerGroup per topic
  • 3 topics with ~4.5 million messages each, on 30 partitions each
  • Java and Scala consumer implementations have no problems consuming these topics

Implementation:

Problem:

  • the consumer reads messages just fine for a while, then suddenly stops at about 800k incoming messages (sometimes it makes it to ~2 million) - it never receives all messages, at least they are not emitted via .on(“message”, …). If we take a look at the consumer afterwards using Kafka Tool, we can see that the consumer group has committed offsets up to the latest state of the partitions - even though we are not committing manually and autoCommit has been turned off.
  • (the reason we see offset commits being made at all is that we have enabled offset commit on close, via “consumer.close(true, …)” - yet the offsets should not be at the end of the partitions!)
  • if we consume other topics with fewer messages (~500k), it works just fine
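One way to narrow down where messages disappear is to record every offset emitted via .on(“message”, …) per partition and flag holes. Below is a broker-free sketch of such a gap detector - the message shape mirrors kafka-node's { topic, partition, offset } payloads, and the helper name is made up for illustration. On a plain (non-compacted) topic, any reported gap points at dropped messages; on a compacted topic, gaps are expected.

```javascript
// Broker-free sketch: per-partition offset gap detector.
// Feed it every message the consumer emits and inspect gaps() afterwards.
function createGapDetector () {
  const lastOffset = {}; // partition -> last offset seen
  const gaps = [];       // recorded { partition, from, to } holes

  return {
    record (message) {
      const prev = lastOffset[message.partition];
      if (prev !== undefined && message.offset !== prev + 1) {
        gaps.push({ partition: message.partition, from: prev + 1, to: message.offset - 1 });
      }
      lastOffset[message.partition] = message.offset;
    },
    gaps () {
      return gaps;
    }
  };
}

// Usage with simulated messages (offset 5 follows offset 1 -> gap 2..4):
const detector = createGapDetector();
[{ partition: 0, offset: 0 }, { partition: 0, offset: 1 }, { partition: 0, offset: 5 }]
  .forEach(m => detector.record(m));
console.log(detector.gaps()); // [ { partition: 0, from: 2, to: 4 } ]
```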

Questions:

  • is there any chance that using multiple ConsumerGroup instances in the same node process at once can break the consuming or offset committing process?
  • why is the final “consumer.close(true,…)” committing offsets for messages that we never received?
  • is there a chance (or better reason) for why we might drop messages on “.on(“message”,…)” ?
  • do we have to make any “batch” ack settings for consumers?
  • is there a chance that pausing & resuming the consumer too often kills the connection or partition offsets?

Configuration:

            ssl: false,
            groupId: "a-group-id",
            sessionTimeout: 30000,
            protocol: ["roundrobin"],
            fromOffset: "earliest",
            migrateHLC: false,
            migrateRolling: false,
            fetchMaxBytes: 1024 * 100,
            fetchMinBytes: 1,
            fetchMaxWaitMs: 100,
            autoCommit: false,
            autoCommitIntervalMs: 5000
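With autoCommit: false, offsets are only written when something explicitly commits (here: the consumer.close(true, …) call), so it helps to track the highest processed offset per partition yourself. A broker-free sketch of that bookkeeping follows - the helper is hypothetical, and it follows the Kafka convention that the committed offset is the last processed offset plus one (the next offset to read):

```javascript
// Track the highest processed offset per topic/partition; the offset
// handed to a commit request is lastProcessed + 1. The actual commit
// call (omitted) would go through the consumer group's commit API.
function createOffsetTracker () {
  const highest = {}; // "topic:partition" -> highest processed offset

  return {
    markProcessed (topic, partition, offset) {
      const key = `${topic}:${partition}`;
      if (highest[key] === undefined || offset > highest[key]) {
        highest[key] = offset;
      }
    },
    // Payload shape suitable for an offset-commit request.
    toCommit () {
      return Object.entries(highest).map(([key, offset]) => {
        const [topic, partition] = key.split(':');
        return { topic, partition: Number(partition), offset: offset + 1 };
      });
    }
  };
}

const tracker = createOffsetTracker();
tracker.markProcessed('ThirtyPartitions', 0, 41);
tracker.markProcessed('ThirtyPartitions', 0, 42);
tracker.markProcessed('ThirtyPartitions', 1, 7);
console.log(tracker.toCommit());
// [ { topic: 'ThirtyPartitions', partition: 0, offset: 43 },
//   { topic: 'ThirtyPartitions', partition: 1, offset: 8 } ]
```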

We have hit some sort of wall here; thanks in advance for any guesses.

Chris

About this issue

  • State: closed
  • Created 7 years ago
  • Reactions: 3
  • Comments: 20

Most upvoted comments

@hyperlink thanks again for your support and your efforts to keep this lib trustworthy - we were able to identify the reason for the problems, and I am happy to sum them up for anyone else who might be stuck in a similar situation in the future.

  1. using 1 or 30 partitions does not matter, having a single instance or a cluster does not matter either, we ran the large-message-test against all kinds of setups.

  2. combining .setOffset() with manual consumer.commit() calls is a bad idea

  3. expecting .setOffset(topic, partition, 0) to work on a running consumer, even a paused one, can be problematic and is probably bad behavior for any kind of Kafka consumer

  4. when experiencing “missing messages” and “committed high offsets”, make sure you understand the topic you are consuming - we have a large number of topics here, with different owners. Keyed partitions may very well be subject to heavy log compaction, depending on the data and on how frequently the topic is published to.
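To make point 4 concrete: with cleanup.policy=compact, Kafka retains only the latest record per key, so the number of consumable messages can end up far below the number produced without anything being "lost". A toy simulation of that effect (in-memory only; real compaction runs lazily on closed segments):

```javascript
// Toy model of log compaction: keep only the latest record per key,
// preserving the position of the surviving (latest) write in the log.
function compact (records) {
  const latestByKey = new Map();
  for (const record of records) {
    latestByKey.delete(record.key); // re-insert so the survivor keeps log order
    latestByKey.set(record.key, record);
  }
  return [...latestByKey.values()];
}

const produced = [
  { key: 'user-1', value: 'v1' },
  { key: 'user-2', value: 'v1' },
  { key: 'user-1', value: 'v2' },
  { key: 'user-1', value: 'v3' }
];
const consumable = compact(produced);
console.log(produced.length, consumable.length); // 4 2
console.log(consumable.map(r => `${r.key}=${r.value}`)); // [ 'user-2=v1', 'user-1=v3' ]
```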

@krystianity here’s the updated test I wrote earlier to consume from 30 partitions w/ autoCommit off. I ran it several times without issues.

I ran the test several times in a row (the test needs to be adjusted for that) without resetting the Kafka container, and was able to consume up to 6.3 million messages in 527 seconds.

Setup topic

Kafka bin tools

./kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 30 --topic ThirtyPartitions

Test code

./test/large-cs-test.js

'use strict';

const host = process.env['KAFKA_TEST_HOST'] || '';
const ConsumerGroup = require('../lib/consumerGroup');
const _ = require('lodash');
const uuid = require('uuid');
const async = require('async');
const Client = require('../lib/client');
const Producer = require('../lib/highLevelProducer');
const assert = require('assert');

let topic, client, consumerGroup, consumed, interval;
const numberOfMessages = 2100000;
const chunkSize = 30;

function sendUUIDMessages (times, topic, done) {
  console.log(`Trying to send ${times} messages`);
  const producer = new Producer(client, { requireAcks: 1 });
  assert(producer.ready, 'Producer is not ready');
  console.log('producer ready');
  const allMessages = _.times(times, uuid.v4);
  const chunked = _.chunk(allMessages, times / chunkSize);
  console.log(`${allMessages.length} messages generated. Sending in ${chunked.length} chunks of ${times / chunkSize}`);
  let count = 0;
  async.eachSeries(chunked, function (messages, callback) {
    console.log(`sending ${++count}`);
    producer.send([{topic: topic, messages: messages}], callback);
  }, done);
  return producer;
}

function init (done) {
  topic = 'ThirtyPartitions'; // uuid.v4();
  async.series([
    function (callback) {
      client = new Client(host, uuid.v4());
      client.once('connect', callback);
    },
    function (callback) {
      client.refreshMetadata([topic], callback);
    },
    function (callback) {
      sendUUIDMessages(numberOfMessages, topic, function (error) {
        if (error) {
          return callback(error);
        }
        assert.equal(client.topicPartitions[topic].length, chunkSize);
        callback();
      });
    },
    function (callback) {
      client.close(callback);
    }
  ], function (error) {
    if (error) {
      return done(error);
    }
    console.log('All messages sent');
    done();
  });
}

function bench (done) {
  const groupId = uuid.v4();
  console.log(`starting to consume using groupId: ${groupId}`);
  const time = process.hrtime();
  consumerGroup = new ConsumerGroup({
    fetchMaxBytes: 1024 * 100,
    groupId: groupId,
    host: host,
    sessionTimeout: 8000,
    heartbeatInterval: 7000,
    retryMinTimeout: 250,
    autoCommit: false,
    fromOffset: 'earliest'
  }, topic);

  const stats = {};

  consumed = 0;
  consumerGroup.once('error', done);
  consumerGroup.on('message', function (message) {
    stats[message.partition] = message.offset;

    if (++consumed === numberOfMessages) {
      console.log(`consumed ${consumed}`);
      const [seconds, nanoseconds] = process.hrtime(time);
      console.log(`took ${seconds}s ${nanoseconds / 1e6}ms`);
      console.log(stats);
      clearInterval(interval);
      done();
    }
  });

  consumerGroup.once('connect', function () {
    interval = setInterval(function () {
      console.log(`consumed ${consumed}`);
    }, 1000);
  });
}

async.series([init, bench], function (error) {
  if (error) {
    console.error('failed with', error);
  }
  client.close();
  consumerGroup.close(function () {});
});

I wrote a simple test: a single consumer against a single broker and a single partition. I published 2.1 million messages and was able to consume them without an issue. Ran it several times. https://github.com/SOHU-Co/kafka-node/commit/283e6a88492168ed86993b683e5fc0868a469e26

If you’re interested in running it, check out the large-message-test branch.

You will need to run the test without a timeout, since it can take about 14 minutes to run.

./node_modules/.bin/mocha --no-timeouts test/test.consumerGroup.js --grep "large"

How many consumers are in your group?