xk6-kafka: Unable to send messages to the kafka topics
Hi, I am trying to send messages to already existing Kafka topics. I am able to successfully connect to the Kafka broker and read the existing topics, but when I try to produce and consume messages, I do not get any error from k6, and the messages never reach the topic. Could you please help me with this?
Script snippet:
import { check } from "k6";
// import { fs } from 'fs';
import {
  Writer,
  Reader,
  Connection,
  SchemaRegistry,
  SCHEMA_TYPE_JSON,
  SCHEMA_TYPE_STRING,
  SASL_SCRAM_SHA512,
} from "k6/x/kafka"; // import kafka extension

export const options = {
  vus: 1,
  iteration: 1, // note: k6 expects "iterations"; see the warning in the run logs below
};

const brokers = ["bootstrap-server-name:portnumber"];
const topic = "topic-name";

const saslConfig = {
  username: "<username>",
  password: "<pwd>",
  algorithm: SASL_SCRAM_SHA512,
  clientCertPem: "pathto/client.pem",
  serverCaPem: "pathto/serverCa.pem",
};

const tlsConfig = {
  enableTls: true,
  insecureSkipTlsVerify: true,
  clientCertPem: "pathto/client.pem",
  serverCaPem: "pathto/serverCa.pem",
};

const offset = 0;
// partition and groupId are mutually exclusive
const partition = 0;
const numPartitions = 1;
const replicationFactor = 1;

const writer = new Writer({
  brokers: brokers,
  topic: topic,
  sasl: saslConfig,
  tls: tlsConfig,
});

const reader = new Reader({
  brokers: brokers,
  topic: topic,
  // partition: partition,
  // maxWait: 300,
  offset: offset,
  sasl: saslConfig,
  tls: tlsConfig,
});

const connection = new Connection({
  address: brokers[0],
  sasl: saslConfig,
  tls: tlsConfig,
});

const schemaRegistry = new SchemaRegistry();

if (__VU == 0) {
  console.log(
    "Existing topics: ",
    connection.listTopics(saslConfig, tlsConfig)
  );
}

export default function () {
  for (let index = 0; index < 2; index++) {
    let messages = [
      {
        key: schemaRegistry.serialize({
          data: "4096525_541",
          schemaType: SCHEMA_TYPE_STRING,
        }),
        value: schemaRegistry.serialize({
          data: {
            "eventId": "22358d8b-9826-4bb0-b209-cd13a0ae1ff", // random UUID
            "itemNumber": 476183,
            "storeNumber": 4105,
            "retailPricePriorAmount": 10.99,
            "retailPriceAmount": 10.99,
            "labelPrice": 8.77,
            "sellingPriceAmount": 8.77,
            "winningLabelAdv": "WAS_PP",
            "priceTypeCode": 2,
            "priceDisplayCode": 3,
            "advEvents": [
              {
                "eventType": "WAS_PP",
                "eventDescription": "WAS NCF ITEMS ALICIA",
                "eventStartDate": "2022-12-21T00:00:00Z",
                "eventEndDate": "2023-07-01T23:59:59.999Z",
                "dollarOffPrice": 8.77,
                "percentOff": 0,
                "sourceId": "CORP"
              }
            ],
            "nlpIndicator": "N",
            "mgrPriceIndicator": "N",
            "updateId": "PEWINCMP",
            "createdTS": "2022-11-09T06:07:16.733+0000",
            "updatedTS": "2022-11-09T06:07:16.733+0000"
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
      },
    ];
    // console.log(new Date())
    writer.produce({ messages: messages });
  }

  let messages = reader.consume({ limit: 2 });
  check(messages, {
    "2 messages returned": (msgs) => msgs.length == 2,
  });
}

export function teardown(data) {
  writer.close();
  reader.close();
  connection.close();
}
k6 run logs:
./k6 run -v '/Users/cnagara/Documents/kafkaTesting/spexKafkaTesting.js'
DEBU[0000] Logger format: TEXT
DEBU[0000] k6 version: v0.41.0 ((devel), go1.18.3, darwin/amd64)
          /\      |‾‾| /‾‾/   /‾‾/
     /\  /  \     |  |/  /   /  /
    /  \/    \    |     (   /   ‾‾\
   /          \   |  |\  \ |  (‾)  |
  / __________ \  |__| \__\ \_____/ .io
DEBU[0000] Resolving and reading test '/Users//Documents/kafkaTesting/spexKafkaTesting.js'...
DEBU[0000] Loading... moduleSpecifier="file:///Users//Documents/kafkaTesting/spexKafkaTesting.js" originalModuleSpecifier=/Users//Documents/kafkaTesting/spexKafkaTesting.js
DEBU[0000] '/Users//Documents/kafkaTesting/spexKafkaTesting.js' resolved to 'file:///Users/cnagara/Documents/kafkaTesting/spexKafkaTesting.js' and successfully loaded 5165 bytes!
DEBU[0000] Gathering k6 runtime options...
DEBU[0000] Initializing k6 runner for '/Users//Documents/kafkaTesting/spexKafkaTesting.js' (file:///Users//Documents/kafkaTesting/spexKafkaTesting.js)...
DEBU[0000] Detecting test type for... test_path="file:///Users//Documents/kafkaTesting/spexKafkaTesting.js"
DEBU[0000] Trying to load as a JS test... test_path="file:///Users//Documents/kafkaTesting/spexKafkaTesting.js"
DEBU[0000] Babel: Transformed t=127.309875ms
INFO[0005] Existing topics: ["sync","pendingprice"] source=console
WARN[0005] There were unknown fields in the options exported in the script error="json: unknown field \"iteration\""
DEBU[0005] Runner successfully initialized!
DEBU[0005] Parsing CLI flags...
DEBU[0005] Consolidating config layers...
DEBU[0005] Parsing thresholds and validating config...
DEBU[0005] Initializing the execution scheduler...
DEBU[0005] Starting 1 outputs... component=output-manager
DEBU[0005] Starting... component=metrics-engine-ingester
DEBU[0005] Started! component=metrics-engine-ingester
execution: local
script: /Users/Documents/kafkaTesting/spexKafkaTesting.js
output: -
scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
* default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)
DEBU[0005] Starting the REST API server on localhost:6565
DEBU[0005] Initialization starting... component=engine
DEBU[0005] Starting emission of VUs and VUsMax metrics...
DEBU[0005] Start of initialization executorsCount=1 neededVUs=1 phase=local-execution-scheduler-init
DEBU[0010] Initialized VU #1 phase=local-execution-scheduler-init
DEBU[0010] Finished initializing needed VUs, start initializing executors... phase=local-execution-scheduler-init
DEBU[0010] Initialized executor default phase=local-execution-scheduler-init
DEBU[0010] Initialization completed phase=local-execution-scheduler-init
DEBU[0010] Execution scheduler starting... component=engine
DEBU[0010] Start of test run executorsCount=1 phase=local-execution-scheduler-run
DEBU[0010] Running setup() phase=local-execution-scheduler-run
DEBU[0010] Metrics processing started... component=engine
INFO[0015] Existing topics: ["pendingprice","promocentral"] source=console
DEBU[0015] Start all executors... phase=local-execution-scheduler-run
DEBU[0015] Starting executor executor=default startTime=0s type=per-vu-iterations
DEBU[0015] Starting executor run... executor=per-vu-iterations iterations=1 maxDuration=10m0s scenario=default type=per-vu-iterations vus=1
INFO[0015] "2022-12-26T08:15:14.116Z" source=console
INFO[0020] "2022-12-26T08:15:18.657Z" source=console
^CDEBU[0190] Stopping k6 in response to signal... sig=interrupt
DEBU[0190] Engine: Thresholds terminated component=engine
ERRO[0190] Unable to read messages. error="Unable to read messages."
DEBU[0190] run: context expired; exiting... component=engine
DEBU[0190] Metrics emission of VUs and VUsMax metrics stopped
DEBU[0190] Executor finished successfully executor=default startTime=0s type=per-vu-iterations
DEBU[0190] Running teardown() phase=local-execution-scheduler-run
INFO[0194] Existing topics: ["promocentral","pendingprice"] source=console
DEBU[0194] Execution scheduler terminated component=engine error="<nil>"
DEBU[0194] Processing metrics and thresholds after the test run has ended... component=engine
DEBU[0194] Stopping... component=metrics-engine-ingester
DEBU[0194] Stopped! component=metrics-engine-ingester
DEBU[0194] Engine run terminated cleanly
running (03m04.2s), 0/1 VUs, 0 complete and 1 interrupted iterations
default ✗ [--------------------------------------] 1 VUs 02m54.5s/10m0s 0/1 iters, 1 per VU
WARN[0194] No script iterations finished, consider making the test duration longer
INFO[0199] Existing topics: ["promocentral","pendingprice"] source=console
█ teardown
data_received................: 0 B 0 B/s
data_sent....................: 0 B 0 B/s
iteration_duration...........: avg=63.75µs min=63.75µs med=63.75µs max=63.75µs p(90)=63.75µs p(95)=63.75µs
kafka_writer_acks_required...: 0 min=0 max=0
kafka_writer_async...........: 0.00% ✓ 0 ✗ 2
kafka_writer_attempts_max....: 0 min=0 max=0
kafka_writer_batch_bytes.....: 1.3 kB 7.1105804965559365 B/s
kafka_writer_batch_max.......: 1 min=1 max=1
kafka_writer_batch_size......: 2 0.010856/s
kafka_writer_batch_timeout...: 0s min=0s max=0s
kafka_writer_error_count.....: 0 0/s
kafka_writer_message_bytes...: 1.3 kB 7.1105804965559365 B/s
kafka_writer_message_count...: 2 0.010856/s
kafka_writer_read_timeout....: 0s min=0s max=0s
kafka_writer_retries_count...: 0 0/s
kafka_writer_wait_seconds....: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_write_count.....: 2 0.010856/s
kafka_writer_write_seconds...: avg=2.15s min=1.9s med=2.15s max=2.4s p(90)=2.35s p(95)=2.37s
kafka_writer_write_timeout...: 0s min=0s max=0s
vus..........................: 1 min=0 max=1
vus_max......................: 1 min=0 max=1
DEBU[0199] Waiting for engine processes to finish...
DEBU[0199] Metrics processing winding down... component=engine
DEBU[0199] Everything has finished, exiting k6!
DEBU[0199] Stopping 1 outputs... component=output-manager
DEBU[0199] Stopping... component=metrics-engine-ingester
DEBU[0199] Stopped! component=metrics-engine-ingester
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Comments: 40 (19 by maintainers)
@chandana194 That’s strange! Of course you can control it. I just used for-loops to demonstrate that you can generate lots of messages before sending them to Kafka, but it stuck as the de facto way to write xk6-kafka scripts and actually use xk6-kafka. You can simply generate one message per VU per iteration, and that’s totally fine.
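For illustration, a per-iteration producer could look roughly like the minimal sketch below; it reuses the writer and schemaRegistry objects from the original script, and the key and value payloads are placeholders.

// Minimal sketch: one message per VU per iteration (placeholder payloads).
export default function () {
  writer.produce({
    messages: [
      {
        key: schemaRegistry.serialize({
          data: "some-key",
          schemaType: SCHEMA_TYPE_STRING,
        }),
        value: schemaRegistry.serialize({
          data: { "eventId": "some-id" },
          schemaType: SCHEMA_TYPE_JSON,
        }),
      },
    ],
  });
}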
@mostafa One issue I have observed with the latest version is that, when I test with a higher number of users (like 50 VUs for 30 seconds), the k6 summary report does not print the message count/metrics properly. From the backend I see that more than 10K messages are produced, but the summary shows a much smaller number: kafka_writer_message_count...: 1931 78.300207/s
@chandana194 Awesome!
The changes are released in v0.16.1. I’ll close this ticket, as I consider this fixed. If you have further questions, feel free to reopen this issue or create a new one.
Yes @mostafa, consuming messages through group IDs is working fine now.
@chandana194
Then you shouldn’t care about that offset. I’ll fix it. And I think the issue is fixed by now.
I have just gone through the complete logs and don’t see the above error message!
@mostafa Tried the latest script you have shared. I am able to consume the messages now 😃, but one observation below:
There are multiple partitions, and I tried to consume only 2 messages. From the above logs I can say that the messages were consumed from partition 11 with the above offset, but in the k6 results the offset values are out of sync (highlighted below). Please correct me if my understanding is wrong!
@mostafa I have tried with this branch, but no luck; I am getting the same error!
Sure, will try this and update you. FYI, when I was testing earlier, I made sure that there were enough messages to consume.
@chandana194
Can you please test the changes in this PR?
Update: I tested the changes myself and added a working script in scripts/test_consumer_group.js. I observed this issue while running the test script: when there is no data to consume, reader.consume keeps waiting for messages on each partition of the topic and fails with the "Unable to read messages." error when it times out.
@chandana194
Also, if you want to consume from a consumer group, you need to set groupID and groupTopics, and unset partition and topic. I am currently investigating configuration changes when passing values to kafka-go, to see if I missed setting any default values.
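For example, a consumer-group reader might be configured roughly as in the sketch below; the group ID is a placeholder, and the option names follow the groupID/groupTopics spelling mentioned above.

// Sketch: consume via a consumer group instead of a fixed topic/partition.
// "my-consumer-group" is a placeholder; do not set topic or partition here.
const groupReader = new Reader({
  brokers: brokers,
  groupID: "my-consumer-group",
  groupTopics: [topic],
  sasl: saslConfig,
  tls: tlsConfig,
});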
@chandana194 I just remembered that someone reported a similar issue:
At that time I tried to make a quick fix by adding a readBatchTimeout option to ReaderConfig, which was newly introduced to kafka-go. I rebased the branch and opened PR #188. Can you please fetch that code, rebuild your binary, and test it again using that option? For this, follow this guide, and use the GitHub CLI to quickly fetch the changes in the PR, or just switch to the branch using git checkout add-read-batch-timeout before building the binary.
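Once the binary is rebuilt, the option would presumably be passed in the reader config, roughly as in the sketch below; the value shown is only an illustration, and its unit (assumed here to be milliseconds, like maxWait) may differ in the PR branch.

// Sketch: pass the readBatchTimeout option from the PR branch to the reader.
// The 30000 value and its unit are assumptions for illustration only.
const reader = new Reader({
  brokers: brokers,
  topic: topic,
  maxWait: 30000,
  readBatchTimeout: 30000,
});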
@mostafa Tried the below config, but no luck! Still seeing the same error.
const reader = new Reader({ brokers: brokers, topic: topic, maxWait: 30000, connectLogger: true });
@chandana194
I think I have an idea why this happens. 💡
Currently the maxWait value of the ReaderConfig is set to 200ms by default. This means that the reader gives up looking for new messages after 200ms, and your log says it is an i/o timeout. Consider increasing it to see if it helps.
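For example (a sketch only; 10000 is an arbitrary value and assumes the same millisecond unit as the 200ms default mentioned above):

// Sketch: raise maxWait so the reader waits longer for new messages
// before giving up with an i/o timeout.
const reader = new Reader({
  brokers: brokers,
  topic: topic,
  maxWait: 10000,
});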
@mostafa I even tried with a different offset, but by default it’s taking the same offset. FYI: these topics are Strimzi Kafka topics, not Confluent Kafka topics.
@chandana194 Are you sure about the offset (10238021)?
Honestly, this is hard to reproduce locally, and given we don’t have logs from the server-side, it makes it even harder. In the meantime, I added #187.
@mostafa Tried changing the iteration to duration; observed the same behaviour.
Hi @mostafa, thank you for your response.
Yes, as you have mentioned, k6 says the messages are produced successfully, but they are not reaching the Kafka topics. We have verified all the logs on the Kafka side.
I will try changing it to duration and see how it behaves.
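For reference, switching from a fixed iteration count to a duration-based run only changes the options block; a minimal sketch (the 30s value is arbitrary):

// Sketch: run the default function for a fixed duration instead of a
// fixed number of iterations per VU.
export const options = {
  vus: 1,
  duration: "30s",
};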