kafkajs: KafkaJS claims a broker does not host a topic-partition, even though it does
Describe the bug

I receive lots of error messages like the following:

```json
{
  "level": "ERROR",
  "timestamp": "2020-07-15T16:48:34.740Z",
  "logger": "kafkajs",
  "message": "[Connection] Response Metadata(key: 3, version: 5)",
  "broker": "aws_msk_host_1:9092",
  "clientId": "ti-qa",
  "error": "This server does not host this topic-partition",
  "correlationId": 16,
  "size": 2120
}
```
However, a check of the topic metadata (using one of the affected topics as an example) says otherwise:

```
"learningpaths" with 6 partition(s)
  partition 0 leader: 2, replicas: [2, 3, 1], isrs: [3, 1, 2] errstr:
  partition 1 leader: 1, replicas: [1, 2, 3], isrs: [3, 1, 2] errstr:
  partition 2 leader: 3, replicas: [3, 1, 2], isrs: [3, 1, 2] errstr:
  partition 3 leader: 2, replicas: [2, 1, 3], isrs: [3, 1, 2] errstr:
  partition 4 leader: 1, replicas: [1, 3, 2], isrs: [3, 1, 2] errstr:
  partition 5 leader: 3, replicas: [3, 2, 1], isrs: [3, 1, 2] errstr:
```
This happens across multiple topics.
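A check like the one above can also be run from Node, which makes it easier to compare what the client sees with what the CLI reports. The sketch below assumes the object shape documented for KafkaJS's `admin.fetchTopicMetadata()` (`topics[].partitions[]` with `partitionErrorCode`, `partitionId`, `leader`, `replicas`, `isr`); `findUnhealthyPartitions` is a hypothetical helper, and the sample data is rebuilt from the "learningpaths" listing above.

```javascript
// Hypothetical helper: list partitions whose metadata suggests they cannot be served.
// Assumes the shape returned by KafkaJS admin.fetchTopicMetadata().
function findUnhealthyPartitions(metadata) {
  const problems = [];
  for (const topic of metadata.topics) {
    for (const p of topic.partitions) {
      const hasError = (p.partitionErrorCode || 0) !== 0; // non-zero Kafka error code
      const noLeader = typeof p.leader !== 'number' || p.leader < 0; // Kafka reports -1 for "no leader"
      const leaderOutOfIsr = !noLeader && !p.isr.includes(p.leader);
      if (hasError || noLeader || leaderOutOfIsr) {
        problems.push({ topic: topic.name, partition: p.partitionId });
      }
    }
  }
  return problems;
}

// Sample input built from the "learningpaths" listing above: [leader, replicas].
const listing = [
  [2, [2, 3, 1]],
  [1, [1, 2, 3]],
  [3, [3, 1, 2]],
  [2, [2, 1, 3]],
  [1, [1, 3, 2]],
  [3, [3, 2, 1]]
];
const metadata = {
  topics: [
    {
      name: 'learningpaths',
      partitions: listing.map(([leader, replicas], partitionId) => ({
        partitionErrorCode: 0,
        partitionId,
        leader,
        replicas,
        isr: [3, 1, 2]
      }))
    }
  ]
};

console.log(findUnhealthyPartitions(metadata)); // []
```

Against a live cluster, the input would come from something like `await admin.fetchTopicMetadata({ topics: ['learningpaths'] })` after `await admin.connect()`. An empty result, as here, means every partition has a leader that is in its ISR.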
Code

```javascript
const awslog = require('lib/awslog');
const config = require('config');
const { Kafka } = require('kafkajs');

const BROKERS =
  config.kafkaBrokers && config.kafkaBrokers.trim() !== '' ? config.kafkaBrokers.split(',') : null;
const USE_KAFKA = config.env !== 'test' && BROKERS !== null;

const kafka = USE_KAFKA
  ? new Kafka({
      clientId: `ti-${config.env}`,
      brokers: BROKERS,
      retry: {
        initialRetryTime: 1000,
        retries: 9
      }
    })
  : null;

const producer = USE_KAFKA
  ? kafka.producer({
      metadataMaxAge: 60000
    })
  : null;

function push(name, records) {
  if (USE_KAFKA && records && records.length) {
    Promise.all(
      records.map(record =>
        // Serialize inside `.then` so a record that fails to serialize becomes a
        // rejected promise (handled below) rather than a synchronous throw,
        // just in case others in the same batch are valid.
        Promise.resolve(record)
          .then(keysToLowerCase)
          .then(
            value => ({ value, key: record.id || record.requestId }),
            err => {
              config.bugsnag.notify(new Error('Failed to prepare record for Kafka'), {
                message: err.message,
                paths: err.paths,
                record,
                topic: name.toLowerCase(),
                brokers: BROKERS
              });
              return null;
            }
          )
      )
    )
      .then(encodedMessages => {
        const validMessages = encodedMessages.filter(message => message);
        if (validMessages.length) {
          return producer.send({
            topic: name.toLowerCase(),
            messages: validMessages,
            acks: 1
          });
        }
      })
      .catch(e => {
        awslog.error(null, new Error('Failed to send record to Kafka'), {
          message: e.message,
          topic: name.toLowerCase(),
          messages: records,
          brokers: BROKERS,
          paths: e.paths
        });
      });
  }
}

function flush() {
  if (USE_KAFKA) {
    return producer.disconnect();
  } else {
    return Promise.resolve(true);
  }
}

module.exports = {
  push,
  flush
};

// Lower-cases every top-level key and serializes the record to a JSON string.
function keysToLowerCase(obj) {
  const newObj = {};
  const keys = Object.keys(obj);
  for (const key of keys) {
    newObj[key.toLowerCase()] = obj[key];
  }
  return JSON.stringify(newObj);
}
```
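The snippet above does not show where `producer.connect()` is called, and `push()` does not return its promise, so callers cannot tell when (or whether) a send finished. KafkaJS's documentation connects the producer with `await producer.connect()` before calling `producer.send()`. If the connect call lives elsewhere in the application, this does not apply; otherwise, here is a minimal sketch of a hypothetical `createPusher` wrapper that connects once on first use and returns the send promise. It accepts any object exposing KafkaJS-style `connect()` and `send()` methods, so it is demonstrated with a fake producer.

```javascript
// Hypothetical wrapper: connects the producer once, on first use, and
// returns the send promise instead of firing and forgetting.
function createPusher(producer) {
  let connecting = null; // shared connect promise, so concurrent pushes connect only once

  function ensureConnected() {
    if (!connecting) {
      connecting = producer.connect().catch(err => {
        connecting = null; // let a later push retry the connection
        throw err;
      });
    }
    return connecting;
  }

  return async function push(topic, messages) {
    if (!messages || messages.length === 0) return null;
    await ensureConnected();
    // acks: 1 mirrors the original code (leader-only acknowledgement).
    return producer.send({ topic, messages, acks: 1 });
  };
}

// Usage with a fake producer that records calls (stands in for kafka.producer()).
const calls = { connect: 0, send: [] };
const fakeProducer = {
  connect: async () => {
    calls.connect += 1;
  },
  send: async req => {
    calls.send.push(req);
    return [{ topicName: req.topic, partition: 0, errorCode: 0 }];
  }
};

const push = createPusher(fakeProducer);
Promise.all([
  push('learningpaths', [{ value: '{"id":1}' }]),
  push('learningpaths', [{ value: '{"id":2}' }])
]).then(() => {
  console.log(calls.connect, calls.send.length); // 1 2
});
```

Returning the promise also lets the caller decide whether a failed send should be logged, retried, or surfaced, rather than having `push()` swallow it.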
Expected behavior

Messages get sent and acknowledged by at least the topic-partition leader, unless an error occurs.
Observed behavior

The KafkaJS producer throws the error above, claiming "This server does not host this topic-partition", even though the broker clearly does host it. It's possible that there is a different underlying problem and the library reports it as this error instead.
Environment:
- OS: Ubuntu 14.04.6 LTS
- KafkaJS version 1.12.0
- Kafka version 2.2.1 (Amazon MSK)
- NodeJS version 10.20.1
Additional context

Any pointers on what might be wrong with my code, or with the library, would be helpful.
About this issue
- State: open
- Created 4 years ago
- Reactions: 35
- Comments: 42 (4 by maintainers)
I had the same issue. There is a bug in `refreshMetadata`: it does not properly handle a topic being removed from outside KafkaJS.
The problem is in the `refreshMetadata` code, which tries to refresh metadata for every topic in the `this.targetTopics` Set (from admin/cluster/index.js).
Easy way to reproduce this bug:
So if one of the topics in `this.targetTopics` is removed from Kafka externally (without using KafkaJS), you will not be able to refresh metadata for any other topic, and the client will not recover from this error.
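To make the failure mode described in this comment concrete, here is a toy model. It is not KafkaJS code: `FakeCluster`, `send`, and `sendPruning` are hypothetical, and the model only assumes what the comment says, namely that metadata is refreshed for the whole set of target topics at once. It shows one externally deleted topic breaking sends to a topic that still exists, plus one possible (hypothetical) mitigation: dropping topics the broker reports as unknown from the target set.

```javascript
// Toy model (NOT KafkaJS internals): metadata is refreshed for every target
// topic at once, and the whole refresh fails if any single topic is unknown.
class UnknownTopicError extends Error {
  constructor(topic) {
    super(`This server does not host this topic-partition (${topic})`);
    this.type = 'UNKNOWN_TOPIC_OR_PARTITION';
    this.topic = topic;
  }
}

class FakeCluster {
  constructor(brokerTopics) {
    this.brokerTopics = new Set(brokerTopics); // topics that exist on the broker
    this.targetTopics = new Set(); // every topic the client has used so far
  }

  // Refresh metadata for ALL target topics; one unknown topic fails the call.
  refreshMetadata() {
    for (const topic of this.targetTopics) {
      if (!this.brokerTopics.has(topic)) throw new UnknownTopicError(topic);
    }
  }

  send(topic) {
    this.targetTopics.add(topic);
    this.refreshMetadata();
    return `sent to ${topic}`;
  }

  // Hypothetical mitigation: forget other unknown topics so the refresh can succeed.
  sendPruning(topic) {
    this.targetTopics.add(topic);
    for (;;) {
      try {
        this.refreshMetadata();
        return `sent to ${topic}`;
      } catch (err) {
        // Give up if the error is unrelated or concerns the topic being sent to.
        if (err.type !== 'UNKNOWN_TOPIC_OR_PARTITION' || err.topic === topic) throw err;
        this.targetTopics.delete(err.topic);
      }
    }
  }
}

const cluster = new FakeCluster(['orders', 'payments']);
cluster.send('orders');
cluster.send('payments');
cluster.brokerTopics.delete('orders'); // topic deleted outside the client

try {
  cluster.send('payments'); // fails although 'payments' still exists
} catch (err) {
  console.log(err.type); // UNKNOWN_TOPIC_OR_PARTITION
}
console.log(cluster.sendPruning('payments')); // sent to payments
```

Each pass of the pruning loop removes one topic, so it terminates; it still throws when the topic actually being sent to is missing.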
Interesting.
We ended up switching over to node-rdkafka and stopped experiencing the issue. There must be some issue in the way `kafkajs` communicates with the brokers that's causing it to throw these errors unnecessarily.

We have also faced a similar issue. Steps to reproduce:

`"This server does not host this topic-partition" "originalError":{"name":"KafkaJSProtocolError","retriable":true,"type":"UNKNOWN_TOPIC_OR_PARTITION","code":3}`

The library should not fail to publish data to an EXISTING topic just because some other part of the code attempts to publish data to a NON-EXISTING topic.
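Until the library handles this, one application-side mitigation (an assumption, not something confirmed in this thread) is to avoid sending to topics that do not exist, so a missing topic never ends up in the client's metadata set. The sketch takes a `listTopics` function, such as KafkaJS's `admin.listTopics()`, which resolves to an array of topic names; `createTopicGuard` and its cache TTL are hypothetical.

```javascript
// Hypothetical guard: checks topic existence before producing, caching the
// topic list for `ttlMs` to avoid an admin round trip on every send.
// Concurrent calls on a cold or expired cache may each fetch the list.
function createTopicGuard(listTopics, { ttlMs = 30000, now = Date.now } = {}) {
  let cached = null;
  let fetchedAt = 0;

  return async function topicExists(topic) {
    if (!cached || now() - fetchedAt > ttlMs) {
      cached = new Set(await listTopics());
      fetchedAt = now();
    }
    return cached.has(topic);
  };
}

// Usage sketch with a stubbed listTopics (stands in for admin.listTopics()).
let listCalls = 0;
const topicExists = createTopicGuard(async () => {
  listCalls += 1;
  return ['learningpaths', 'orders'];
});

(async () => {
  console.log(await topicExists('learningpaths')); // true
  console.log(await topicExists('missing-topic')); // false
  console.log(listCalls); // 1
})();
```

In a producer this would gate the send, e.g. `if (await topicExists(topic)) await producer.send({ topic, messages })`. A short TTL keeps newly created topics from being rejected for long.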
@mithiridi No solution, just an ugly workaround. Currently I have to retry in our code whenever the `This server does not host this topic-partition` error happens, which means adding lots of `try`/`catch` code to handle that error. I expected Kafka to be linearizable, but it seems it is not. It is ridiculous that I use `createTopic()` to create topic A, then use `listTopic` to check whether A was created, and even though `listTopic()` says A exists, I can still get this error when I later call `subscribe()` to consume messages from A.

I sort of solved this problem for my case. My scenario: I use NestJS with Kafka to consume messages from topics that do not exist yet, and the service fails to run because of `KafkaJSProtocolError: This server does not host this topic-partition`, even though the service is able to create the topics anyway. This happens when I run Kafka using the Confluent Platform images, in the KRaft version without ZooKeeper. My Nest config:
The solution was to use the Kafka version with ZooKeeper:
This is not a complete solution, but it is some progress at least.
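The retry workaround mentioned in this thread (retrying whenever `This server does not host this topic-partition` occurs) can be kept in one place instead of scattered `try`/`catch` blocks. This is a sketch, not a fix for the underlying problem: `retryOnUnknownTopic`, its default attempt count, and its delay are hypothetical, and it assumes KafkaJS errors expose `type` either directly or on a wrapped `originalError`, as in the log quoted earlier in the thread. KafkaJS also has its own `retry` client options, as used in the original report.

```javascript
// Hypothetical helper: retry an async operation while it fails with
// UNKNOWN_TOPIC_OR_PARTITION, e.g. right after a topic was created and
// its metadata has not reached every broker yet.
function isUnknownTopicError(err) {
  // Walk the error and any wrapped originalError chain.
  for (let e = err; e; e = e.originalError) {
    if (e.type === 'UNKNOWN_TOPIC_OR_PARTITION') return true;
  }
  return false;
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

async function retryOnUnknownTopic(operation, { attempts = 5, delayMs = 500 } = {}) {
  for (let attempt = 1; ; attempt += 1) {
    try {
      return await operation();
    } catch (err) {
      // Re-throw unrelated errors immediately, and give up after the last attempt.
      if (!isUnknownTopicError(err) || attempt >= attempts) throw err;
      await sleep(delayMs * attempt); // linear backoff between attempts
    }
  }
}

// Usage sketch: an operation that fails twice with the error, then succeeds.
let failures = 2;
retryOnUnknownTopic(
  async () => {
    if (failures-- > 0) {
      const err = new Error('This server does not host this topic-partition');
      err.type = 'UNKNOWN_TOPIC_OR_PARTITION';
      throw err;
    }
    return 'ok';
  },
  { delayMs: 10 }
).then(result => console.log(result)); // ok
```

In application code this might wrap a call such as `consumer.subscribe({ topic: 'A', fromBeginning: true })` made shortly after the topic was created.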
Yeah, running locally doesn't cause errors, but I have been having issues running Kafka on Docker. Does anyone have a reference?
@tekpriest Hi, I have the same issue running Kafka with NestJS. It happens only in the stage/prod environment; when I use a local Kafka, the issue does not occur.
Update from my side: my issue was that we used the same broker for prod and dev, and the app instances were using the same group ID (e.g. the dev app instance consuming `dev_topics` and the prod app instance consuming `prod_topics`). Everything started working fine once we added the environment to the group ID. Hope it helps someone.

`"error":"This server does not host this topic-partition"`

I am experiencing this issue with NestJS's `subscribeToResponseOf()` function even though I know the topic exists. I have searched everywhere for a resolution and tried many code changes.
Any guidance from anyone on how to fix this issue? Is it simply a Kafka BUG??
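The group ID fix from the update earlier in the thread (adding the environment to the group ID) can be expressed as a small naming rule. In this sketch `consumerGroupId` is hypothetical and reading the environment from `NODE_ENV` is an assumption; the resulting string would be passed as `groupId` to KafkaJS's `kafka.consumer({ groupId })`.

```javascript
// Hypothetical helper: include the environment in the consumer group ID so
// dev and prod instances sharing one cluster never join the same group.
function consumerGroupId(baseName, env = process.env.NODE_ENV || 'development') {
  if (!baseName) throw new Error('baseName is required');
  return `${baseName}-${env}`;
}

console.log(consumerGroupId('learningpaths-consumer', 'dev')); // learningpaths-consumer-dev
console.log(consumerGroupId('learningpaths-consumer', 'prod')); // learningpaths-consumer-prod

// With KafkaJS this would be used as, e.g.:
//   const consumer = kafka.consumer({ groupId: consumerGroupId('learningpaths-consumer') });
```

Keeping the rule in one function means every consumer in a service picks up the same environment suffix rather than each call site building its own string.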