kafkajs: KafkaJS claims a broker does not host a topic-partition, even though it does

Describe the bug

I receive many error messages like the following:

{
    "level": "ERROR",
    "timestamp": "2020-07-15T16:48:34.740Z",
    "logger": "kafkajs",
    "message": "[Connection] Response Metadata(key: 3, version: 5)",
    "broker": "aws_msk_host_1:9092",
    "clientId": "ti-qa",
    "error": "This server does not host this topic-partition",
    "correlationId": 16,
    "size": 2120
}

However, a check of the topic metadata (using this topic as an example) says otherwise:

  "learningpaths" with 6 partition(s)
partition 0 leader: 2, replicas: [2, 3, 1], isrs: [3, 1, 2] errstr: 
partition 1 leader: 1, replicas: [1, 2, 3], isrs: [3, 1, 2] errstr: 
partition 2 leader: 3, replicas: [3, 1, 2], isrs: [3, 1, 2] errstr: 
partition 3 leader: 2, replicas: [2, 1, 3], isrs: [3, 1, 2] errstr: 
partition 4 leader: 1, replicas: [1, 3, 2], isrs: [3, 1, 2] errstr: 
partition 5 leader: 3, replicas: [3, 2, 1], isrs: [3, 1, 2] errstr: 

This happens across multiple topics.

Code

const awslog = require('lib/awslog');
const config = require('config');

const { Kafka } = require('kafkajs');

const BROKERS =
  config.kafkaBrokers && config.kafkaBrokers.trim() !== '' ? config.kafkaBrokers.split(',') : null;

const USE_KAFKA = config.env !== 'test' && BROKERS !== null;

const kafka = USE_KAFKA
  ? new Kafka({
      clientId: `ti-${config.env}`,
      brokers: BROKERS,
      retry: {
        initialRetryTime: 1000,
        retries: 9
      }
    })
  : null;

const producer = USE_KAFKA
  ? kafka.producer({
      metadataMaxAge: 60000
    })
  : null;

function push(name, records) {
  if (USE_KAFKA && records && records.length) {
    Promise.all(
      records.map(record =>
        // Deferring `keysToLowerCase` into the promise chain prevents an
        // invalid message from throwing synchronously, so that other valid
        // messages in the same batch can still be sent.
        Promise.resolve()
          .then(() => keysToLowerCase(record))
          .then(
          value => ({ value, key: record.id || record.requestId }),
          err => {
            config.bugsnag.notify(new Error('Failed to prepare record for Kafka'), {
              message: err.message,
              paths: err.paths,
              record,
              topic: name.toLowerCase(),
              brokers: BROKERS
            });

            return null;
          }
        )
      )
    )
      .then(encodedMessages => {
        const validMessages = encodedMessages.filter(message => message);
        if (validMessages.length) {
          return producer.send({
            topic: name.toLowerCase(),
            messages: validMessages,
            acks: 1
          });
        }
      })
      .catch(e => {
        awslog.error(null, new Error('Failed to send record to Kafka'), {
          message: e.message,
          topic: name.toLowerCase(),
          messages: records,
          brokers: BROKERS,
          paths: e.paths
        });
      });
  }
}

function flush() {
  if (USE_KAFKA) {
    return producer.disconnect();
  } else {
    return Promise.resolve(true);
  }
}

module.exports = {
  push,
  flush
};

function keysToLowerCase(obj) {
  const newObj = {};
  const keys = Object.keys(obj);
  for (const key of keys) {
    newObj[key.toLowerCase()] = obj[key];
  }

  return JSON.stringify(newObj);
}

Expected behavior

Messages get sent and acknowledged by at least the topic-partition leader, unless a genuine error occurs.

Observed behavior

The KafkaJS producer throws the error above, claiming This server does not host this topic-partition, when the broker clearly does host it. It’s possible the root cause is a different issue and this error is thrown in its place.

Environment:

  • OS: Ubuntu 14.04.6 LTS
  • KafkaJS version 1.12.0
  • Kafka version 2.2.1 (Amazon MSK)
  • NodeJS version 10.20.1

Additional context

Any pointers on what might be wrong with my code, or with the library, would be helpful.

About this issue

  • State: open
  • Created 4 years ago
  • Reactions: 35
  • Comments: 42 (4 by maintainers)

Most upvoted comments

I had the same issue. There is a bug in refreshMetadata: it cannot properly handle a topic being removed externally (outside of kafkajs).

The problem is in the refreshMetadata code, which tries to refresh metadata for every topic in the this.targetTopics Set (from admin/cluster/index.js).

Easy way to reproduce this bug:

  1. Create topic1 and topic2.
  2. Get offsets for topic1 and topic2 with kafkajs.
  3. Manually delete topic1 without using kafkajs.
  4. Try to get offsets for topic2 with kafkajs. It fails with a “This server does not host this topic-partition” exception, even though topic2 exists and only topic1 was deleted.

So if any topic in this.targetTopics has been removed from Kafka externally (without kafkajs), the metadata refresh fails for every other topic as well, and the client cannot recover from the error.
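The reproduction steps can be sketched with the kafkajs admin API roughly as follows. This is a sketch, not a tested reproduction: the broker address and topic names are placeholders, and the external delete in step 3 is assumed to happen via a CLI tool outside this process.

```javascript
// Sketch of the reproduction steps; requires a reachable Kafka broker.
async function reproduce(brokers) {
  const { Kafka } = require('kafkajs'); // external dependency, assumed installed

  const kafka = new Kafka({ clientId: 'repro', brokers });
  const admin = kafka.admin();
  await admin.connect();

  // 1. create topic1 and topic2
  await admin.createTopics({ topics: [{ topic: 'topic1' }, { topic: 'topic2' }] });

  // 2. fetch offsets for both topics; both land in the client's targetTopics set
  await admin.fetchTopicOffsets('topic1');
  await admin.fetchTopicOffsets('topic2');

  // 3. delete topic1 OUTSIDE of kafkajs at this point, e.g. with
  //    kafka-topics.sh --bootstrap-server <broker> --delete --topic topic1

  // 4. per the comment above, this call now fails with
  //    UNKNOWN_TOPIC_OR_PARTITION even though topic2 still exists,
  //    because the metadata refresh also asks about the deleted topic1
  await admin.fetchTopicOffsets('topic2');

  await admin.disconnect();
}
```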

Interesting.

We ended up switching over to node-rdkafka and stopped experiencing the issue. There must be some issue in the way kafkajs communicates with the brokers that’s causing it to throw these errors unnecessarily.

We have also faced a similar issue. Steps to reproduce:

  1. Try to publish a message to a topic that does NOT exist.
  2. Try to publish a message to a topic that DOES exist.

Both publishes fail with the error below:

"This server does not host this topic-partition"
"originalError":{"name":"KafkaJSProtocolError","retriable":true,"type":"UNKNOWN_TOPIC_OR_PARTITION","code":3}

The library should not fail to publish data to an EXISTING topic just because some other part of the code attempted to publish to a NON-EXISTING one.
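A minimal sketch of the two publishes described above (topic names and the broker address are placeholders; kafkajs is required inside the function so the sketch stays self-contained and nothing runs without a broker):

```javascript
// Sketch: a publish to a missing topic poisons a later publish to a real one.
async function reproduceSendFailure(brokers) {
  const { Kafka } = require('kafkajs'); // external dependency, assumed installed

  const kafka = new Kafka({ clientId: 'repro-send', brokers });
  const producer = kafka.producer();
  await producer.connect();

  // 1. publish to a topic that does NOT exist (assuming topic auto-creation
  //    is disabled); failing with UNKNOWN_TOPIC_OR_PARTITION here is expected
  await producer
    .send({ topic: 'non-existing-topic', messages: [{ value: 'a' }] })
    .catch(err => console.error('expected failure:', err.message));

  // 2. publish to a topic that DOES exist; per the comment above, this also
  //    fails with "This server does not host this topic-partition"
  await producer.send({ topic: 'existing-topic', messages: [{ value: 'b' }] });

  await producer.disconnect();
}
```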

@mithiridi No solution, just an ugly workaround: I currently retry in our code whenever the This server does not host this topic-partition error happens, which means adding a lot of try/catch handling for that one error. I expected Kafka to behave linearizably, but apparently it does not. It is absurd that I can use createTopics() to create topic A, use listTopics() to check that A was created, and even after listTopics() says A exists, still get this error when I later subscribe() to A.
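The retry workaround described above can be factored into a small helper so the try/catch is not repeated everywhere. This is a generic sketch, not part of the commenter's code; matching on the error message string is an assumption (checking err.type === 'UNKNOWN_TOPIC_OR_PARTITION' would also work for KafkaJSProtocolError).

```javascript
// Retry `fn` while it fails with the metadata error, up to `attempts` times,
// waiting `delayMs` between attempts. Any other error is rethrown immediately.
async function retryOnMissingPartition(fn, attempts = 5, delayMs = 200) {
  let lastError;
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Only retry the specific error this issue is about.
      if (!/does not host this topic-partition/.test(err.message)) throw err;
      await new Promise(resolve => setTimeout(resolve, delayMs));
    }
  }
  throw lastError;
}
```

Usage would then look like `await retryOnMissingPartition(() => producer.send({ topic, messages }))` instead of scattering try/catch blocks around every send.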

I sort of solved this problem for my case. My scenario is this: I use NestJS with Kafka to consume messages from topics that do not exist yet, and the service fails to start with KafkaJSProtocolError: This server does not host this topic-partition, even though it is able to create the topics anyway. This happens when I run Kafka using the Confluent Platform images in KRaft mode, without ZooKeeper:

  broker1:
    image: confluentinc/cp-kafka:7.4.0
    hostname: broker1
    container_name: broker1
    depends_on:
      - controller
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker1:29092,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # KAFKA_METRIC_REPORTERS: 'io.confluent.metrics.reporter.ConfluentMetricsReporter'
      # KAFKA_CONFLUENT_METRIC_REPORTER_BOOTSTRAP_SERVER: 'broker1:9092,controller:9092'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  controller:
    image: confluentinc/cp-kafka:7.4.0
    hostname: controller
    container_name: controller
    ports:
      - "9093:9093"
      - "9102:9102"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9102
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
      # KAFKA_METRIC_REPORTERS: 'io.confluent.metrics.reporter.ConfluentMetricsReporter'
      # KAFKA_CONFLUENT_METRIC_REPORTER_BOOTSTRAP_SERVER: 'broker1:9092,controller:9092'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

My nest config:

{
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'nest-consumer-group',
    },
  },
}

For now, the solution was to use the Kafka setup with ZooKeeper instead:

  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

This is not a complete solution, but it is some progress at least

@tekpriest Hi I have the same issue running Kafka on NestJs. Happens only in the stage/prod environment. When I use local Kafka this issue does not exist.

Yeah, running locally doesn’t cause errors, but I have been having issues running Kafka on Docker. Anyone got a reference?

Update from my side: my issue was that we had the same broker for prod and dev, and the app instances were using the same group id (e.g. the app dev instance connecting to dev_ topics, the app prod instance connecting to prod_ topics). It all started working fine when we added the environment to the group id. Hope it helps someone.
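The per-environment group id described above can be derived mechanically. This is an illustrative sketch (the helper name and the use of NODE_ENV are assumptions, not from the original comment):

```javascript
// Build a consumer group id that is unique per environment, so dev and prod
// instances sharing one broker cluster never join the same consumer group.
function groupIdFor(baseGroupId, env) {
  if (!env) throw new Error('environment name is required');
  return `${baseGroupId}-${env}`;
}

// e.g. in a kafkajs consumer config:
// kafka.consumer({ groupId: groupIdFor('nest-consumer-group', process.env.NODE_ENV) })
```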

"error":"This server does not host this topic-partition"

I am experiencing this issue with NestJS and the subscribeToResponseOf() function, even though I know the topic exists… I have searched everywhere for a resolution and tried many code changes.

Any guidance from anyone on how to fix this issue? Is it simply a Kafka BUG??