rust-rdkafka: Upgrade from 0.34.0 to 0.36.0 causes consumer to stop consuming but keep running

As the title says, after upgrading from 0.34.0 to 0.36.0 we started getting Kafka consumers that stop consuming messages and start leaking memory, but do not leave the consumer group.

  • I do not have a clear reproducer, as it happened to us across multiple production environments in a seemingly random fashion.
    • For example, it never happened in our staging environment.
  • It happened more often in environments where Kafka was under higher load. Maybe it has to do with the change to the event-based interface, but that is just a wild guess.
  • In the end I tried setting a low max.poll.interval.ms, but the problem still occurred. The consumer seems to keep sending heartbeats but for whatever reason stops pulling records: CPU drops, memory increases, and the consumer group does not rebalance.
  • After reverting to 0.34.0 the issue has not reproduced so far (screenshots below).
[Screenshot 2023-12-12 at 14:40:59] [Screenshot 2023-12-12 at 14:40:33]

Disclaimer: we have been running 0.34.0 for half a day without the issue reproducing, whereas it happened many times with 0.36.0. I will report back if it still happens on 0.34.0 and the origin turns out to be elsewhere.

About this issue

  • Original URL
  • State: open
  • Created 7 months ago
  • Reactions: 6
  • Comments: 34 (3 by maintainers)

Most upvoted comments

I think I was able to reproduce the issue locally, running a high-level consumer with the following timing-related settings:

max.poll.interval.ms=20000
session.timeout.ms=10000
auto.commit.interval.ms=5000
statistics.interval.ms=5000

After calling subscribe I poll the consumer in a loop using recv. If I add a tokio::time::sleep of 7s between polls, 0.36.2 silently fails and just stops consuming; with a sleep of 6s it keeps consuming. With 0.35.0 it keeps consuming even with larger sleep durations (10s+).
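For reference, here is a minimal sketch of that setup, assuming a plain StreamConsumer with the timing settings above; the broker address, group id, and topic name are placeholders, not taken from the original report:

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Timing-related settings from the repro above; broker, group and topic are placeholders.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "repro-group")
        .set("max.poll.interval.ms", "20000")
        .set("session.timeout.ms", "10000")
        .set("auto.commit.interval.ms", "5000")
        .set("statistics.interval.ms", "5000")
        .create()?;

    consumer.subscribe(&["repro-topic"])?;

    loop {
        // Poll the next message.
        let msg = consumer.recv().await?;
        println!("partition {} offset {}", msg.partition(), msg.offset());

        // Simulated slow processing: with ~7s between polls, 0.36.2 reportedly
        // stops consuming, while 0.35.0 keeps going.
        tokio::time::sleep(Duration::from_secs(7)).await;
    }
}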

My guess is that there’s something wrong with the underlying librdkafka consumer not sending heartbeats.

On a side note, another issue I noticed is that since 0.36.2 the consumer prints verbose log messages if the debug setting is not left empty: not only from the custom client context (as would be expected), but apparently also directly from librdkafka.
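For context, "the debug setting" here is librdkafka's debug configuration property. A minimal sketch of what that looks like; the function name, broker address, group id, and the particular debug contexts are just illustrative values:

use rdkafka::config::ClientConfig;

// Any non-empty `debug` value (e.g. "consumer,cgrp,fetch") reportedly makes
// 0.36.2 print librdkafka's own verbose logs in addition to the ones routed
// through the custom client context.
fn debug_enabled_config() -> ClientConfig {
    let mut config = ClientConfig::new();
    config
        .set("bootstrap.servers", "localhost:9092") // placeholder
        .set("group.id", "example-group")           // placeholder
        .set("debug", "consumer,cgrp,fetch");
    config
}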

I just ran into this issue; might I suggest yanking all variants of the 0.36 branch until a proper patch lands? This would prevent others from unknowingly running into it.

If yanking is considered too problematic, then perhaps reverting the event API work and publishing a patch version of 0.36 would be more appropriate. Several users have reported production issues with the current release, and it would be good stewardship to spare others that headache.

"then that library is forcing you to use a version of rdkafka that I wouldn’t consider stable for production usage and is functionally broken for some usage pattern"

I think that’s the point of contention. We use 0.36 just fine and the maintainers have trouble reproducing the issue. It’s not universally broken.

I’ll mention that 0.35.0 is what we consider the last-known-good version of rdkafka, and we’ve currently blocked upgrades to 0.36.0 or higher. If there were a plan to perform a breaking republish, I’d recommend 0.35.0.

From an ecosystem perspective, I lean toward the yank. If a library is forcing you onto 0.36, then that library is forcing you to use a version of rdkafka that I wouldn’t consider stable for production usage and is functionally broken for some usage patterns. That library maintainer should re-evaluate whether they want to force their users onto such a version.

Of note, lib.rs indicates that the following libraries depend on the 0.36.x line of rdkafka. Looking at those, the users of the most widely used intermediate libraries tend to stay on the versions that rely on 0.35.

Crates that have some general usage, with prior version supporting 0.35

  • picokafka: version 0.1.11 supports 0.35; most users are staying on 0.1.11
  • sarchive (optional): version 0.12.7 supports 0.35; most users are staying on 0.12.7

New crates that don’t have support for 0.35 [only ever depended on 0.36]:

Crates that have earlier versions that support 0.35 [but little general usage]

Depended on only as a dev-dependency

I don’t think that’s how yank is supposed to be used, and I am sure it will cause even more problems for existing users. One example is libraries that depend on 0.36.

Libraries that depend on 0.36 might get a warning, but people starting a new project or upgrading their dependency stacks might introduce this new bug into their software.

As said above, I’ve noticed similar behaviour using 0.36.2 in two separate projects that use a StreamConsumer: memory leaks appear after a long period of time (the first leak started after 10 days of runtime).

Feel free to tell me if you want some information about my setup (configuration, topics setup, …).


I will roll back to 0.35.0 to see if it fixes the issue.

Minimal example to reproduce the issue (can be included as-is in the crate’s high-level consumer integration tests):

// All produced messages should be consumed.
#[tokio::test(flavor = "multi_thread")]
async fn test_produce_consume_delay() {
    let _r = env_logger::try_init();

    let start_time = current_time_millis();
    let topic_name = rand_test_topic("test_produce_consume_delay");
    let message_map = populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;
    let consumer = create_stream_consumer(&rand_test_group(), None);
    consumer.subscribe(&[topic_name.as_str()]).unwrap();

    for i in 0..10 {
        println!("awaiting message: {}", i);

        let message = consumer.recv().await;

        // Start a 5s delay now and await it only after the message has been
        // handled, simulating a consumer that takes ~5s per message.
        let timeout = tokio::time::sleep(std::time::Duration::from_millis(5000));

        println!("processing message: {}", i);

        match message {
            Ok(m) => {
                let id = message_map[&(m.partition(), m.offset())];
                match m.timestamp() {
                    Timestamp::CreateTime(timestamp) => assert!(timestamp >= start_time),
                    _ => panic!("Expected createtime for message timestamp"),
                };
                assert_eq!(m.payload_view::<str>().unwrap().unwrap(), value_fn(id));
                assert_eq!(m.key_view::<str>().unwrap().unwrap(), key_fn(id));
                assert_eq!(m.topic(), topic_name.as_str());
            }
            Err(e) => panic!("Error receiving message: {:?}", e),
        };

        timeout.await;
    }
}

On v0.35.0 this completes w/o issues, while on v0.36.2 it gets stuck after the first message w/o any error.

You can also try lowering the 5s timeout; on my machine it still fails on 0.36.2 with a timeout of 1s and works with 500ms.

We tested the BaseProducer and BaseConsumer internally today and couldn’t reproduce the issue. We’ll try the FutureProducer and the StreamConsumer tomorrow. Please provide the properties you’re setting and any relevant logs. Are you also using the statically linked librdkafka, or linking it dynamically?

@zhrebicek @neoeinstein The event API does allow some transient errors to be propagated that the callback API did not. @scanterog brought this up upstream here: https://github.com/confluentinc/librdkafka/issues/4493. Is it possible that your code might not have handled these explicitly, leading to either the consumer or producer becoming idle?

This may manifest with KafkaError (Message consumption error: BrokerTransportFailure (Local: Broker transport failure)).
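For anyone auditing their consume loop against that, here is a hedged sketch of what explicitly tolerating that transient error might look like; the surrounding setup and the choice to treat other errors as fatal are assumptions for illustration, not guidance from the maintainers:

use rdkafka::consumer::StreamConsumer;
use rdkafka::error::KafkaError;
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::Message;

async fn consume_loop(consumer: &StreamConsumer) {
    loop {
        match consumer.recv().await {
            Ok(m) => {
                // Process the message here.
                println!("partition {} offset {}", m.partition(), m.offset());
            }
            // Transient broker transport failures can surface through the event API;
            // log them and keep polling instead of treating them as fatal.
            Err(KafkaError::MessageConsumption(RDKafkaErrorCode::BrokerTransportFailure)) => {
                eprintln!("transient broker transport failure, continuing to poll");
            }
            Err(e) => {
                eprintln!("giving up on consumer error: {e}");
                break;
            }
        }
    }
}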