kafka-node: Duplicate messages using ConsumerGroup with kafkaHost

Bug Report

When connecting directly to the Kafka brokers, all messages are duplicated. If I connect through ZooKeeper instead (using host instead of kafkaHost, with no other changes to the config apart from the value of this field), I only get a single message, as expected.

Environment

  • Node version: 6.11.2
  • Kafka-node version: 2.2.0
  • Kafka version: 0.10

Include Sample Code to reproduce behavior

const kafka = require('kafka-node');
const consumer = new kafka.ConsumerGroup({ kafkaHost: 'some host' }, 'test');
const zooConsumer = new kafka.ConsumerGroup({ host: 'some host' }, 'test');
const producer = new kafka.HighLevelProducer(new kafka.KafkaClient({ kafkaHost: 'some host' }));

consumer.on('message', msg => console.log(`${msg.value}`));
zooConsumer.on('message', msg => console.log(`from zoo ${msg.value}`));

producer.send([{ topic: 'test', messages: [{ type: 'type', values: { foo: 'bar' } }] }]);

consumer (connected via kafkaHost) is invoked twice, whilst zooConsumer (connected via host) is only invoked once.

Include output with Debug turned on

kafkaHost

  kafka-node:KafkaClient Connect attempt 1 +0ms
  kafka-node:KafkaClient Trying to connect to host: somehostport: 7794 +2ms
  kafka-node:KafkaClient Sending versions request to somehost:7794 +57ms
  kafka-node:KafkaClient broker socket connected {"host":"somehost","port":"7794"} +3ms
  kafka-node:KafkaClient Received versions response from somehost:7794 +30ms
  kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":2,"usable":2},"fetch":{"min":0,"max":3,"usable":0},"offset":{"min":0,"max":1,"usable":0},"metadata":{"min":0,"max":2,"usable":0},"leader":{"min":0,"max":0,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":3,"usable":false},"controlledShutdown":{"min":1,"max":1,"usable":false},"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":2,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":1,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":false},"listGroups":{"min":0,"max":0,"usable":false},"saslHandshake":{"min":0,"max":0,"usable":false},"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":{"min":0,"max":1,"usable":false},"deleteTopics":{"min":0,"max":0,"usable":false}} +1ms
  kafka-node:KafkaClient updating metadatas +874ms
  kafka-node:ConsumerGroup Connecting kafka-node-client +1ms
  kafka-node:ConsumerGroup GroupCoordinator Response: { coordinatorHost: 'somehost',
  coordinatorPort: 7794,
  coordinatorId: 2 } +64ms
  kafka-node:KafkaClient Sending versions request to somehost:7794 +54ms
  kafka-node:ConsumerGroup joinGroupResponse {"members":[{"subscription":["test"],"version":0,"id":"kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8"}],"generationId":43,"groupProtocol":"roundrobin","leaderId":"kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8","memberId":"kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8"} from kafka-node-client +27ms
  kafka-node:ConsumerGroup Assigning Partitions to members [ { subscription: [ 'test' ],
    version: 0,
    userData: undefined,
    id: 'kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8' } ] +1ms
  kafka-node:ConsumerGroup Using group protocol roundrobin +1ms
  kafka-node:ConsumerGroup loadingMetadata for topics: [ 'test' ] +2ms
  kafka-node:KafkaClient Received versions response from somehost:7794 +21ms
  kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":2,"usable":2},"fetch":{"min":0,"max":3,"usable":0},"offset":{"min":0,"max":1,"usable":0},"metadata":{"min":0,"max":2,"usable":0},"leader":{"min":0,"max":0,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":3,"usable":false},"controlledShutdown":{"min":1,"max":1,"usable":false},"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":2,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":1,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":false},"listGroups":{"min":0,"max":0,"usable":false},"saslHandshake":{"min":0,"max":0,"usable":false},"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":{"min":0,"max":1,"usable":false},"deleteTopics":{"min":0,"max":0,"usable":false}} +0ms
  kafka-node:ConsumerGroup mapTopicToPartitions { test: [ '0', '1', '2', '3', '4' ] } +5ms
  kafka-node:Roundrobin topicPartition: {"test":["0","1","2","3","4"]} +1ms
  kafka-node:Roundrobin groupMembers: [{"subscription":["test"],"version":0,"id":"kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8"}] +0ms
  kafka-node:Roundrobin members [ 'kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8' ] +1ms
  kafka-node:Roundrobin subscribers { 'kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8': [ 'test' ] } +0ms
  kafka-node:Roundrobin round robin on topic partition pairs:  [ { topic: 'test', partition: '0' },
  { topic: 'test', partition: '1' },
  { topic: 'test', partition: '2' },
  { topic: 'test', partition: '3' },
  { topic: 'test', partition: '4' } ] +1ms
  kafka-node:ConsumerGroup SyncGroup Request from kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8 +1ms
  kafka-node:ConsumerGroup SyncGroup Response +88ms
  kafka-node:ConsumerGroup kafka-node-client owns topics:  { test: [ 0, 1, 2, 3, 4 ] } +0ms
  kafka-node:ConsumerGroup kafka-node-client fetchOffset Response: {"test":{"0":79,"1":0,"2":0,"3":0,"4":0}} +34ms
  kafka-node:ConsumerGroup Has saved offsets +0ms
  kafka-node:ConsumerGroup generationId 43 +1ms
  kafka-node:ConsumerGroup kafka-node-client started heartbeats at every 10000 ms +3ms
  kafka-node:KafkaClient Sending versions request to somehost:7794 +25ms
  kafka-node:KafkaClient Sending versions request to somehost:7794 +1ms
  kafka-node:KafkaClient Sending versions request to somehost:7794 +22ms
  kafka-node:KafkaClient Received versions response from somehost:7794 +109ms
  kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":2,"usable":2},"fetch":{"min":0,"max":3,"usable":0},"offset":{"min":0,"max":1,"usable":0},"metadata":{"min":0,"max":2,"usable":0},"leader":{"min":0,"max":0,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":3,"usable":false},"controlledShutdown":{"min":1,"max":1,"usable":false},"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":2,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":1,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":false},"listGroups":{"min":0,"max":0,"usable":false},"saslHandshake":{"min":0,"max":0,"usable":false},"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":{"min":0,"max":1,"usable":false},"deleteTopics":{"min":0,"max":0,"usable":false}} +0ms
  kafka-node:KafkaClient Received versions response from somehost:7794 +2ms
  kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":2,"usable":2},"fetch":{"min":0,"max":3,"usable":0},"offset":{"min":0,"max":1,"usable":0},"metadata":{"min":0,"max":2,"usable":0},"leader":{"min":0,"max":0,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":3,"usable":false},"controlledShutdown":{"min":1,"max":1,"usable":false},"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":2,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":1,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":false},"listGroups":{"min":0,"max":0,"usable":false},"saslHandshake":{"min":0,"max":0,"usable":false},"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":{"min":0,"max":1,"usable":false},"deleteTopics":{"min":0,"max":0,"usable":false}} +0ms
  kafka-node:KafkaClient Received versions response from somehost:7794 +19ms
  kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":2,"usable":2},"fetch":{"min":0,"max":3,"usable":0},"offset":{"min":0,"max":1,"usable":0},"metadata":{"min":0,"max":2,"usable":0},"leader":{"min":0,"max":0,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":3,"usable":false},"controlledShutdown":{"min":1,"max":1,"usable":false},"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":2,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":1,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":false},"listGroups":{"min":0,"max":0,"usable":false},"saslHandshake":{"min":0,"max":0,"usable":false},"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":{"min":0,"max":1,"usable":false},"deleteTopics":{"min":0,"max":0,"usable":false}} +0ms

