confluent-kafka-go: Consumer hanging on `Close()` call when redeploying application
Description
We are running into an issue when shutting down consumers in our environment: closing the consumer hangs and spins until our pod (we run in Kubernetes) is forcibly terminated. This does not happen for every instance of our application, but on a restart or redeploy of all pods, 5-10% of them end up hanging and being forcibly terminated. Details on how many pods are running and how we redeploy are in the “How to Reproduce” section.
We think it has something to do with the unsubscribe() call in consumer.Close(), but are unsure. It could also be an implementation detail of how we handle Assign/Revoke events. It is worth noting that we will be in the middle of a rebalance when calling consumer.Close(). We cannot use static membership due to the issue reported here:
Config/Env Details:
- confluent-kafka-go: 1.8.2
- librdkafka: 1.8.2
- Apache Kafka broker version: 2.7
- OS: “Ubuntu 18.04.6 LTS (Bionic Beaver)”
- Client logs - https://gist.github.com/stb2132/720851a416a510f7135b4ab5eb408bd7
- Note: the logs are in reverse chronological order (earlier entries appear further down)
- Configmap:
cfgMap := &kafka.ConfigMap{
    "session.timeout.ms":              "30000",
    "bootstrap.servers":               "REDACTED",
    "broker.version.fallback":         "0.10.2",
    "group.id":                        "net-shadow",
    "log.connection.close":            true,
    "go.logs.channel.enable":          true,
    "message.max.bytes":               "5000000",
    "go.application.rebalance.enable": true,
    "enable.auto.commit":              false,
    "enable.partition.eof":            true,
    "default.topic.config": kafka.ConfigMap{
        "auto.offset.reset": "latest",
    },
    "coordinator.query.interval.ms": 120000,
    "queued.max.messages.kbytes":    1048576,
    "fetch.wait.max.ms":             100,
    "partition.assignment.strategy": "cooperative-sticky",
}
Code Details
We instantiate our consumer with a rebalanceCB and a SubscribeTopics call:
if err := consumer.SubscribeTopics(topics, r.rebalanceCallback); err != nil {
    return nil, errors.Wrap(err, "error subscribing to kafka topic")
}
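For completeness, the consumer passed to SubscribeTopics is created from the configmap above in the usual way; roughly (simplified sketch, the error wrapping is just our own plumbing):
// import "github.com/confluentinc/confluent-kafka-go/kafka"
consumer, err := kafka.NewConsumer(cfgMap)
if err != nil {
    return nil, errors.Wrap(err, "error creating kafka consumer")
}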
Note: our rebalanceCB is a bit strange in that it is just a wrapper that calls our “handleEvent” function:
func (r *reader) rebalanceCallback(kafkaConsumer *kafka.Consumer, event kafka.Event) error {
    r.handleEvent(event, nil)
    return nil
}
Here is our handleEvent function (note: we have removed some log lines and if statements to keep the code concise):
func (r *reader) handleEvent(ev kafka.Event, decoder *decoder) {
    var err error
    switch e := ev.(type) {
    case kafka.AssignedPartitions:
        err = r.consumer.IncrementalAssign(e.Partitions)
        if err != nil {
            log.Error("error assigning partitions")
        }
    case kafka.RevokedPartitions:
        err = r.consumer.IncrementalUnassign(e.Partitions)
        if err != nil {
            log.Error("error revoking partitions")
        }
    case *kafka.Message:
        // Redacted, but basically we pass message info onto a channel
        handler.in <- e
    case kafka.Error:
        log.Error("error received from kafka consumer")
    }
}
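As an aside, in case it matters for diagnosis: the callback above swallows any IncrementalAssign/IncrementalUnassign error. A variant that surfaces those errors back to librdkafka would look roughly like this (illustrative sketch only, not our production code; the name rebalanceCallbackStrict is made up):
func (r *reader) rebalanceCallbackStrict(c *kafka.Consumer, event kafka.Event) error {
    switch e := event.(type) {
    case kafka.AssignedPartitions:
        // cooperative-sticky: add only the newly assigned partitions
        return c.IncrementalAssign(e.Partitions)
    case kafka.RevokedPartitions:
        // cooperative-sticky: drop only the revoked partitions
        return c.IncrementalUnassign(e.Partitions)
    }
    return nil
}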
We have two goroutines that handle polling and offset commits respectively. On shutdown, the application stops the polling goroutine with stopReading(), then closes the offset-commit channel, and finally attempts to close the consumer:
func (r *reader) stopReading() {
    r.stopReads.Set(true)
    r.readStopper.StopAndWait()
}

func (r *reader) close() {
    close(r.offsetCommits)
    r.stopper.StopAndWait()
    if r.events != nil {
        close(r.events)
    }
    log.Info("closing queue reader...", log.String("cluster", r.cluster.Name), log.Strings("topics", r.topics))
    if err := r.consumer.Close(); err != nil {
        log.Error("error closing consumer")
    } else {
        log.Info("consumer closed")
    }
}
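For context, the polling goroutine has roughly the following shape (heavily simplified sketch; stopReads.Get() and the stopper helpers are our own wrappers and are only shown schematically here):
func (r *reader) pollLoop() {
    for !r.stopReads.Get() {
        // Poll surfaces rebalance events, messages, and errors, all of which
        // are dispatched through handleEvent above.
        ev := r.consumer.Poll(100)
        if ev == nil {
            continue
        }
        r.handleEvent(ev, nil)
    }
}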
Additional Observations
According to our application logs, we stop reading and writing offset commits as expected, but then hang on the call to r.consumer.Close().
This can be seen in the gist of client logs linked above, but I have included a snippet here:
"2022-04-19T16:01:20.357Z","2022-04-19T16:01:19.549434+00:00 INFO (queue/reader.go:479) - closing queue reader... cluster=kafka-net topics={flow_logs_topic}"
"2022-04-19T16:01:20.357Z","%7|1650384079.549|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group ""net-shadow"" received op SUBSCRIBE in state up (join-state wait-join)"
"2022-04-19T16:01:20.357Z","%7|1650384079.549|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group ""net-shadow"": subscribe to new unset subscription of 0 topics (join-state wait-join)"
"2022-04-19T16:01:20.357Z","%7|1650384079.549|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group ""net-shadow"": postponing subscribe until previous rebalance completes (join-state wait-join)"
"2022-04-19T16:01:20.357Z","%7|1650384079.606|CLOSE|rdkafka#consumer-1| [thrd:app]: Closing consumer"
"2022-04-19T16:01:20.357Z","%7|1650384079.606|CLOSE|rdkafka#consumer-1| [thrd:app]: Waiting for close events"
After this point in the logs, we saw a few commonalities in the pods that get stuck. We consistently see:
"2022-04-19T16:01:20.358Z","%7|1650384079.606|UNASSIGN|rdkafka#consumer-1| [thrd:app]: Forcing unassign of 13 partition(s)"
...
...
"2022-04-19T16:01:20.359Z","%4|1650384079.606|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group ""net-shadow"": application *assign() call failed: Changes to the current assignment must be made using incremental_assign() or incremental_unassign() when rebalance protocol type is COOPERATIVE"
and ultimately we will “hang” with this log line repeatedly reported:
"2022-04-19T16:01:36.424Z","%7|1650384095.480|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group ""net-shadow"": waiting for assign call, 13 toppar(s), 0 commit(s) (state up, join-state wait-unassign-call) before terminating"
How to reproduce
We can consistently reproduce this in our setup, but I have not built a standalone reproduction.
We see it happen during a deploy with 20 consumer instances running: if the deploy rotates 4 consumers roughly every 30 seconds, usually 1-2 out of the 20 end up stuck and forcibly terminated.
spec:
  replicas: 20
  minReadySeconds: 30
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 4
      maxUnavailable: 0
Please let me know if you need more information or have further questions.
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Comments: 15 (13 by maintainers)
Commits related to this issue
- Allow any form of unassign*() during consumer close (thanks to @kevinconaway) https://github.com/confluentinc/confluent-kafka-go/issues/767#issuecomment-1107465737 — committed to confluentinc/librdkafka by edenhill 2 years ago
- Fix rdkafka consumer hanging when disconnecting (#13144) Calling `consumer.close()` was hanging indefinitely when the `optimizedRebalance` config was enabled. The issue is the same as https://gi... — committed to microsoft/FluidFramework by GaryWilber 2 years ago
If all goes well we should be able to release 1.9.0 next week.
We generally try to avoid backports.
@edenhill Any update on when librdkafka 1.9.0 will be released?