fluent-bit: kafka output is stuck when rdkafka queue is saturated - regression since 1.7.2
Not sure if this is more a question, a bug or a feature request 😃
This issue report started off from a question: “How to ensure that messages in rdkafka’s queue are not lost when fluent-bit is restarted?”. I’m still interested in an answer, but I think what is more important now is fixing the bug uncovered while investigating the subject.
My inputs are tailed files with JSON logs, and the output is kafka.
I investigated how to minimize the possibility of losing messages in the following scenarios: A) the connection to kafka is lost, B) the fluent-bit process is restarted, C) both of the above at once.
FLB seems to handle A) after increasing rdkafka.message.timeout.ms, and B) is also OK thanks to the offset stored in the local registry.
But I noticed that restarts lead to data loss when the connection is down (scenario C) because there can be messages pending inside the rdkafka local queue which are already considered delivered by FLB. This was noticed by @Charles-WYC in this comment to #514.
With some community help I tried to use the FLB filesystem storage feature, but it doesn’t seem to work very well with the kafka output: new events are pushed quickly to the local rdkafka queue as long as there is space in it, and the storage does not take into account whether the actual sending succeeds. I tried to disable that rdkafka buffer by setting it to a minimal size, like rdkafka.queue.buffering.max.messages 1 or 100, to force the use of FLB storage, but that quickly leads to a deadlock soon after the connection to kafka is lost. It looks very much like issues #2161 and #2244:
[2021/07/12 08:48:48] [error] % Failed to produce to topic fluentbit-test: Local: Queue full
[2021/07/12 08:48:48] [ warn] [output:kafka:kafka.0] internal queue is full, retrying in one second
[2021/07/12 08:48:57] [ warn] [engine] failed to flush chunk '3303-1626072536.595600551.flb', retry in 10 seconds: task_id=1, input=tail.0 > output=kafka.0 (out_id=0)
[2021/07/12 08:49:07] [ warn] [engine] failed to flush chunk '3303-1626072536.595600551.flb', retry in 18 seconds: task_id=1, input=tail.0 > output=kafka.0 (out_id=0)
[...failed to flush continues indefinitely ]
FLB does not recover from this even after the kafka connection is restored. It continues to work OK after restarting the process though, picking up queued data from the filesystem if present. I tried both queue_full_retries 0 and queue_full_retries 10, with the same results. At first I was not sure whether this is a bug or simply an unsupported extreme configuration, but it looks like a bug to me.
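For quick reference, the combination of settings that triggers the lock-up boils down to the following condensed excerpt of the full configuration listed under “Your Environment” below (the tail input additionally uses storage.type filesystem):
[OUTPUT]
    name kafka
    brokers localhost:29093
    topics fluentbit-test
    Retry_Limit false
    queue_full_retries 10
    rdkafka.message.timeout.ms 3600000
    rdkafka.queue.buffering.max.messages 10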
Is there any other way to ensure messages are not lost when the FLB process aborts with a non-empty rdkafka queue and no possibility to flush it?
Steps to reproduce the problem:
- set up a small rdkafka buffer (see the configuration below)
- run some traffic, e.g. 5 messages/s
- disable the kafka connection
- wait for the Queue full condition to happen:
[2021/07/12 18:17:26] [ info] [sp] stream processor started
[2021/07/12 18:17:26] [ info] [input:tail:tail.0] inotify_fs_add(): inode=147501 watch_fd=1 name=/opt/service/businessActivity.log
[2021/07/12 18:17:40] [ info] [engine] started (pid=15381)
[2021/07/12 18:17:40] [ info] [storage] version=1.1.1, initializing...
[2021/07/12 18:17:40] [ info] [storage] root path '/opt/fluentbit/data/'
[2021/07/12 18:17:40] [ info] [storage] full synchronization mode, checksum disabled, max_chunks_up=128
[2021/07/12 18:17:40] [ info] [storage] backlog input plugin: storage_backlog.2
[2021/07/12 18:17:40] [ info] [input:storage_backlog:storage_backlog.2] queue memory limit: 976.6K
[2021/07/12 18:17:40] [ info] [output:kafka:kafka.0] brokers='localhost:29093' topics='fluentbit-test'
[2021/07/12 18:17:40] [ info] [http_server] listen iface=0.0.0.0 tcp_port=2020
[2021/07/12 18:17:40] [ info] [sp] stream processor started
[2021/07/12 18:17:40] [ info] [input:tail:tail.0] inotify_fs_add(): inode=147501 watch_fd=1 name=/opt/service/businessActivity.log
[2021/07/12 18:18:36] [error] [output:kafka:kafka.0] fluent-bit#producer-1: [thrd:app]: fluent-bit#producer-1: ssl://localhost:29093/1: Connect to ipv4#127.0.0.1:29093 failed: Connection refused (after 0ms in state CONNECT)
[2021/07/12 18:18:36] [error] [output:kafka:kafka.0] fluent-bit#producer-1: [thrd:app]: fluent-bit#producer-1: ssl://localhost:29093/1: Connect to ipv4#127.0.0.1:29093 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
[2021/07/12 18:18:43] [error] % Failed to produce to topic fluentbit-test: Local: Queue full
[2021/07/12 18:18:43] [ warn] [output:kafka:kafka.0] internal queue is full, retrying in one second
[2021/07/12 18:18:44] [error] % Failed to produce to topic fluentbit-test: Local: Queue full
[2021/07/12 18:18:44] [ warn] [output:kafka:kafka.0] internal queue is full, retrying in one second
[2021/07/12 18:18:44] [ warn] [engine] failed to flush chunk '15381-1626106723.310681384.flb', retry in 6 seconds: task_id=1, input=tail.0 > output=kafka.0 (out_id=0)
[2021/07/12 18:18:45] [ warn] [engine] failed to flush chunk '15381-1626106724.295776787.flb', retry in 7 seconds: task_id=2, input=tail.0 > output=kafka.0 (out_id=0)
[2021/07/12 18:18:46] [ warn] [engine] failed to flush chunk '15381-1626106725.304509217.flb', retry in 11 seconds: task_id=3, input=tail.0 > output=kafka.0 (out_id=0)
[2021/07/12 18:18:47] [ warn] [engine] failed to flush chunk '15381-1626106726.305080618.flb', retry in 9 seconds: task_id=4, input=tail.0 > output=kafka.0 (out_id=0)
[...retries repeated]
- restore the connection to kafka
- from now on nothing happens except “failed to flush chunk” errors, no matter what
- kill -9 the FLB process
- now it starts working again
[2021/07/12 18:24:25] [ info] [storage] version=1.1.1, initializing...
[2021/07/12 18:24:25] [ info] [storage] root path '/opt/fluentbit/data/'
[2021/07/12 18:24:25] [ info] [storage] full synchronization mode, checksum disabled, max_chunks_up=128
[2021/07/12 18:24:25] [ info] [storage] backlog input plugin: storage_backlog.2
[2021/07/12 18:24:25] [ info] [input:storage_backlog:storage_backlog.2] queue memory limit: 976.6K
[2021/07/12 18:24:25] [ info] [input:storage_backlog:storage_backlog.2] register tail.0/15381-1626106722.295896938.flb
[2021/07/12 18:24:25] [ info] [input:storage_backlog:storage_backlog.2] register tail.0/15381-1626106723.310681384.flb
[2021/07/12 18:24:25] [ info] [input:storage_backlog:storage_backlog.2] register tail.0/15381-1626106724.295776787.flb
[2021/07/12 18:24:25] [ info] [input:storage_backlog:storage_backlog.2] register tail.0/15381-1626106725.304509217.flb
[... all flb from disk are registered]
[2021/07/12 18:24:25] [ info] [output:kafka:kafka.0] brokers='localhost:29093' topics='fluentbit-test'
[2021/07/12 18:24:25] [ info] [http_server] listen iface=0.0.0.0 tcp_port=2020
[2021/07/12 18:24:25] [ info] [sp] stream processor started
[2021/07/12 18:24:25] [ info] [input:tail:tail.0] inotify_fs_add(): inode=147507 watch_fd=1 name=/opt/service/businessActivity.log
[2021/07/12 18:24:26] [ info] [input:storage_backlog:storage_backlog.2] queueing tail.0:15381-1626106722.295896938.flb
[2021/07/12 18:24:26] [ info] [input:storage_backlog:storage_backlog.2] queueing tail.0:15381-1626106723.310681384.flb
[2021/07/12 18:24:26] [ info] [input:storage_backlog:storage_backlog.2] queueing tail.0:15381-1626106724.295776787.flb
[2021/07/12 18:24:26] [ info] [input:storage_backlog:storage_backlog.2] queueing tail.0:15381-1626106725.304509217.flb
[...]
[2021/07/12 18:24:27] [ info] [engine] flush backlog chunk '15381-1626106722.295896938.flb' succeeded: task_id=0, input=storage_backlog.2 > output=kafka.0 (out_id=0)
[2021/07/12 18:24:27] [ info] [engine] flush backlog chunk '15381-1626106723.310681384.flb' succeeded: task_id=1, input=storage_backlog.2 > output=kafka.0 (out_id=0)
[2021/07/12 18:24:27] [ info] [engine] flush backlog chunk '15381-1626106724.295776787.flb' succeeded: task_id=2, input=storage_backlog.2 > output=kafka.0 (out_id=0)
[2021/07/12 18:24:27] [ info] [engine] flush backlog chunk '15381-1626106725.304509217.flb' succeeded: task_id=3, input=storage_backlog.2 > output=kafka.0 (out_id=0)
- it may happen that after the restart everything locks up again if it hits the Queue full condition.
Your Environment
- Version used: 1.7.9 installed from the official CentOS 7 packages
- Server type and version: CentOS 7
- Configuration:
[SERVICE]
    flush 1
    daemon Off
    log_file /opt/fluentbit/logs/fluentbit.log
    log_level info
    parsers_file parsers.conf
    plugins_file plugins.conf
    http_server On
    http_listen 0.0.0.0
    http_port 2020
    storage.metrics on
    storage.path /opt/fluentbit/data/
    storage.sync full
    storage.max_chunks_up 128
    storage.checksum off
    storage.backlog.mem_limit 1M

[INPUT]
    name tail
    path /opt/service/businessActivity.log*
    tag activity.log
    refresh_interval 1
    rotate_wait 10
    ignore_older 10m
    db /opt/fluentbit/logs/registry.db
    read_from_head on
    parser json
    storage.type filesystem

[OUTPUT]
    name kafka
    match activity.log
    format json
    message_key_field uuid
    brokers localhost:29093
    topics fluentbit-test
    timestamp_format iso8601
    # keep retrying indefinitely from engine level
    Retry_Limit false
    # limit retries from plugin level
    queue_full_retries 10
    # or try no limit from plugin level
    #queue_full_retries 0
    # ensure rdkafka tries to redeliver at least for an hour
    rdkafka.message.timeout.ms 3600000
    # try to minimize the number of messages stored in the rdkafka buffer; quite extreme, but the same happens for a value of 100 or 1000, it just takes longer
    rdkafka.queue.buffering.max.messages 10
    rdkafka.log.connection.close false
    rdkafka.request.required.acks 1
    rdkafka.security.protocol ssl
    rdkafka.ssl.ca.location /opt/fluentbit/ca-cert
    rdkafka.ssl.endpoint.identification.algorithm none
    rdkafka.ssl.key.location /opt/fluentbit/client.key
    rdkafka.ssl.key.password ******
    rdkafka.ssl.certificate.location /opt/fluentbit/client-cert-signed
    storage.total_limit_size 100M
Additional context: I am comparing fluent-bit with its direct competitor to decide which one is up for the job. Help me choose fluent-bit 😃 I have a requirement to ensure no messages from the event stream are lost in an environment with an unstable connection.
About this issue
- State: closed
- Created 3 years ago
- Reactions: 2
- Comments: 18 (2 by maintainers)
Commits related to this issue
- minimal reproducible example for deadlock condition reported in https://github.com/fluent/fluent-bit/issues/3766 — committed to tporeba/fluentbit-3766-reproductor by deleted user 3 years ago
- scheduler: fix race condition of double closing of oneshot timers (#3766) Kafka output plugin does oneshot timer of 1 second via flb_time_sleep(1000) when the queue is full and will retry up to 10 ti... — committed to fluent/fluent-bit by danlenar 3 years ago
@edsiper Can you take a look, please? I’m now more concerned with this bug (the kafka output getting stuck after filling the rdkafka queue) than with achieving 100% persistence. This bug dramatically increases the chances that one needs to kill the fluent-bit process while the rdkafka queue is full, which is guaranteed data loss.
The issue was introduced in 1.7.2 by this commit: https://github.com/fluent/fluent-bit/commit/5edf91a3e1fe940f0359ece47f502f7db45e5865
Basically, what is happening is that flb_sched_timer_cb_disable(timer) calls close(timer->timer_fd) and flb_sched_timer_cb_destroy(timer) calls close on timer->event.fd, and the fd is the same in timer->timer_fd and timer->event.fd. The same fd number gets quickly recycled in between flb_sched_timer_cb_disable(timer) and flb_sched_timer_cb_destroy(timer), so the second close actually cancels out the next flb_time_sleep(1000) that was initiated, and that flb_time_sleep(1000) gets stuck forever.
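To make the fd-recycling hazard concrete, here is a minimal standalone sketch. This is not fluent-bit source code: the program structure is an illustrative assumption, and only the function names mentioned in the comments refer to the real code. It shows how a second close() of the same descriptor number can silently tear down a freshly created timerfd that happened to receive the recycled number:

#include <stdio.h>
#include <stdint.h>
#include <time.h>
#include <unistd.h>
#include <sys/timerfd.h>

int main(void)
{
    /* first oneshot timer, analogous to the one behind flb_time_sleep(1000) */
    int fd_a = timerfd_create(CLOCK_MONOTONIC, 0);

    /* first close, analogous to flb_sched_timer_cb_disable(timer) */
    close(fd_a);

    /* the kernel hands out the lowest free descriptor number, so a timer
       created right after the close usually receives the same value as fd_a */
    int fd_b = timerfd_create(CLOCK_MONOTONIC, 0);
    printf("fd_a=%d fd_b=%d recycled=%s\n",
           fd_a, fd_b, fd_a == fd_b ? "yes" : "no");

    /* second close of the SAME number, analogous to
       flb_sched_timer_cb_destroy(timer): when fd_a == fd_b this silently
       closes the new timer instead of an already-dead one */
    close(fd_a);

    /* arming the new timer now fails with EBADF; in fluent-bit the closed
       descriptor simply never fires in the event loop, so the pending
       flb_time_sleep(1000) never wakes up */
    struct itimerspec ts = { .it_value = { .tv_sec = 1, .tv_nsec = 0 } };
    if (timerfd_settime(fd_b, 0, &ts, NULL) < 0)
        perror("timerfd_settime on recycled fd");

    uint64_t expirations;
    if (read(fd_b, &expirations, sizeof(expirations)) < 0)
        perror("read on recycled fd");

    return 0;
}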
I will attempt to fix it and submit a PR in the next couple of days.
I just tested the recently released version 1.8.8 with my reproduction project and it works fine - no bug present. 👍