xk6-kafka: Unable to send messages to Kafka topics

Hi, I am trying to send messages to existing Kafka topics. I am able to successfully connect to the Kafka broker and read the existing topics, but when I try to produce and consume messages, I get no error from k6 and the messages never reach the topic. Could you please help me with this?

Script snippet:

import { check } from "k6";
//import {fs} from 'fs';
import {
  Writer,
  Reader,
  Connection,
  SchemaRegistry,
  SCHEMA_TYPE_JSON,
  SCHEMA_TYPE_STRING,
  SASL_SCRAM_SHA512,
} from "k6/x/kafka"; // import kafka extension

export const options = {
  vus: 1,
  iteration: 1, // note: the correct option name is "iterations"; k6 flags this unknown field in the run logs below
};

const brokers = ["bootstrap-server-name:portnumber"];
const topic = "topic-name";

const saslConfig = {
  username: "<username>",
  password: "<pwd>",
  algorithm: SASL_SCRAM_SHA512,
  clientCertPem: "pathto/client.pem",
  serverCaPem: "pathto/serverCa.pem",
};
const tlsConfig = {
  enableTls: true,
  insecureSkipTlsVerify: true,
  clientCertPem: "pathto/client.pem",
  serverCaPem: "pathto/serverCa.pem",
};

const offset = 0;
// partition and groupId are mutually exclusive
const partition = 0;
const numPartitions = 1;
const replicationFactor = 1;

const writer = new Writer({
  brokers: brokers,
  topic: topic,
  sasl: saslConfig,
  tls: tlsConfig,
});
const reader = new Reader({
  brokers: brokers,
  topic: topic,
  //partition: partition,
  //maxWait: 300,
  offset: offset,
  sasl: saslConfig,
  tls: tlsConfig,
});
const connection = new Connection({
  address: brokers[0],
  sasl: saslConfig,
  tls: tlsConfig,
});
const schemaRegistry = new SchemaRegistry();



if (__VU == 0) {
  console.log(
    "Existing topics: ",
    connection.listTopics(saslConfig, tlsConfig)
  );
}

export default function () {
  for (let index = 0; index < 2; index++) {
    let messages = [
      {
        key: schemaRegistry.serialize({
          data: "4096525_541",
          schemaType: SCHEMA_TYPE_STRING,
        }),
        value: schemaRegistry.serialize({
          data: {
            "eventId" : "22358d8b-9826-4bb0-b209-cd13a0ae1ff", //random UUI
            "itemNumber" : 476183, 
            "storeNumber": 4105, 
            "retailPricePriorAmount": 10.99,
            "retailPriceAmount": 10.99,
            "labelPrice": 8.77,
            "sellingPriceAmount" : 8.77,
            "winningLabelAdv" : "WAS_PP",
            "priceTypeCode": 2,
            "priceDisplayCode": 3,
            "advEvents": [{
                "eventType": "WAS_PP",
                "eventDescription" : "WAS NCF ITEMS ALICIA",
                "eventStartDate" : "2022-12-21T00:00:00Z",
                "eventEndDate" : "2023-07-01T23:59:59.999Z",
                "dollarOffPrice": 8.77,
                "percentOff" : 0,
                "sourceId" : "CORP"
            }],
            "nlpIndicator": "N",
            "mgrPriceIndicator": "N",
            "updateId": "PEWINCMP",
            "createdTS": "2022-11-09T06:07:16.733+0000",
            "updatedTS": "2022-11-09T06:07:16.733+0000"
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
      },
    ];
    // console.log(new Date());
    writer.produce({ messages: messages });
  }

  let messages = reader.consume({ limit: 2 });
  check(messages, {
    "2 messages returned": (msgs) => msgs.length == 2,
  });
 
}

export function teardown(data) {
  writer.close();
  reader.close();
  connection.close();
}

k6 run logs:

./k6 run -v '/Users/cnagara/Documents/kafkaTesting/spexKafkaTesting.js'                 
DEBU[0000] Logger format: TEXT                          
DEBU[0000] k6 version: v0.41.0 ((devel), go1.18.3, darwin/amd64) 

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

DEBU[0000] Resolving and reading test '/Users//Documents/kafkaTesting/spexKafkaTesting.js'... 
DEBU[0000] Loading...                                    moduleSpecifier="file:///Users//Documents/kafkaTesting/spexKafkaTesting.js" originalModuleSpecifier=/Users//Documents/kafkaTesting/spexKafkaTesting.js
DEBU[0000] '/Users//Documents/kafkaTesting/spexKafkaTesting.js' resolved to 'file:///Users/cnagara/Documents/kafkaTesting/spexKafkaTesting.js' and successfully loaded 5165 bytes! 
DEBU[0000] Gathering k6 runtime options...              
DEBU[0000] Initializing k6 runner for '/Users//Documents/kafkaTesting/spexKafkaTesting.js' (file:///Users//Documents/kafkaTesting/spexKafkaTesting.js)... 
DEBU[0000] Detecting test type for...                    test_path="file:///Users//Documents/kafkaTesting/spexKafkaTesting.js"
DEBU[0000] Trying to load as a JS test...                test_path="file:///Users//Documents/kafkaTesting/spexKafkaTesting.js"
DEBU[0000] Babel: Transformed                            t=127.309875ms
INFO[0005] Existing topics:  ["sync","pendingprice"]  source=console
WARN[0005] There were unknown fields in the options exported in the script  error="json: unknown field \"iteration\""
DEBU[0005] Runner successfully initialized!             
DEBU[0005] Parsing CLI flags...                         
DEBU[0005] Consolidating config layers...               
DEBU[0005] Parsing thresholds and validating config...  
DEBU[0005] Initializing the execution scheduler...      
DEBU[0005] Starting 1 outputs...                         component=output-manager
DEBU[0005] Starting...                                   component=metrics-engine-ingester
DEBU[0005] Started!                                      component=metrics-engine-ingester
  execution: local
     script: /Users/Documents/kafkaTesting/spexKafkaTesting.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)

DEBU[0005] Starting the REST API server on localhost:6565 
DEBU[0005] Initialization starting...                    component=engine
DEBU[0005] Starting emission of VUs and VUsMax metrics... 
DEBU[0005] Start of initialization                       executorsCount=1 neededVUs=1 phase=local-execution-scheduler-init
DEBU[0010] Initialized VU #1                             phase=local-execution-scheduler-init
DEBU[0010] Finished initializing needed VUs, start initializing executors...  phase=local-execution-scheduler-init
DEBU[0010] Initialized executor default                  phase=local-execution-scheduler-init
DEBU[0010] Initialization completed                      phase=local-execution-scheduler-init
DEBU[0010] Execution scheduler starting...               component=engine
DEBU[0010] Start of test run                             executorsCount=1 phase=local-execution-scheduler-run
DEBU[0010] Running setup()                               phase=local-execution-scheduler-run
DEBU[0010] Metrics processing started...                 component=engine
INFO[0015] Existing topics:  ["pendingprice","promocentral"]  source=console
DEBU[0015] Start all executors...                        phase=local-execution-scheduler-run
DEBU[0015] Starting executor                             executor=default startTime=0s type=per-vu-iterations
DEBU[0015] Starting executor run...                      executor=per-vu-iterations iterations=1 maxDuration=10m0s scenario=default type=per-vu-iterations vus=1
INFO[0015] "2022-12-26T08:15:14.116Z"                    source=console
INFO[0020] "2022-12-26T08:15:18.657Z"                    source=console
^CDEBU[0190] Stopping k6 in response to signal...          sig=interrupt
DEBU[0190] Engine: Thresholds terminated                 component=engine
ERRO[0190] Unable to read messages.                      error="Unable to read messages."
DEBU[0190] run: context expired; exiting...              component=engine
DEBU[0190] Metrics emission of VUs and VUsMax metrics stopped 
DEBU[0190] Executor finished successfully                executor=default startTime=0s type=per-vu-iterations
DEBU[0190] Running teardown()                            phase=local-execution-scheduler-run
INFO[0194] Existing topics:  ["promocentral","pendingprice"]  source=console
DEBU[0194] Execution scheduler terminated                component=engine error="<nil>"
DEBU[0194] Processing metrics and thresholds after the test run has ended...  component=engine
DEBU[0194] Stopping...                                   component=metrics-engine-ingester
DEBU[0194] Stopped!                                      component=metrics-engine-ingester
DEBU[0194] Engine run terminated cleanly                

running (03m04.2s), 0/1 VUs, 0 complete and 1 interrupted iterations
default ✗ [--------------------------------------] 1 VUs  02m54.5s/10m0s  0/1 iters, 1 per VU
WARN[0194] No script iterations finished, consider making the test duration longer 
INFO[0199] Existing topics:  ["promocentral","pendingprice"]  source=console

     █ teardown

     data_received................: 0 B    0 B/s
     data_sent....................: 0 B    0 B/s
     iteration_duration...........: avg=63.75µs min=63.75µs med=63.75µs max=63.75µs p(90)=63.75µs p(95)=63.75µs
     kafka_writer_acks_required...: 0      min=0                  max=0 
     kafka_writer_async...........: 0.00%  ✓ 0                    ✗ 2   
     kafka_writer_attempts_max....: 0      min=0                  max=0 
     kafka_writer_batch_bytes.....: 1.3 kB 7.1105804965559365 B/s
     kafka_writer_batch_max.......: 1      min=1                  max=1 
     kafka_writer_batch_size......: 2      0.010856/s
     kafka_writer_batch_timeout...: 0s     min=0s                 max=0s
     kafka_writer_error_count.....: 0      0/s
     kafka_writer_message_bytes...: 1.3 kB 7.1105804965559365 B/s
     kafka_writer_message_count...: 2      0.010856/s
     kafka_writer_read_timeout....: 0s     min=0s                 max=0s
     kafka_writer_retries_count...: 0      0/s
     kafka_writer_wait_seconds....: avg=0s      min=0s      med=0s      max=0s      p(90)=0s      p(95)=0s     
     kafka_writer_write_count.....: 2      0.010856/s
     kafka_writer_write_seconds...: avg=2.15s   min=1.9s    med=2.15s   max=2.4s    p(90)=2.35s   p(95)=2.37s  
     kafka_writer_write_timeout...: 0s     min=0s                 max=0s
     vus..........................: 1      min=0                  max=1 
     vus_max......................: 1      min=0                  max=1 

DEBU[0199] Waiting for engine processes to finish...    
DEBU[0199] Metrics processing winding down...            component=engine
DEBU[0199] Everything has finished, exiting k6!         
DEBU[0199] Stopping 1 outputs...                         component=output-manager
DEBU[0199] Stopping...                                   component=metrics-engine-ingester
DEBU[0199] Stopped!                                      component=metrics-engine-ingester

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 40 (19 by maintainers)

Most upvoted comments

@chandana194 That’s strange! Of course you can control it. I just used for-loops to demonstrate that you can generate lots of messages before sending them to Kafka, but it stuck as the de facto way to write xk6-kafka scripts and actually use xk6-kafka. You can simply generate one message per VU per iteration, and that’s totally fine.
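A minimal sketch of that one-message-per-iteration pattern, reusing the writer and schemaRegistry objects from the script above (the key and payload values here are placeholders, not part of the original report):

export default function () {
  // No for-loop: each VU produces exactly one message per iteration.
  writer.produce({
    messages: [
      {
        key: schemaRegistry.serialize({
          data: `key-${__VU}-${__ITER}`, // unique per VU and iteration
          schemaType: SCHEMA_TYPE_STRING,
        }),
        value: schemaRegistry.serialize({
          data: { eventId: "placeholder-event", itemNumber: 476183 },
          schemaType: SCHEMA_TYPE_JSON,
        }),
      },
    ],
  });
}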

@mostafa One issue I have observed with the latest version is that when I test with a larger number of users (like 50 VUs for 30 seconds), the k6 summary report does not print the message count/metrics properly. From the backend I see that more than 10K messages are produced, but the summary shows a much smaller number: kafka_writer_message_count…: 1931 78.300207/s

running (24.7s), 00/50 VUs, 50 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs  1s

     █ teardown

     data_received................: 0 B    0 B/s
     data_sent....................: 0 B    0 B/s
     iteration_duration...........: avg=12.15s min=6.93s med=12.36s max=13.15s p(90)=13s   p(95)=13.01s
     iterations...................: 50     2.027452/s
   ✓ kafka_reader_error_count.....: 0      0/s
     kafka_writer_acks_required...: 0      min=0       max=0 
     kafka_writer_async...........: 0.00%  ✓ 0         ✗ 50  
     kafka_writer_attempts_max....: 0      min=0       max=0 
     kafka_writer_batch_bytes.....: 33 kB  1.3 kB/s
     kafka_writer_batch_max.......: 1      min=1       max=1 
     kafka_writer_batch_size......: 50     2.027452/s
     kafka_writer_batch_timeout...: 0s     min=0s      max=0s
   ✗ kafka_writer_error_count.....: 431    17.476639/s
     kafka_writer_message_bytes...: 1.3 MB 52 kB/s
     kafka_writer_message_count...: 1931   78.300207/s
     kafka_writer_read_timeout....: 0s     min=0s      max=0s
     kafka_writer_retries_count...: 431    17.476639/s
     kafka_writer_wait_seconds....: avg=0s     min=0s    med=0s     max=0s     p(90)=0s    p(95)=0s    
     kafka_writer_write_count.....: 1931   78.300207/s
     kafka_writer_write_seconds...: avg=2.83s  min=2.54s med=2.85s  max=3.1s   p(90)=3.01s p(95)=3.04s 
     kafka_writer_write_timeout...: 0s     min=0s      max=0s
     vus..........................: 0      min=0       max=50
     vus_max......................: 50     min=0       max=50

@chandana194 Awesome!

The changes are released in v0.16.1. I’ll close this ticket, as I consider this fixed. If you have further questions, feel free to reopen this issue or create a new one.

Yes @mostafa, consuming messages through group IDs is working fine now.

@chandana194

Then you shouldn’t care about that offset. I’ll fix it. And I think the issue is fixed by now.

Unable to set offset, yet returning the reader

I have just gone through the complete logs and I don’t see the above error message!

@chandana194 Have you looked at the example script I added to the PR? I managed to make it work with that script.

@mostafa Tried with the latest script you shared. I am able to consume the messages now 😃, but one observation below:

INFO[0019] committed offsets for group stage-test-v18: 
running topic: pexintegration.pendingpriced 0 interrupted iterations
default   [-----partition 11: 2998881 -----------] 1 VUs  00m11.3s/10m0s  0/1 iters, 1 per VU
INFO[0020] committed offsets for group stage-test-v18: 
running topic: pexintegration.pendingpriced 0 interrupted iterations
default   [-----partition 11: 2998882 -----------] 1 VUs  00m11.6s/10m0s  0/1 iters, 1 per VU

There are multiple partitions and I tried to consume only 2 messages. From the above logs I can say that the messages were consumed at partition 11 with the above offset, but in the k6 results the offset values are out of sync (highlighted below). Please correct me if my understanding is wrong!

✓ 2 messages is received

     █ teardown

     checks.........................: 100.00% ✓ 1         ✗ 0        
     data_received..................: 0 B     0 B/s
     data_sent......................: 0 B     0 B/s
     iteration_duration.............: avg=10.79s   min=9.99s    med=10.79s   max=11.6s    p(90)=11.44s   p(95)=11.52s  
     iterations.....................: 1       0.039389/s
     kafka_reader_dial_count........: 15      0.590836/s
     kafka_reader_dial_seconds......: avg=3.94s    min=3.94s    med=3.94s    max=3.94s    p(90)=3.94s    p(95)=3.94s   
   ✓ kafka_reader_error_count.......: 0       0/s
     kafka_reader_fetch_bytes.......: 7.0 kB  276 B/s
     kafka_reader_fetch_bytes_max...: 1000000 min=1000000 max=1000000
     kafka_reader_fetch_bytes_min...: 1       min=1       max=1      
     kafka_reader_fetch_size........: 3       0.118167/s
     kafka_reader_fetch_wait_max....: 10s     min=10s     max=10s    
     kafka_reader_fetches_count.....: 30      1.181671/s
     kafka_reader_lag...............: 0       min=0       max=0      
     kafka_reader_message_bytes.....: 33 kB   1.3 kB/s
     kafka_reader_message_count.....: 51      2.008841/s
     **kafka_reader_offset............: 7240251 min=7240251 max=7240251**
     kafka_reader_queue_capacity....: 100     min=100     max=100    
     kafka_reader_queue_length......: 49      min=49      max=49     
     kafka_reader_read_seconds......: avg=74.03µs  min=74.03µs  med=74.03µs  max=74.03µs  p(90)=74.03µs  p(95)=74.03µs 
     kafka_reader_rebalance_count...: 1       0.039389/s
     kafka_reader_timeouts_count....: 0       0/s
     kafka_reader_wait_seconds......: avg=255.59ms min=255.59ms med=255.59ms max=255.59ms p(90)=255.59ms p(95)=255.59ms

Sure, will try this and update you. FYI, when I was testing earlier, I made sure that there were enough messages to consume.

@mostafa I have tried with this branch, but no luck; I’m getting the same error!

  • Fix ReaderConfig’s default values #189

@chandana194

Can you please test the changes in this PR?

Update: I tested the changes myself and added a working script in scripts/test_consumer_group.js. I observed this issue while running the test script: when there is no data to consume, reader.consume keeps waiting for messages on each partition of the topic and fails with the “Unable to read messages.” error when it times out.

@chandana194

Also, if you want to consume from a consumer group, you need to set groupID and groupTopics, and unset partition and topic. I am currently investigating configuration changes when passing values to kafka-go, to see if I missed setting any default values.
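Based on that description, a consumer-group Reader would look roughly like this (a sketch following the stated rule, not a verified configuration; the group name is a placeholder, and saslConfig/tlsConfig are the objects from the original script):

const groupReader = new Reader({
  brokers: brokers,
  groupID: "my-test-group",    // consume as part of a consumer group
  groupTopics: ["topic-name"], // topics for the group; topic and partition stay unset
  sasl: saslConfig,
  tls: tlsConfig,
});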

@chandana194 I just remembered that someone reported a similar issue:

At that time I tried to make a quick fix by adding a readBatchTimeout option to ReaderConfig, which was newly introduced to kafka-go. I rebased the branch and opened PR #188. Can you please fetch that code, rebuild your binary, and test it again using that option? For this, follow this guide, and use the GitHub CLI to quickly fetch the changes in the PR, or just switch to the branch using git checkout add-read-batch-timeout before building the binary.
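For illustration, setting that option on the Reader might look something like this (a sketch only: the name readBatchTimeout comes from the PR description, the units are assumed to be milliseconds like maxWait, and the option exists only after building from that branch):

const reader = new Reader({
  brokers: brokers,
  topic: topic,
  readBatchTimeout: 10000, // assumed milliseconds; available on the add-read-batch-timeout branch
  sasl: saslConfig,
  tls: tlsConfig,
});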

@chandana194

I think I have an idea why this happens. 💡

Currently the maxWait value of the ReaderConfig is set to 200ms by default. This means that the reader gives up looking for new messages after 200ms and your log says it is an i/o timeout. Consider increasing it to see if it helps.

@mostafa Tried the below config, but no luck! Still seeing the same error:

const reader = new Reader({
  brokers: brokers,
  topic: topic,
  maxWait: 30000,
  connectLogger: true,
});

@mostafa I even tried with a different offset, but by default it’s taking the same offset. FYI: these topics are Strimzi Kafka topics, not Confluent Kafka topics.

INFO[0014] looking up offset of kafka reader for partition 0 of pexintegration.pendingprice: 18238021
INFO[0016] writing 1 messages to pexintegration.pendingprice (partition: 0) 
INFO[0017] initializing kafka reader for partition 0 of pexintegration.pendingprice starting at offset 18238021 
INFO[0022] the kafka reader for partition 0 of pexintegration.pendingprice is seeking to offset 10238021
INFO[0023] the kafka reader got an unknown error reading partition 0 of pexintegration.pendingprice at offset 10238021: read tcp 172.31.230.127:51358->172.16.49.27:443: i/o timeout 
INFO[0023] initializing kafka reader for partition 0 of pexintegration.pendingprice starting at offset 10238021 

@chandana194 Are you sure about the offset (10238021)?

Honestly, this is hard to reproduce locally, and given that we don’t have logs from the server side, it’s even harder. In the meantime, I added #187.

@mostafa Tried changing the iterations to a duration; observed the same behaviour:

  1. k6 produced messages successfully, but they are not reaching the Kafka topics
  2. ERRO[0055] Unable to read messages. error=“Unable to read messages.”
 **data_received................: 0 B    0 B/s**
     **data_sent....................: 0 B    0 B/s**
     iteration_duration...........: avg=143.29µs min=143.29µs med=143.29µs max=143.29µs p(90)=143.29µs p(95)=143.29µs
     kafka_writer_acks_required...: 0      min=0      max=0 
     kafka_writer_async...........: 0.00%  ✓ 0        ✗ 2   
     kafka_writer_attempts_max....: 0      min=0      max=0 
     kafka_writer_batch_bytes.....: 1.3 kB 27 B/s
     kafka_writer_batch_max.......: 1      min=1      max=1 
     kafka_writer_batch_size......: 2      0.040419/s
     kafka_writer_batch_timeout...: 0s     min=0s     max=0s
     kafka_writer_error_count.....: 1      0.02021/s
     kafka_writer_message_bytes...: 2.0 kB 40 B/s
     kafka_writer_message_count...: 3      0.060629/s
     kafka_writer_read_timeout....: 0s     min=0s     max=0s
     kafka_writer_retries_count...: 1      0.02021/s
     kafka_writer_wait_seconds....: avg=0s       min=0s       med=0s       max=0s       p(90)=0s       p(95)=0s      
     kafka_writer_write_count.....: 3      0.060629/s
     kafka_writer_write_seconds...: avg=3.17s    min=2.13s    med=3.17s    max=4.22s    p(90)=4.01s    p(95)=4.11s   
     kafka_writer_write_timeout...: 0s     min=0s     max=0s
     vus..........................: 0      min=0      max=1 
     vus_max......................: 1      min=0      max=1

Hi @mostafa, thank you for your response.

Yes, as you mentioned, k6 says the messages are produced successfully but they are not reaching the Kafka topics. We have verified all the logs on the Kafka side.

I will try changing it to duration and see how it behaves.
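For reference, switching from an iteration count to a duration-based run only changes the options block, something like this (the values are placeholders):

export const options = {
  vus: 1,
  duration: "30s", // run for a fixed time instead of a fixed number of iterations
};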