xk6-kafka: Error writing messages to [topic_name] kafka.(*Client).Produce: context deadline exceeded

config.json

{
  "scenarios": {
    "constant_request_rate": {
      "executor": "constant-arrival-rate",
      "rate": 100,
      "timeUnit": "1s",
      "duration": "30s",
      "preAllocatedVUs": 100,
      "maxVUs": 200
    }
  }
}

orderPaidScenario.js

import OrderManager from '../common/orderManager/OrderManager.js'
import OrderPaidNotify from '../common/kafka/exchanges/OrderPaidNotify.js'

// Init context: this runs once per VU, so every VU gets its own
// OrderPaidNotify (and therefore its own Kafka writer).
const testConfig = JSON.parse(open('../config/test.json'))
const orderPaidNotify = new OrderPaidNotify()

export const options = Object.assign({}, testConfig)

export default function () {
    const order = new OrderManager().create()
    orderPaidNotify.orderPaid(order.data.order_id, order.data.items)

    console.log('OrderID: ', order.data.order_id)
}

Producer:

import { Writer, SchemaRegistry, SCHEMA_TYPE_STRING } from 'k6/x/kafka'

export default class Producer {
    constructor(topic) {
        this.client = this.resolveProducerClient(topic)
        this.schemaRegistry = new SchemaRegistry()
    }

    resolveProducerClient(topic) {
        // One Writer (with its own TCP connections) per instance.
        return new Writer({
            brokers: [
                'localhost:9092'
            ],
            topic: topic,
            autoCreateTopic: true,
            connectLogger: true,
            maxAttempts: 10,
            readTimeout: 1000
        })
    }

    send(message) {
        this.client.produce(this.buildMessage(message))
    }

    buildMessage(message) {
        return {
            messages: [{
                key: this.schemaRegistry.serialize({
                    data: 'k6-key-id-' + this.generateRandom(),
                    schemaType: SCHEMA_TYPE_STRING,
                }),
                value: this.schemaRegistry.serialize({
                    data: JSON.stringify(message),
                    schemaType: SCHEMA_TYPE_STRING,
                }),
            }],
        }
    }

    // Random integer in [min, max).
    generateRandom(min = 0, max = 90000) {
        return Math.floor(Math.random() * (max - min)) + min
    }
}
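
One thing worth noting about the code above: the Writer is never closed, so its connections stay open for the lifetime of each VU. Below is a minimal cleanup sketch, assuming a hypothetical close() pass-through on Producer (and on OrderPaidNotify, whose source is not shown here); xk6-kafka writers do expose a close() method:

// In Producer: release the writer's connections.
close() {
    this.client.close()
}

// In orderPaidScenario.js: k6 runs teardown() once after the test.
export function teardown() {
    orderPaidNotify.close() // hypothetical pass-through to Producer.close()
}

This does not fix the timeouts below, but it avoids inflating the connection count with writers that are no longer needed.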

In the terminal: ./k6 run scenarios/orderPaidScenario.js. Output: producing works fine at first, but after about 5 seconds I start seeing error messages:

ERRO[0011] Error writing messages., OriginalError: %!w(context.deadlineExceededError={})  error="Error writing messages., OriginalError: %!w(context.deadlineExceededError={})"
ERRO[0011] GoError: Error writing messages., OriginalError: %!w(context.deadlineExceededError={})
        at github.com/mostafa/xk6-kafka.(*Kafka).writerClass.func1 (native)
        at send (file:///Users/WebstormProjects/k6-test/common/kafka/producer.js:24:46(7))
        at produce (file:///Users/WebstormProjects/k6-test/common/kafka/exchanges/baseExchange.js:9:27(5))
        at orderPaid (file:///Users/WebstormProjects/k6-test/common/kafka/exchanges/OrderPaidNotify.js:14:21(12))
        at file:///Users/WebstormProjects/k6-test/scenarios/orderPaidScenario.js:41:22(15)

About this issue

  • State: closed
  • Created a year ago
  • Comments: 18 (8 by maintainers)

Most upvoted comments

Apparently, the culprit was the metadata-polling machinery in the kafka-go library. Every few seconds it fetches metadata from the Kafka cluster for all topics, replicas, partitions, and so on. For our cluster that comes to about 1 MB per response. Since each VU runs that polling loop independently, with 100+ VUs this quickly saturates a 100 Mbit/s connection, leading to all those weird timeouts in seemingly random places (sometimes even timeouts during DNS queries for the Kafka hosts).

As a stopgap measure I made a patch for the kafka-go library that queries for metadata only once, caches the response in a global variable, and from that point on uses the cached response instead of polling (assuming the cluster topology does not change during the test).

A better solution would probably be some way to query metadata only for the topics the test actually needs (I am not sure whether kafka-go allows that; maybe I missed something).
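
To put rough numbers on the saturation claim (assuming one metadata poll about every 3 seconds): 100 VUs × 1 MB every 3 seconds is about 33 MB/s, i.e. roughly 270 Mbit/s of metadata traffic alone, well beyond a 100 Mbit/s link.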

New trouble with the Kafka work: the number of TCP connections is far too high. [screenshot: TCP connection count]

Experimentally, I came to the conclusion that the optimal value is 50 VUs. However, we need much more than that.
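
For comparison, capping the original config.json at that level would look like the sketch below; note that k6 will warn and drop iterations if 50 VUs cannot sustain the 100 iterations/s rate.

{
  "scenarios": {
    "constant_request_rate": {
      "executor": "constant-arrival-rate",
      "rate": 100,
      "timeUnit": "1s",
      "duration": "30s",
      "preAllocatedVUs": 50,
      "maxVUs": 50
    }
  }
}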

I am not using a consumer (Reader) in my test, only a producer (Writer).

Hi @mostafa, with the option readTimeout: 1000 I see in the terminal that the response value is 1 second. [screenshot: k6 output]