franz-go: When changing from SASL to TLS authentication, the latest offset is not stored in the consumer group

I am using franz-go via Benthos, specifically the kafka_franz input: https://www.benthos.dev/docs/components/inputs/kafka_franz/

One thing I noticed is that if I change my authentication method from SASL to TLS, my consumer group sometimes does not remember the last committed offset and reprocesses the last message on the partition. I manually checked that the offset was stored before switching auth methods; nonetheless, the stored offset for the consumer group moves back by one when I switch.
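In Kafka, a consumer group's committed offset is the offset of the next record to read. After processing the record at offset 23, the group should therefore store 24. A stored offset that appears to "move back by one" is consistent with the last commit never having reached the broker. A minimal sketch of that arithmetic; `nextCommit` and `redelivered` are hypothetical helpers, not franz-go API:

```go
package main

import "fmt"

// nextCommit returns the offset a consumer group commits after successfully
// processing the record at offset processed. By Kafka convention the
// committed offset is the next offset to read, not the last one read.
func nextCommit(processed int64) int64 {
	return processed + 1
}

// redelivered reports whether a consumer resuming from the committed offset
// will read the record at offset processed a second time.
func redelivered(committed, processed int64) bool {
	return committed <= processed
}

func main() {
	const processed int64 = 23 // last record handled before the restart

	// The final commit reached the broker: consumption resumes at 24.
	fmt.Println("commit landed:", nextCommit(processed), redelivered(nextCommit(processed), processed))

	// The final commit was lost: the group still holds 23, so record 23 repeats.
	fmt.Println("commit lost:", int64(23), redelivered(23, processed))
}
```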

I have reproduced the error going from SASL to TLS, and from TLS to SASL. Not sure if it matters, but I am using brokers hosted on AWS, and the TLS and SASL listeners use different ports. The SASL mechanism I am using is IAM. The topic has a replication factor of 3, and min.insync.replicas is set to 2.

Logs when using SASL:

qd_kafka_outbox_test_dev | {"@service":"benthos","level":"info","msg":"Running main config from specified file","path":"/queue-dispatcher/config.yaml","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning autocommit loop","path":"root.input","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"Receiving messages from Kafka topics: [heartbeat]","path":"root.input","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"metadata update triggered","path":"root.input","time":"2022-08-03T00:39:14Z","why":"client initialization"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_deadletter_producer_avro","level":"info","msg":"Writing messages to Kafka topic: ${! meta(\"input_topic\") + env(\"DEADLETTER_TOPIC_SUFFIX\").or(\"-deadletter\") }","path":"root.output.switch.0.output.switch.0.output","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"","level":"info","msg":"Dropping messages.","path":"root.output.switch.0.output.switch.1.output","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_invalid_producer","level":"info","msg":"Writing messages to Kafka topic: ${! meta(\"input_topic\") + env(\"INVALID_TOPIC_SUFFIX\").or(\"-invalid\") }","path":"root.output.switch.1.output.switch.0.output","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"","level":"info","msg":"Dropping messages.","path":"root.output.switch.1.output.switch.1.output","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_deadletter_producer_http","level":"info","msg":"Writing messages to Kafka topic: ${! meta(\"input_topic\") + env(\"DEADLETTER_TOPIC_SUFFIX\").or(\"-deadletter\") }","path":"root.output.switch.2.output.fallback.1.switch.0.output","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"","level":"info","msg":"Dropping messages.","path":"root.output.switch.2.output.fallback.1.switch.1.output","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","level":"info","msg":"Launching a benthos instance, use CTRL+C to close","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"http_call","level":"info","msg":"Sending messages via HTTP requests to: http://api-gateway:8080/get_events/${! meta(\"EventName\") }","path":"root.output.switch.2.output.fallback.0","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","level":"info","msg":"Listening for HTTP requests at: http://0.0.0.0:4195","time":"2022-08-03T00:39:14Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning to manage the group lifecycle","path":"root.input","time":"2022-08-03T00:39:15Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"joining group","path":"root.input","time":"2022-08-03T00:39:15Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","member_id":"kgo-86ed804b-7d9a-42ff-b519-e55187a1b9f3","msg":"join returned MemberIDRequired, rejoining with response's MemberID","path":"root.input","time":"2022-08-03T00:39:16Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","balance_protocol":"cooperative-sticky","generation":"26","group":"local-queue-dispatcher","instance_id":"\u003cnil\u003e","label":"kafka_topic_consumer","leader":"true","level":"info","member_id":"kgo-86ed804b-7d9a-42ff-b519-e55187a1b9f3","msg":"joined, balancing group","path":"root.input","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"balancing group as leader","path":"root.input","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","id":"kgo-86ed804b-7d9a-42ff-b519-e55187a1b9f3","interests":"interested topics: [heartbeat], previously owned: ","label":"kafka_topic_consumer","level":"info","msg":"balance group member","path":"root.input","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"balanced","path":"root.input","plan":"kgo-86ed804b-7d9a-42ff-b519-e55187a1b9f3{heartbeat[0 1 2 3 4 5 6 7]}","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"syncing","path":"root.input","protocol":"cooperative-sticky","protocol_type":"consumer","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","assigned":"heartbeat[0 1 2 3 4 5 6 7]","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"synced","path":"root.input","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","added":"heartbeat[0 1 2 3 4 5 6 7]","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","lost":"","msg":"new group session begun","path":"root.input","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning heartbeat loop","path":"root.input","time":"2022-08-03T00:39:18Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","how":"assigning everything new, keeping current assignment","input":"heartbeat[0{23.34 0} 3{-2.-1 0} 5{-2.-1 0} 2{-2.-1 0} 6{-2.-1 0} 1{-2.-1 0} 7{-2.-1 0} 4{-2.-1 0}]","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input","time":"2022-08-03T00:39:19Z","why":"newly fetched offsets for group local-queue-dispatcher"}

qd_kafka_outbox_test_dev | {"@service":"benthos","content":{"heartbeat":6},"error":null,"label":"message_in_transit","level":"info","meta":{"EventId":"c5ab084c-c4c4-4fcc-abf7-fcc384881ad3","EventName":"heartbeat_v1","GeneratedAt":"2022-07-19T22:04:23.559063Z","OrderingGroupId":"3a7fe5d5-85f1-4d32-8ddb-bbe407b5adcf","Origin":"abff48392c1684bec55a659e007ee39b","consumer_group":"local-queue-dispatcher","id":"c5ab084c-c4c4-4fcc-abf7-fcc384881ad3","input_topic":"heartbeat","kafka_key":"3a7fe5d5-85f1-4d32-8ddb-bbe407b5adcf\n\n","kafka_offset":"23","kafka_partition":"0","kafka_timestamp_unix":"1659487672","kafka_topic":"heartbeat","old_root":"{\"heartbeat\":6}","team":"infra"},"msg":"","path":"root.pipeline.processors.2.switch.4.processors.0","time":"2022-08-03T00:47:52Z"}

Logs after changing the auth method to TLS only:

qd_kafka_outbox_test_dev | {"@service":"benthos","level":"info","msg":"Running main config from specified file","path":"/queue-dispatcher/config.yaml","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning autocommit loop","path":"root.input","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"Receiving messages from Kafka topics: [heartbeat]","path":"root.input","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"metadata update triggered","path":"root.input","time":"2022-08-03T00:48:35Z","why":"client initialization"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_deadletter_producer_avro","level":"info","msg":"Writing messages to Kafka topic: ${! meta(\"input_topic\") + env(\"DEADLETTER_TOPIC_SUFFIX\").or(\"-deadletter\") }","path":"root.output.switch.0.output.switch.0.output","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"","level":"info","msg":"Dropping messages.","path":"root.output.switch.0.output.switch.1.output","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_invalid_producer","level":"info","msg":"Writing messages to Kafka topic: ${! meta(\"input_topic\") + env(\"INVALID_TOPIC_SUFFIX\").or(\"-invalid\") }","path":"root.output.switch.1.output.switch.0.output","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"","level":"info","msg":"Dropping messages.","path":"root.output.switch.1.output.switch.1.output","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_deadletter_producer_http","level":"info","msg":"Writing messages to Kafka topic: ${! meta(\"input_topic\") + env(\"DEADLETTER_TOPIC_SUFFIX\").or(\"-deadletter\") }","path":"root.output.switch.2.output.fallback.1.switch.0.output","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"","level":"info","msg":"Dropping messages.","path":"root.output.switch.2.output.fallback.1.switch.1.output","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","level":"info","msg":"Launching a benthos instance, use CTRL+C to close","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","level":"info","msg":"Listening for HTTP requests at: http://0.0.0.0:4195","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"http_call","level":"info","msg":"Sending messages via HTTP requests to: http://api-gateway:8080/get_events/${! meta(\"EventName\") }","path":"root.output.switch.2.output.fallback.0","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning to manage the group lifecycle","path":"root.input","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"joining group","path":"root.input","time":"2022-08-03T00:48:35Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","member_id":"kgo-cd7b956d-a823-4de6-aee4-c51f13656424","msg":"join returned MemberIDRequired, rejoining with response's MemberID","path":"root.input","time":"2022-08-03T00:48:36Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","balance_protocol":"cooperative-sticky","generation":"27","group":"local-queue-dispatcher","instance_id":"\u003cnil\u003e","label":"kafka_topic_consumer","leader":"true","level":"info","member_id":"kgo-cd7b956d-a823-4de6-aee4-c51f13656424","msg":"joined, balancing group","path":"root.input","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"balancing group as leader","path":"root.input","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","id":"kgo-cd7b956d-a823-4de6-aee4-c51f13656424","interests":"interested topics: [heartbeat], previously owned: ","label":"kafka_topic_consumer","level":"info","msg":"balance group member","path":"root.input","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"balanced","path":"root.input","plan":"kgo-cd7b956d-a823-4de6-aee4-c51f13656424{heartbeat[0 1 2 3 4 5 6 7]}","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"syncing","path":"root.input","protocol":"cooperative-sticky","protocol_type":"consumer","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","assigned":"heartbeat[0 1 2 3 4 5 6 7]","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"synced","path":"root.input","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","added":"heartbeat[0 1 2 3 4 5 6 7]","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","lost":"","msg":"new group session begun","path":"root.input","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","group":"local-queue-dispatcher","label":"kafka_topic_consumer","level":"info","msg":"beginning heartbeat loop","path":"root.input","time":"2022-08-03T00:48:47Z"}
qd_kafka_outbox_test_dev | {"@service":"benthos","how":"assigning everything new, keeping current assignment","input":"heartbeat[6{-2.-1 0} 1{-2.-1 0} 7{-2.-1 0} 4{-2.-1 0} 0{23.34 0} 3{-2.-1 0} 5{-2.-1 0} 2{-2.-1 0}]","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input","time":"2022-08-03T00:48:47Z","why":"newly fetched offsets for group local-queue-dispatcher"}
qd_kafka_outbox_test_dev | {"@service":"benthos","label":"kafka_topic_consumer","level":"info","msg":"metadata update triggered","path":"root.input","time":"2022-08-03T00:48:47Z","why":"loading offsets in new session from assign"}

qd_kafka_outbox_test_dev | {"@service":"benthos","content":{"heartbeat":6},"error":null,"label":"message_in_transit","level":"info","meta":{"EventId":"c5ab084c-c4c4-4fcc-abf7-fcc384881ad3","EventName":"heartbeat_v1","GeneratedAt":"2022-07-19T22:04:23.559063Z","OrderingGroupId":"3a7fe5d5-85f1-4d32-8ddb-bbe407b5adcf","Origin":"abff48392c1684bec55a659e007ee39b","consumer_group":"local-queue-dispatcher","id":"c5ab084c-c4c4-4fcc-abf7-fcc384881ad3","input_topic":"heartbeat","kafka_key":"3a7fe5d5-85f1-4d32-8ddb-bbe407b5adcf\n\n","kafka_offset":"23","kafka_partition":"0","kafka_timestamp_unix":"1659487672","kafka_topic":"heartbeat","old_root":"{\"heartbeat\":6}","team":"infra"},"msg":"","path":"root.pipeline.processors.2.switch.4.processors.0","time":"2022-08-03T00:48:53Z"}

The log lines that stood out (the partition assignment from each run):

qd_kafka_outbox_test_dev | {"@service":"benthos","how":"assigning everything new, keeping current assignment","input":"heartbeat[0{23.34 0} 3{-2.-1 0} 5{-2.-1 0} 2{-2.-1 0} 6{-2.-1 0} 1{-2.-1 0} 7{-2.-1 0} 4{-2.-1 0}]","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input","time":"2022-08-03T00:39:19Z","why":"newly fetched offsets for group local-queue-dispatcher"}
qd_kafka_outbox_test_dev | {"@service":"benthos","how":"assigning everything new, keeping current assignment","input":"heartbeat[6{-2.-1 0} 1{-2.-1 0} 7{-2.-1 0} 4{-2.-1 0} 0{23.34 0} 3{-2.-1 0} 5{-2.-1 0} 2{-2.-1 0}]","label":"kafka_topic_consumer","level":"info","msg":"assigning partitions","path":"root.input","time":"2022-08-03T00:48:47Z","why":"newly fetched offsets for group local-queue-dispatcher"}
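The two assignment lines can be decoded mechanically. Assuming franz-go logs each offset as `{at.epoch currentEpoch}`, with `-2` standing for "start of partition" (no committed offset), both runs resume partition 0 at offset 23 with leader epoch 34, which matches the record at `kafka_offset` 23 being processed in both runs. A sketch of a parser for that assumed format; `partitionStart` and `parseAssignment` are hypothetical names:

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// partitionStart is one entry of the "assigning partitions" log input.
// Assumed layout per entry: partition{at.epoch currentEpoch}.
type partitionStart struct {
	Partition int32
	At        int64 // assumed: -2 means "start of partition", i.e. no committed offset
	Epoch     int32 // leader epoch attached to the offset, -1 if none
}

// entryRE matches absolute offsets only, e.g. "0{23.34 0}" or "3{-2.-1 0}".
var entryRE = regexp.MustCompile(`(\d+)\{(-?\d+)\.(-?\d+) -?\d+\}`)

// parseAssignment extracts every partition entry from a string such as
// "heartbeat[0{23.34 0} 3{-2.-1 0}]".
func parseAssignment(s string) ([]partitionStart, error) {
	var out []partitionStart
	for _, m := range entryRE.FindAllStringSubmatch(s, -1) {
		p, err := strconv.ParseInt(m[1], 10, 32)
		if err != nil {
			return nil, err
		}
		at, err := strconv.ParseInt(m[2], 10, 64)
		if err != nil {
			return nil, err
		}
		epoch, err := strconv.ParseInt(m[3], 10, 32)
		if err != nil {
			return nil, err
		}
		out = append(out, partitionStart{Partition: int32(p), At: at, Epoch: int32(epoch)})
	}
	return out, nil
}

func main() {
	runs := []struct{ name, input string }{
		{"sasl", "heartbeat[0{23.34 0} 3{-2.-1 0} 5{-2.-1 0} 2{-2.-1 0} 6{-2.-1 0} 1{-2.-1 0} 7{-2.-1 0} 4{-2.-1 0}]"},
		{"tls", "heartbeat[6{-2.-1 0} 1{-2.-1 0} 7{-2.-1 0} 4{-2.-1 0} 0{23.34 0} 3{-2.-1 0} 5{-2.-1 0} 2{-2.-1 0}]"},
	}
	for _, r := range runs {
		starts, err := parseAssignment(r.input)
		if err != nil {
			panic(err)
		}
		// Only partitions with a real committed offset are interesting here.
		for _, s := range starts {
			if s.At >= 0 {
				fmt.Printf("%s: partition %d resumes at offset %d (epoch %d)\n", r.name, s.Partition, s.At, s.Epoch)
			}
		}
	}
}
```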

Does anything in the logs suggest that the franz-go client is misconfigured, or is this a bug in franz-go itself?

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 15 (9 by maintainers)

Most upvoted comments

OK, good, that's great. Sorry about the breaking(ish) changes: the API didn't change, but I do concede that you need to handle a new error if you previously canceled contexts.