fluent-bit: kafka output is stuck when rdkafka queue is saturated - regression since 1.7.2

Not sure if this is more a question, a bug or a feature request 😃 This issue report started off from a question: “How to ensure that messages in rdkafka’s queue are not lost when fluent-bit is restarted?”. I’m still interested in an answer, but I think the more important thing now is fixing the bug uncovered while investigating the subject.

My inputs are tailed files with JSON logs, and the output is kafka.

I investigated how to minimize the possibility of losing messages in the following scenarios: A) the connection to kafka is lost, B) the fluent-bit process is restarted, C) both of the above at once.

FLB seems to handle A) after increasing rdkafka.message.timeout.ms; B) is also OK thanks to the offset stored in the local registry.
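
For background (and as far as I understand the plugin), the rdkafka.* keys in the [OUTPUT] section are passed straight through to librdkafka as producer properties, so scenario A) is really governed by librdkafka’s own redelivery logic. A minimal standalone sketch of the equivalent settings in plain librdkafka C (illustration only, not fluent-bit code; error handling omitted):

#include <librdkafka/rdkafka.h>

/* Sketch: the same properties set directly through the librdkafka C API.
 * Values mirror the fluent-bit [OUTPUT] config shown further below. */
static rd_kafka_t *create_producer(void)
{
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    /* keep trying to deliver each message for up to one hour
     * (rdkafka.message.timeout.ms in the fluent-bit config) */
    rd_kafka_conf_set(conf, "message.timeout.ms", "3600000",
                      errstr, sizeof(errstr));

    /* tiny local queue to force back-pressure early
     * (rdkafka.queue.buffering.max.messages in the fluent-bit config) */
    rd_kafka_conf_set(conf, "queue.buffering.max.messages", "10",
                      errstr, sizeof(errstr));

    /* rd_kafka_new() takes ownership of conf on success */
    return rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
}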

But I noticed that restarts lead to data loss when the connection is down (scenario C) because there can be messages pending inside the rdkafka local queue which are already considered delivered by FLB. This was noticed by @Charles-WYC in this comment to #514.
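
To illustrate why those messages can look “delivered” while still sitting locally (a generic librdkafka sketch, not the actual out_kafka code): producing is asynchronous, the produce call only enqueues the message into the local queue, and real delivery is only confirmed later through the delivery report callback.

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Delivery report callback, registered with rd_kafka_conf_set_dr_msg_cb():
 * only here do we learn whether the broker actually accepted the message. */
static void dr_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
{
    if (msg->err) {
        fprintf(stderr, "delivery failed: %s\n", rd_kafka_err2str(msg->err));
    }
}

static void produce_one(rd_kafka_t *rk, rd_kafka_topic_t *rkt, char *buf, size_t len)
{
    /* returns 0 as soon as the message fits into the local queue -
     * nothing has reached the broker at this point */
    if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                         buf, len, NULL, 0, NULL) == -1) {
        /* e.g. RD_KAFKA_RESP_ERR__QUEUE_FULL when the local queue is saturated */
        fprintf(stderr, "enqueue failed: %s\n",
                rd_kafka_err2str(rd_kafka_last_error()));
    }
    rd_kafka_poll(rk, 0);   /* serve delivery reports */
}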

With some community help I tried to use the FLB filesystem storage feature, but it doesn’t seem to work very well with the kafka output: new events are pushed quickly to the local rdkafka queue as long as there is space in it, and the storage layer does not take into account whether the actual sending succeeds. I tried to effectively disable that rdkafka buffer by setting it to a minimal size, like rdkafka.queue.buffering.max.messages 1 or 100, to force the use of FLB storage, but that quickly led to a deadlock soon after the connection to kafka is lost. It looks very much like issues #2161 and #2244:

[2021/07/12 08:48:48] [error] % Failed to produce to topic fluentbit-test: Local: Queue full
[2021/07/12 08:48:48] [ warn] [output:kafka:kafka.0] internal queue is full, retrying in one second
[2021/07/12 08:48:57] [ warn] [engine] failed to flush chunk '3303-1626072536.595600551.flb', retry in 10 seconds: task_id=1, input=tail.0 > output=kafka.0 (out_id=0)
[2021/07/12 08:49:07] [ warn] [engine] failed to flush chunk '3303-1626072536.595600551.flb', retry in 18 seconds: task_id=1, input=tail.0 > output=kafka.0 (out_id=0)
[...failed to flush continues indefinitely ]

FLB does not recover from this even after the kafka connection is restored. It continues to work OK after restarting the process though, picking up queued data from the filesystem if present. I tried both queue_full_retries 0 and queue_full_retries 10 with the same results. I wasn’t sure at first whether this is a bug or just such an extreme config being unsupported, but it looks like a bug to me.
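
Based on the “internal queue is full, retrying in one second” messages and the root-cause analysis further down in this issue, the flush seems to follow roughly this pattern when the queue-full condition hits. This is only my simplified reconstruction (a hypothetical helper, not the real out_kafka source); the hang turns out to be inside the one-second wait itself, not in the loop logic:

#include <unistd.h>
#include <librdkafka/rdkafka.h>

/* Hypothetical helper, simplified reconstruction only: keep retrying the
 * enqueue while the local rdkafka queue is full, sleeping 1s between attempts.
 * max_retries plays the role of queue_full_retries (0 = retry forever). */
static int produce_with_queue_full_retry(rd_kafka_t *rk, rd_kafka_topic_t *rkt,
                                         void *buf, size_t len, int max_retries)
{
    int retries = 0;

    while (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                            buf, len, NULL, 0, NULL) == -1) {
        if (rd_kafka_last_error() != RD_KAFKA_RESP_ERR__QUEUE_FULL) {
            return -1;              /* some other produce error */
        }
        if (max_retries > 0 && ++retries >= max_retries) {
            return -2;              /* give up, the engine retries the chunk later */
        }
        rd_kafka_poll(rk, 100);     /* serve callbacks so the queue can drain */
        sleep(1);                   /* "internal queue is full, retrying in one second" */
    }
    return 0;
}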

Is there any other way to ensure messages are not lost when the FLB process aborts with a non-empty rdkafka queue and no possibility to flush it?
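
For comparison, a standalone librdkafka producer would normally drain its local queue on shutdown with rd_kafka_flush() (sketch below, plain librdkafka, not fluent-bit code); whatever is still in the queue after that timeout is exactly the data that gets lost when the process is killed instead:

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch of a graceful librdkafka producer shutdown. Anything still counted
 * by rd_kafka_outq_len() after the flush times out would be lost on exit. */
static void shutdown_producer(rd_kafka_t *rk)
{
    /* wait up to 10s for outstanding messages and delivery reports */
    rd_kafka_flush(rk, 10 * 1000);

    if (rd_kafka_outq_len(rk) > 0) {
        fprintf(stderr, "%d message(s) were not delivered\n",
                rd_kafka_outq_len(rk));
    }
    rd_kafka_destroy(rk);
}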

Steps to reproduce the problem:

  • set up a small rdkafka buffer (see the configuration below)
  • run some traffic, e.g. 5 messages/s
  • disable the kafka connection
  • wait for the Queue full condition to happen:
[2021/07/12 18:17:26] [ info] [sp] stream processor started
[2021/07/12 18:17:26] [ info] [input:tail:tail.0] inotify_fs_add(): inode=147501 watch_fd=1 name=/opt/service/businessActivity.log
[2021/07/12 18:17:40] [ info] [engine] started (pid=15381)
[2021/07/12 18:17:40] [ info] [storage] version=1.1.1, initializing...
[2021/07/12 18:17:40] [ info] [storage] root path '/opt/fluentbit/data/'
[2021/07/12 18:17:40] [ info] [storage] full synchronization mode, checksum disabled, max_chunks_up=128
[2021/07/12 18:17:40] [ info] [storage] backlog input plugin: storage_backlog.2
[2021/07/12 18:17:40] [ info] [input:storage_backlog:storage_backlog.2] queue memory limit: 976.6K
[2021/07/12 18:17:40] [ info] [output:kafka:kafka.0] brokers='localhost:29093' topics='fluentbit-test'
[2021/07/12 18:17:40] [ info] [http_server] listen iface=0.0.0.0 tcp_port=2020
[2021/07/12 18:17:40] [ info] [sp] stream processor started
[2021/07/12 18:17:40] [ info] [input:tail:tail.0] inotify_fs_add(): inode=147501 watch_fd=1 name=/opt/service/businessActivity.log
[2021/07/12 18:18:36] [error] [output:kafka:kafka.0] fluent-bit#producer-1: [thrd:app]: fluent-bit#producer-1: ssl://localhost:29093/1: Connect to ipv4#127.0.0.1:29093 failed: Connection refused (after 0ms in state CONNECT)
[2021/07/12 18:18:36] [error] [output:kafka:kafka.0] fluent-bit#producer-1: [thrd:app]: fluent-bit#producer-1: ssl://localhost:29093/1: Connect to ipv4#127.0.0.1:29093 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
[2021/07/12 18:18:43] [error] % Failed to produce to topic fluentbit-test: Local: Queue full

[2021/07/12 18:18:43] [ warn] [output:kafka:kafka.0] internal queue is full, retrying in one second
[2021/07/12 18:18:44] [error] % Failed to produce to topic fluentbit-test: Local: Queue full

[2021/07/12 18:18:44] [ warn] [output:kafka:kafka.0] internal queue is full, retrying in one second
[2021/07/12 18:18:44] [ warn] [engine] failed to flush chunk '15381-1626106723.310681384.flb', retry in 6 seconds: task_id=1, input=tail.0 > output=kafka.0 (out_id=0)
[2021/07/12 18:18:45] [ warn] [engine] failed to flush chunk '15381-1626106724.295776787.flb', retry in 7 seconds: task_id=2, input=tail.0 > output=kafka.0 (out_id=0)
[2021/07/12 18:18:46] [ warn] [engine] failed to flush chunk '15381-1626106725.304509217.flb', retry in 11 seconds: task_id=3, input=tail.0 > output=kafka.0 (out_id=0)
[2021/07/12 18:18:47] [ warn] [engine] failed to flush chunk '15381-1626106726.305080618.flb', retry in 9 seconds: task_id=4, input=tail.0 > output=kafka.0 (out_id=0)
[...retries repeated]
  • restore the connection to kafka
  • from now on, nothing happens but “failed to flush chunk” errors, no matter what
  • kill -9 the FLB process
  • now it starts working again
[2021/07/12 18:24:25] [ info] [storage] version=1.1.1, initializing...
[2021/07/12 18:24:25] [ info] [storage] root path '/opt/fluentbit/data/'
[2021/07/12 18:24:25] [ info] [storage] full synchronization mode, checksum disabled, max_chunks_up=128
[2021/07/12 18:24:25] [ info] [storage] backlog input plugin: storage_backlog.2
[2021/07/12 18:24:25] [ info] [input:storage_backlog:storage_backlog.2] queue memory limit: 976.6K
[2021/07/12 18:24:25] [ info] [input:storage_backlog:storage_backlog.2] register tail.0/15381-1626106722.295896938.flb
[2021/07/12 18:24:25] [ info] [input:storage_backlog:storage_backlog.2] register tail.0/15381-1626106723.310681384.flb
[2021/07/12 18:24:25] [ info] [input:storage_backlog:storage_backlog.2] register tail.0/15381-1626106724.295776787.flb
[2021/07/12 18:24:25] [ info] [input:storage_backlog:storage_backlog.2] register tail.0/15381-1626106725.304509217.flb
[... all flb from disk are registered]
[2021/07/12 18:24:25] [ info] [output:kafka:kafka.0] brokers='localhost:29093' topics='fluentbit-test'
[2021/07/12 18:24:25] [ info] [http_server] listen iface=0.0.0.0 tcp_port=2020
[2021/07/12 18:24:25] [ info] [sp] stream processor started
[2021/07/12 18:24:25] [ info] [input:tail:tail.0] inotify_fs_add(): inode=147507 watch_fd=1 name=/opt/service/businessActivity.log
[2021/07/12 18:24:26] [ info] [input:storage_backlog:storage_backlog.2] queueing tail.0:15381-1626106722.295896938.flb
[2021/07/12 18:24:26] [ info] [input:storage_backlog:storage_backlog.2] queueing tail.0:15381-1626106723.310681384.flb
[2021/07/12 18:24:26] [ info] [input:storage_backlog:storage_backlog.2] queueing tail.0:15381-1626106724.295776787.flb
[2021/07/12 18:24:26] [ info] [input:storage_backlog:storage_backlog.2] queueing tail.0:15381-1626106725.304509217.flb
[...]
[2021/07/12 18:24:27] [ info] [engine] flush backlog chunk '15381-1626106722.295896938.flb' succeeded: task_id=0, input=storage_backlog.2 > output=kafka.0 (out_id=0)
[2021/07/12 18:24:27] [ info] [engine] flush backlog chunk '15381-1626106723.310681384.flb' succeeded: task_id=1, input=storage_backlog.2 > output=kafka.0 (out_id=0)
[2021/07/12 18:24:27] [ info] [engine] flush backlog chunk '15381-1626106724.295776787.flb' succeeded: task_id=2, input=storage_backlog.2 > output=kafka.0 (out_id=0)
[2021/07/12 18:24:27] [ info] [engine] flush backlog chunk '15381-1626106725.304509217.flb' succeeded: task_id=3, input=storage_backlog.2 > output=kafka.0 (out_id=0)
  • it may happen that after the restart everything locks up again if it hits the Queue full condition.

Your Environment

  • Version used: 1.7.9, installed from the official CentOS 7 packages
  • Server type and version: CentOS 7
  • Configuration:
[SERVICE]
    flush        1
    daemon       Off
    log_file /opt/fluentbit/logs/fluentbit.log
    log_level    info
    parsers_file parsers.conf
    plugins_file plugins.conf
    http_server  On
    http_listen  0.0.0.0
    http_port    2020
    storage.metrics on
    storage.path /opt/fluentbit/data/
    storage.sync full
    storage.max_chunks_up 128
    storage.checksum off
    storage.backlog.mem_limit 1M

[INPUT]
    name tail
    path /opt/service/businessActivity.log*
    tag  activity.log
    refresh_interval 1
    rotate_wait 10
    ignore_older 10m
    db /opt/fluentbit/logs/registry.db
    read_from_head on
    parser json
    storage.type  filesystem

[OUTPUT]
    name kafka
    match activity.log
    format json
    message_key_field uuid
    brokers localhost:29093
    topics fluentbit-test
    timestamp_format iso8601
    # keep retrying indefinitely from engine level
    Retry_Limit false
    # limit retries from plugin level
    queue_full_retries 10
    # or try no limit from plugin level
    #queue_full_retries 0
    # ensure rdkafka tries to redeliver at least for an hour
    rdkafka.message.timeout.ms 3600000	
    # try to minimize the number of messages stored in the rdkafka buffer. Quite extreme, but the same happens for a value of 100 or 1000, it just takes longer
    rdkafka.queue.buffering.max.messages 10
    rdkafka.log.connection.close false
    rdkafka.request.required.acks 1
    rdkafka.security.protocol ssl
    rdkafka.ssl.ca.location /opt/fluentbit/ca-cert
    rdkafka.ssl.endpoint.identification.algorithm none
    rdkafka.ssl.key.location /opt/fluentbit/client.key
    rdkafka.ssl.key.password ******
    rdkafka.ssl.certificate.location /opt/fluentbit/client-cert-signed
    storage.total_limit_size 100M

Additional context: I am comparing fluent-bit with its direct competitor to decide which one is up for the job. Help me choose fluent-bit 😃 I have a requirement to ensure that no messages are lost from the event stream in an environment with an unstable connection.

About this issue

  • State: closed
  • Created 3 years ago
  • Reactions: 2
  • Comments: 18 (2 by maintainers)

Most upvoted comments

@edsiper Can you take a look, please? I’m now more concerned with that bug (kafka output getting stuck after filling the rdkafka queue) than with achieving 100% persistence. This bug dramatically increases the chances that one needs to kill the fluent-bit process when the rdkafka queue is full, which is guaranteed data loss.

The issue was introduced in 1.7.2 by this commit: https://github.com/fluent/fluent-bit/commit/5edf91a3e1fe940f0359ece47f502f7db45e5865

else if (timer->type == FLB_SCHED_TIMER_CB_ONESHOT) {
        consume_byte(timer->timer_fd);
        flb_sched_timer_cb_disable(timer);   /* closes timer->timer_fd */
        timer->cb(config, timer->data);      /* callback may start the next
                                              * flb_time_sleep(1000), which gets
                                              * the just-freed fd number back */
        flb_sched_timer_cb_destroy(timer);   /* closes timer->event.fd - the same
                                              * (now recycled) fd, closed again */
}

Basically, what is happening is that flb_sched_timer_cb_disable(timer) calls close(timer->timer_fd) and flb_sched_timer_cb_destroy(timer) calls close on timer->event.fd. The fd is the same in timer->timer_fd and timer->event.fd. That fd number gets quickly recycled between flb_sched_timer_cb_disable(timer) and flb_sched_timer_cb_destroy(timer), so the second close actually closes the fd backing the next flb_time_sleep(1000) that was initiated, and that flb_time_sleep(1000) gets stuck forever.
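
A minimal standalone demo of that fd-recycling hazard (my own simplification, independent of the fluent-bit code): once the first close() releases the fd number, the next timerfd_create() can get the same number back, and the leftover second close() then silently destroys the new timer.

#include <stdio.h>
#include <unistd.h>
#include <sys/timerfd.h>

/* Not FLB code - just demonstrates the hazard: closing the same numeric fd
 * twice, with a new timerfd created in between, ends up closing the *new*
 * timer instead of failing harmlessly. */
int main(void)
{
    int old_fd = timerfd_create(CLOCK_MONOTONIC, 0);
    close(old_fd);                                    /* first close releases the number */

    int new_fd = timerfd_create(CLOCK_MONOTONIC, 0);  /* typically reuses the same number */
    printf("old_fd=%d new_fd=%d\n", old_fd, new_fd);

    close(old_fd);  /* stale second close: if new_fd == old_fd, this destroys the new
                     * timer, so whoever waits on it is never woken up */
    return 0;
}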

I will attempt to fix it and submit a PR in the next couple of days.
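
One direction I’m considering (only a sketch of the idea, with stand-in names, not necessarily what will end up in the PR): invalidate the descriptor after the first close so the destroy path cannot close an fd that has already been recycled.

#include <unistd.h>

/* Stand-in struct with just the fields named in the analysis above;
 * the real fluent-bit structure has more members. */
struct sched_timer_sketch {
    int timer_fd;
    struct { int fd; } event;
};

/* disable: close once and mark both views of the fd as invalid */
static void timer_disable(struct sched_timer_sketch *timer)
{
    if (timer->timer_fd >= 0) {
        close(timer->timer_fd);
        timer->timer_fd = -1;
        timer->event.fd = -1;   /* same underlying fd, mark it invalid too */
    }
}

/* destroy: only close if it was not already closed by timer_disable() */
static void timer_destroy(struct sched_timer_sketch *timer)
{
    if (timer->event.fd >= 0) {
        close(timer->event.fd);
        timer->event.fd = -1;
    }
    /* ...release the timer structure... */
}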

I just tested the recently released version 1.8.8 with my reproduction project and it works fine - no bug present. 👍