kafka-node: Duplicate messages using ConsumerGroup with kafkaHost
Bug Report
When trying to connect directly to the kafka brokers, all messages are duplicated. If I connect to zookeeper (using host instead of kafkaHost, no other changes to config (except for the value of this field, of course)) I only get a single message, as expected.
Environment
- Node version: 6.11.2
- Kafka-node version: 2.2.0
- Kafka version: 0.10
Include Sample Code to reproduce behavior
const kafka = require('kafka-node');
const consumer = new kafka.ConsumerGroup({ kafkaHost: 'some host' }, 'test');
const zooConsumer = new kafka.ConsumerGroup({ host: 'some host' }, 'test');
const producer = new kafka.HighLevelProducer(new kafka.KafkaClient({ kafkaHost: 'some host' }));
consumer.on('message', msg => console.log(`${msg.value}`));
zooConsumer.on('message', msg => console.log(`from zoo ${msg.value}`));
producer.send([{ topic: 'test', messages: [{ type: 'type', values: { foo: 'bar' } }] }]);
zooConsumer is invoked twice, whilst consumer is only invoked once.
Include output with Debug turned on
kafkaHost
kafka-node:KafkaClient Connect attempt 1 +0ms
kafka-node:KafkaClient Trying to connect to host: somehostport: 7794 +2ms
kafka-node:KafkaClient Sending versions request to somehost:7794 +57ms
kafka-node:KafkaClient broker socket connected {"host":"somehost","port":"7794"} +3ms
kafka-node:KafkaClient Received versions response from somehost:7794 +30ms
kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":2,"usable":2},"fetch":{"min":0,"max":3,"usable":0},"offset":{"min":0,"max":1,"usable":0},"metadata":{"min":0,"max":2,"usable":0},"leader":{"min":0,"max":0,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":3,"usable":false},"controlledShutdown":{"min":1,"max":1,"usable":false},"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":2,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":1,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":false},"listGroups":{"min":0,"max":0,"usable":false},"saslHandshake":{"min":0,"max":0,"usable":false},"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":{"min":0,"max":1,"usable":false},"deleteTopics":{"min":0,"max":0,"usable":false}} +1ms
kafka-node:KafkaClient updating metadatas +874ms
kafka-node:ConsumerGroup Connecting kafka-node-client +1ms
kafka-node:ConsumerGroup GroupCoordinator Response: { coordinatorHost: 'somehost',
coordinatorPort: 7794,
coordinatorId: 2 } +64ms
kafka-node:KafkaClient Sending versions request to somehost:7794 +54ms
kafka-node:ConsumerGroup joinGroupResponse {"members":[{"subscription":["test"],"version":0,"id":"kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8"}],"generationId":43,"groupProtocol":"roundrobin","leaderId":"kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8","memberId":"kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8"} from kafka-node-client +27ms
kafka-node:ConsumerGroup Assigning Partitions to members [ { subscription: [ 'test' ],
version: 0,
userData: undefined,
id: 'kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8' } ] +1ms
kafka-node:ConsumerGroup Using group protocol roundrobin +1ms
kafka-node:ConsumerGroup loadingMetadata for topics: [ 'test' ] +2ms
kafka-node:KafkaClient Received versions response from somehost:7794 +21ms
kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":2,"usable":2},"fetch":{"min":0,"max":3,"usable":0},"offset":{"min":0,"max":1,"usable":0},"metadata":{"min":0,"max":2,"usable":0},"leader":{"min":0,"max":0,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":3,"usable":false},"controlledShutdown":{"min":1,"max":1,"usable":false},"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":2,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":1,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":false},"listGroups":{"min":0,"max":0,"usable":false},"saslHandshake":{"min":0,"max":0,"usable":false},"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":{"min":0,"max":1,"usable":false},"deleteTopics":{"min":0,"max":0,"usable":false}} +0ms
kafka-node:ConsumerGroup mapTopicToPartitions { test: [ '0', '1', '2', '3', '4' ] } +5ms
kafka-node:Roundrobin topicPartition: {"test":["0","1","2","3","4"]} +1ms
kafka-node:Roundrobin groupMembers: [{"subscription":["test"],"version":0,"id":"kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8"}] +0ms
kafka-node:Roundrobin members [ 'kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8' ] +1ms
kafka-node:Roundrobin subscribers { 'kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8': [ 'test' ] } +0ms
kafka-node:Roundrobin round robin on topic partition pairs: [ { topic: 'test', partition: '0' },
{ topic: 'test', partition: '1' },
{ topic: 'test', partition: '2' },
{ topic: 'test', partition: '3' },
{ topic: 'test', partition: '4' } ] +1ms
kafka-node:ConsumerGroup SyncGroup Request from kafka-node-client-b68dd224-99b3-40ca-b41e-5c11e25d96f8 +1ms
kafka-node:ConsumerGroup SyncGroup Response +88ms
kafka-node:ConsumerGroup kafka-node-client owns topics: { test: [ 0, 1, 2, 3, 4 ] } +0ms
kafka-node:ConsumerGroup kafka-node-client fetchOffset Response: {"test":{"0":79,"1":0,"2":0,"3":0,"4":0}} +34ms
kafka-node:ConsumerGroup Has saved offsets +0ms
kafka-node:ConsumerGroup generationId 43 +1ms
kafka-node:ConsumerGroup kafka-node-client started heartbeats at every 10000 ms +3ms
kafka-node:KafkaClient Sending versions request to somehost:7794 +25ms
kafka-node:KafkaClient Sending versions request to somehost:7794 +1ms
kafka-node:KafkaClient Sending versions request to somehost:7794 +22ms
kafka-node:KafkaClient Received versions response from somehost:7794 +109ms
kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":2,"usable":2},"fetch":{"min":0,"max":3,"usable":0},"offset":{"min":0,"max":1,"usable":0},"metadata":{"min":0,"max":2,"usable":0},"leader":{"min":0,"max":0,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":3,"usable":false},"controlledShutdown":{"min":1,"max":1,"usable":false},"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":2,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":1,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":false},"listGroups":{"min":0,"max":0,"usable":false},"saslHandshake":{"min":0,"max":0,"usable":false},"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":{"min":0,"max":1,"usable":false},"deleteTopics":{"min":0,"max":0,"usable":false}} +0ms
kafka-node:KafkaClient Received versions response from somehost:7794 +2ms
kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":2,"usable":2},"fetch":{"min":0,"max":3,"usable":0},"offset":{"min":0,"max":1,"usable":0},"metadata":{"min":0,"max":2,"usable":0},"leader":{"min":0,"max":0,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":3,"usable":false},"controlledShutdown":{"min":1,"max":1,"usable":false},"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":2,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":1,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":false},"listGroups":{"min":0,"max":0,"usable":false},"saslHandshake":{"min":0,"max":0,"usable":false},"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":{"min":0,"max":1,"usable":false},"deleteTopics":{"min":0,"max":0,"usable":false}} +0ms
kafka-node:KafkaClient Received versions response from somehost:7794 +19ms
kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":2,"usable":2},"fetch":{"min":0,"max":3,"usable":0},"offset":{"min":0,"max":1,"usable":0},"metadata":{"min":0,"max":2,"usable":0},"leader":{"min":0,"max":0,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":3,"usable":false},"controlledShutdown":{"min":1,"max":1,"usable":false},"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":2,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":1,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":false},"listGroups":{"min":0,"max":0,"usable":false},"saslHandshake":{"min":0,"max":0,"usable":false},"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":{"min":0,"max":1,"usable":false},"deleteTopics":{"min":0,"max":0,"usable":false}} +0ms
host:
kafka-node:ConsumerGroup Connecting kafka-node-client +0ms
kafka-node:ConsumerGroup GroupCoordinator Response: { coordinatorHost: 'somehost',
coordinatorPort: 7794,
coordinatorId: 2 } +59ms
kafka-node:ConsumerGroup joinGroupResponse {"members":[{"subscription":["test"],"version":0,"id":"kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507"}],"generationId":45,"groupProtocol":"roundrobin","leaderId":"kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507","memberId":"kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507"} from kafka-node-client +98ms
kafka-node:ConsumerGroup Assigning Partitions to members [ { subscription: [ 'test' ],
version: 0,
userData: undefined,
id: 'kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507' } ] +1ms
kafka-node:ConsumerGroup Using group protocol roundrobin +1ms
kafka-node:ConsumerGroup loadingMetadata for topics: [ 'test' ] +2ms
kafka-node:ConsumerGroup mapTopicToPartitions { test: [ '0', '1', '2', '3', '4' ] } +33ms
kafka-node:Roundrobin topicPartition: {"test":["0","1","2","3","4"]} +0ms
kafka-node:Roundrobin groupMembers: [{"subscription":["test"],"version":0,"id":"kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507"}] +0ms
kafka-node:Roundrobin members [ 'kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507' ] +1ms
kafka-node:Roundrobin subscribers { 'kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507': [ 'test' ] } +0ms
kafka-node:Roundrobin round robin on topic partition pairs: [ { topic: 'test', partition: '0' },
{ topic: 'test', partition: '1' },
{ topic: 'test', partition: '2' },
{ topic: 'test', partition: '3' },
{ topic: 'test', partition: '4' } ] +0ms
kafka-node:ConsumerGroup SyncGroup Request from kafka-node-client-9ac3480b-f72c-4412-970d-ce52b1119507 +1ms
kafka-node:ConsumerGroup SyncGroup Response +93ms
kafka-node:ConsumerGroup kafka-node-client owns topics: { test: [ 0, 1, 2, 3, 4 ] } +0ms
kafka-node:ConsumerGroup kafka-node-client fetchOffset Response: {"test":{"0":81,"1":0,"2":0,"3":0,"4":0}} +29ms
kafka-node:ConsumerGroup Has saved offsets +0ms
kafka-node:ConsumerGroup generationId 45 +0ms
kafka-node:ConsumerGroup kafka-node-client started heartbeats at every 10000 ms +2ms
About this issue
- Original URL
- State: closed
- Created 7 years ago
- Comments: 17 (10 by maintainers)
Commits related to this issue
- avoid doing versions request on longpolling sockets resolves #743 — committed to SOHU-Co/kafka-node by hyperlink 7 years ago
Great that you could reproduce it.
Thanks for the workaround, I’ll test it Monday 🙂