host:

  kafka-node:ConsumerGroup Connecting kafka-node-client +0ms
  kafka-node:ConsumerGroup GroupCoordinator Response: { coordinatorHost: 'somehost',
  coordinatorPort: 7794,
  coordinatorId: 2 } +59ms
  kafka-node:ConsumerGroup joinGroupResponse {"members":[{"subscription":["test"],"version":0,"id":"kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507"}],"generationId":45,"groupProtocol":"roundrobin","leaderId":"kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507","memberId":"kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507"} from kafka-node-client +98ms
  kafka-node:ConsumerGroup Assigning Partitions to members [ { subscription: [ 'test' ],
    version: 0,
    userData: undefined,
    id: 'kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507' } ] +1ms
  kafka-node:ConsumerGroup Using group protocol roundrobin +1ms
  kafka-node:ConsumerGroup loadingMetadata for topics: [ 'test' ] +2ms
  kafka-node:ConsumerGroup mapTopicToPartitions { test: [ '0', '1', '2', '3', '4' ] } +33ms
  kafka-node:Roundrobin topicPartition: {"test":["0","1","2","3","4"]} +0ms
  kafka-node:Roundrobin groupMembers: [{"subscription":["test"],"version":0,"id":"kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507"}] +0ms
  kafka-node:Roundrobin members [ 'kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507' ] +1ms
  kafka-node:Roundrobin subscribers { 'kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507': [ 'test' ] } +0ms
  kafka-node:Roundrobin round robin on topic partition pairs:  [ { topic: 'test', partition: '0' },
  { topic: 'test', partition: '1' },
  { topic: 'test', partition: '2' },
  { topic: 'test', partition: '3' },
  { topic: 'test', partition: '4' } ] +0ms
  kafka-node:ConsumerGroup SyncGroup Request from kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507 +1ms
  kafka-node:ConsumerGroup SyncGroup Response +93ms
  kafka-node:ConsumerGroup kafka-node-client owns topics:  { test: [ 0, 1, 2, 3, 4 ] } +0ms
  kafka-node:ConsumerGroup kafka-node-client fetchOffset Response: {"test":{"0":81,"1":0,"2":0,"3":0,"4":0}} +29ms
  kafka-node:ConsumerGroup Has saved offsets +0ms
  kafka-node:ConsumerGroup generationId 45 +0ms
  kafka-node:ConsumerGroup kafka-node-client started heartbeats at every 10000 ms +2ms

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Comments: 17 (10 by maintainers)

Commits related to this issue

Most upvoted comments

Great that you could reproduce it.

Thanks for the workaround, I’ll test it Monday 🙂