fluentd: fluentd is stuck in loop when overflow_action block occurs

Describe the bug I am running fluentd in a Kubernetes cluster. I have one forward input plugin and two buffered output plugins. When I set overflow_action block in the buffer of the logging output plugin, fluentd appears to get stuck in a loop and never recovers.

To Reproduce This is my config:

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-log-aggregator-config
  namespace: kube-system
data:
  fluent.conf: |
    <source>
      @type forward
      port 24225
      @log_level debug
    </source>

    <source>
      @type monitor_agent
      bind 0.0.0.0
      port 24220
    </source>

    <match ** >
      @type copy

      <store ignore_error>
        @type logging
        @log_level debug
        url xxxxxx
        <buffer>
          @type memory
          chunk_limit_size 8m
          queued_chunks_limit_size 256
          flush_thread_count 5
          flush_interval 10s
          flush_mode interval
          retry_max_times 8
          retry_wait 1s
          retry_max_interval 10s
          overflow_action block
        </buffer>
      </store>

      <store ignore_error>
        @type metrics_collector
        publish_metrics_interval 30
        flush_interval 20s   
        monitor logging
      </store>
    </match>

This is the error I get:

2019-08-19 15:38:46 +0000 [warn]: #0 buffer flush took longer time than slow_flush_log_threshold: elapsed_time=59.66392836498562 slow_flush_log_threshold=20.0 plugin_id="object:3ffff7d7e114"
2019-08-19 15:39:20 +0000 [warn]: #0 buffer flush took longer time than slow_flush_log_threshold: elapsed_time=33.92090093600564 slow_flush_log_threshold=20.0 plugin_id="object:3ffff7d7e114"
2019-08-19 15:39:27 +0000 [warn]: #0 failed to write data into buffer by buffer overflow action=:block
2019-08-19 15:39:27 +0000 [debug]: #0 buffer.write is now blocking
2019-08-19 15:39:27 +0000 [warn]: #0 failed to write data into buffer by buffer overflow action=:block
2019-08-19 15:39:27 +0000 [debug]: #0 buffer.write is now blocking
2019-08-19 15:39:58 +0000 [warn]: #0 failed to write data into buffer by buffer overflow action=:block
2019-08-19 15:39:58 +0000 [debug]: #0 buffer.write is now blocking

When I enable @log_level trace, I get the following logs:

2019-08-19 17:06:33 +0000 [trace]: #0 adding metadata instance=70368625388440 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables=nil>
2019-08-19 17:06:33 +0000 [trace]: #0 adding metadata instance=70368625388440 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables=nil>
2019-08-19 17:06:33 +0000 [trace]: #0 writing events into buffer instance=70368625388440 metadata_size=1
2019-08-19 17:06:33 +0000 [trace]: #0 writing events into buffer instance=70368625388440 metadata_size=1
2019-08-19 17:06:34 +0000 [trace]: #0 enqueueing all chunks in buffer instance=70368625388440
2019-08-19 17:06:34 +0000 [trace]: #0 enqueueing chunk instance=70368625388440 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables=nil>
2019-08-19 17:06:34 +0000 [trace]: #0 adding metadata instance=70368625388440 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables=nil>
2019-08-19 17:06:34 +0000 [warn]: #0 failed to write data into buffer by buffer overflow action=:block
2019-08-19 17:06:34 +0000 [debug]: #0 buffer.write is now blocking
2019-08-19 17:06:34 +0000 [trace]: #0 sleeping until buffer can store more data
2019-08-19 17:06:35 +0000 [trace]: #0 adding metadata instance=70368625388440 metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables=nil>
2019-08-19 17:06:35 +0000 [warn]: #0 failed to write data into buffer by buffer overflow action=:block
2019-08-19 17:06:35 +0000 [debug]: #0 buffer.write is now blocking
2019-08-19 17:06:35 +0000 [trace]: #0 sleeping until buffer can store more data
2019-08-19 17:06:35 +0000 [trace]: #0 enqueueing all chunks in buffer instance=70368625388440
2019-08-19 17:06:35 +0000 [trace]: #0 sleeping until buffer can store more data
2019-08-19 17:06:36 +0000 [trace]: #0 sleeping until buffer can store more data
...
2019-08-19 17:09:51 +0000 [trace]: #0 enqueueing all chunks in buffer instance=70368625388440
2019-08-19 17:09:51 +0000 [trace]: #0 sleeping until buffer can store more data
2019-08-19 17:09:52 +0000 [trace]: #0 sleeping until buffer can store more data
2019-08-19 17:09:52 +0000 [trace]: #0 sleeping until buffer can store more data
2019-08-19 17:09:52 +0000 [trace]: #0 enqueueing all chunks in buffer instance=70368625388440
2019-08-19 17:09:52 +0000 [trace]: #0 sleeping until buffer can store more data
2019-08-19 17:09:53 +0000 [trace]: #0 sleeping until buffer can store more data
2019-08-19 17:09:53 +0000 [trace]: #0 sleeping until buffer can store more data
2019-08-19 17:09:53 +0000 [trace]: #0 enqueueing all chunks in buffer instance=70368625388440

From the logs I can see that it is stuck in an infinite loop and never exits: https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin/output.rb#L885

Expected behavior Fluentd should keep calling the output plugin's write function while the buffer is blocking. If write is never called, the buffer is never drained and we stay stuck in the loop.
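
The dependency described above (a blocked writer can only resume once something else drains the buffer) can be illustrated with a toy model. This is only a sketch under stated assumptions: `TinyBuffer`, `flush_one` and `blocking_write` are hypothetical names made up for this example and are not Fluentd's classes or methods; only the `storable?` comparison mirrors the idea of a size limit.

```ruby
# Toy model of overflow_action block. All names here are hypothetical,
# not Fluentd's actual implementation.
class TinyBuffer
  def initialize(total_limit_size)
    @total_limit_size = total_limit_size
    @queue = []
    @mutex = Mutex.new
  end

  # True while the queued bytes are strictly below the limit.
  def storable?
    @mutex.synchronize { @total_limit_size > @queue.sum(&:bytesize) }
  end

  def write(data)
    @mutex.synchronize { @queue << data }
  end

  # Stands in for a successful flush: removes the oldest queued item.
  def flush_one
    @mutex.synchronize { @queue.shift }
  end
end

# Polls until the buffer has room, then writes. Returns how many times it slept.
def blocking_write(buffer, data, poll: 0.01)
  waits = 0
  until buffer.storable?
    waits += 1
    sleep poll
  end
  buffer.write(data)
  waits
end

buffer = TinyBuffer.new(8)
buffer.write("12345678") # 8 bytes queued, so storable? is now false

# A separate thread plays the role of the flush thread.
flusher = Thread.new do
  sleep 0.05
  buffer.flush_one
end

waits = blocking_write(buffer, "abcd")
flusher.join
puts "writer unblocked after #{waits} poll(s)"
```

If the `flusher` thread is removed from this sketch, `blocking_write` never returns, which is the shape of the behaviour reported here.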

Your Environment

  • Fluentd v1.6.3, ruby v2.6.3
  • using fluentd debian image
  • running fluentd in kubernetes

Someone also reported a similar issue: https://github.com/uken/fluent-plugin-elasticsearch/issues/609

@repeatedly, have you seen the same thing? If so, I can create a PR to fix the issue.

About this issue

  • State: closed
  • Created 5 years ago
  • Comments: 16 (15 by maintainers)

Most upvoted comments

I’m not sure, but isn’t the flush thread that is created here left unblocked? If so, I don’t think it will be blocked forever.

Yep, you are right. The flush thread won’t be blocked.

Here is another diff and log:

diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb
index 7c2930fa..a339de86 100644
--- a/lib/fluent/plugin/buffer.rb
+++ b/lib/fluent/plugin/buffer.rb
@@ -224,6 +224,9 @@ module Fluent
       end
 
       def storable?
+        log.trace "executing storable?..."
+        log.trace "storable? internal info:", total_limit_size: @total_limit_size, staged_size: @staged_size, queue_size: @queue_size, clause: (@total_limit_size > @stage_size + @queue_size), instance: self.object_id
+
         @total_limit_size > @stage_size + @queue_size
       end
 
diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index a3d44b9b..5eb65178 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -1414,6 +1414,7 @@ module Fluent
         begin
           # This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase
           while @output_flush_threads_running
+            log.trace "flush_thread_running"
             current_clock = Fluent::Clock.now
             next_retry_time = nil
 
@@ -1428,6 +1429,7 @@ module Fluent
             else
               state.mutex.unlock
               begin
+                log.trace "flush_thread: try flushing"
                 try_flush
                 # next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying)
                 interval = next_flush_time.to_f - Time.now.to_f
@@ -1451,6 +1453,7 @@ module Fluent
               end
             end
 
+            log.trace "flush_thread: cond_var.wait"
             state.cond_var.wait(state.mutex, interval) if interval > 0
           end
         rescue => e

If the storage that receives records from Fluentd has capacity, it won’t be blocked forever:

2019-12-06 15:25:11 +0900 [trace]: #0 storable? internal info: total_limit_size=12582912 staged_size=nil queue_size=12146850 clause=true instance=47224034617900
2019-12-06 15:25:11 +0900 [trace]: #0 writing events into buffer instance=47224034617900 metadata_size=1
2019-12-06 15:25:11 +0900 [trace]: #0 executing storable?...
2019-12-06 15:25:11 +0900 [trace]: #0 storable? internal info: total_limit_size=12582912 staged_size=nil queue_size=12146850 clause=true instance=47224034617900
2019-12-06 15:25:11 +0900 [trace]: #0 writing events into buffer instance=47224034617900 metadata_size=1
2019-12-06 15:25:11 +0900 [trace]: #0 executing storable?...
2019-12-06 15:25:11 +0900 [trace]: #0 storable? internal info: total_limit_size=12582912 staged_size=nil queue_size=12146850 clause=true instance=47224034617900
2019-12-06 15:25:11 +0900 [trace]: #0 writing events into buffer instance=47224034617900 metadata_size=1
2019-12-06 15:25:11 +0900 [trace]: #0 executing storable?...
2019-12-06 15:25:11 +0900 [trace]: #0 storable? internal info: total_limit_size=12582912 staged_size=nil queue_size=12146850 clause=false instance=47224034617900
2019-12-06 15:25:11 +0900 [warn]: #0 failed to write data into buffer by buffer overflow action=:block
2019-12-06 15:25:11 +0900 [debug]: #0 buffer.write is now blocking
2019-12-06 15:25:11 +0900 [trace]: #0 executing storable?...
2019-12-06 15:25:11 +0900 [trace]: #0 storable? internal info: total_limit_size=12582912 staged_size=nil queue_size=12146850 clause=false instance=47224034617900
2019-12-06 15:25:11 +0900 [trace]: #0 sleeping until buffer can store more data
2019-12-06 15:25:12 +0900 [info]: #0 plugin:out_flowcounter_simple      count:50612     indicator:num   unit:second
2019-12-06 15:25:12 +0900 [trace]: #0 bulk response: {"took"=>398, "errors"=>false, "items"=>[{"index"=>{"_index"=>"fluentd", "_type"=>"_doc", "_id"=>"Hiji2W4BSjvbIHCeNkEx", "_ver$
ted", "_shards"=>{"total"=>2, "successful"=>1, "failed"=>0}, "_seq_no"=>10318947, "_primary_term"=>1, "status"=>201}}, {"index"=>{"_index"=>"fluentd", "_type"=>"_doc", "_id"=>"Qyj$
"index"=>{"_index"=>"fluentd", "_type"=>"_doc", "_id"=>"Kyji2W4BSjvbIHCeNpU2", "_version"=>1, "result"=>"created", "_shards"=>{"total"=>2, "successful"=>1, "failed"=>0}, "_seq_no"$
rds"=>{"total"=>2, "successful"=>1, "failed"=>0}, "_seq_no"=>10327604, "_primary_term"=>1, "status"=>201}}, {"index"=>{"_index"=>"fluentd", "_type"=>"_doc", "_id"=>"FCji2W4BSjvbIH$
IHCeNpY2", "_version"=>1, "result"=>"created", "_shards"=>{"total"=>2, "successful"=>1, "failed"=>0}, "_seq_no"=>10327640, "_primary_term"=>1, "status"=>201}}, {"index"=>{"_index"$
2019-12-06 15:25:12 +0900 [trace]: #0 write operation done, committing chunk="599031b75c47e94d5b20d5202f280ac2"
2019-12-06 15:25:12 +0900 [trace]: #0 committing write operation to a chunk chunk="599031b75c47e94d5b20d5202f280ac2" delayed=false
2019-12-06 15:25:12 +0900 [trace]: #0 purging a chunk instance=47224034617900 chunk_id="599031b75c47e94d5b20d5202f280ac2" metadata=#<struct Fluent::Plugin::Buffer::Metadata timeke$
2019-12-06 15:25:12 +0900 [trace]: #0 chunk purged instance=47224034617900 chunk_id="599031b75c47e94d5b20d5202f280ac2" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=n$
2019-12-06 15:25:12 +0900 [trace]: #0 done to commit a chunk chunk="599031b75c47e94d5b20d5202f280ac2"
2019-12-06 15:25:12 +0900 [trace]: #0 flush_thread: cond_var.wait
2019-12-06 15:25:13 +0900 [trace]: #0 enqueueing all chunks in buffer instance=47224034617900
2019-12-06 15:25:13 +0900 [trace]: #0 executing storable?...
2019-12-06 15:25:13 +0900 [trace]: #0 storable? internal info: total_limit_size=12582912 staged_size=nil queue_size=9110160 clause=true instance=47224034617900
2019-12-06 15:25:13 +0900 [debug]: #0 retrying buffer.write after blocked operation
2019-12-06 15:25:13 +0900 [trace]: #0 executing storable?...
2019-12-06 15:25:13 +0900 [trace]: #0 storable? internal info: total_limit_size=12582912 staged_size=nil queue_size=9110160 clause=true instance=47224034617900
2019-12-06 15:25:13 +0900 [trace]: #0 writing events into buffer instance=47224034617900 metadata_size=1
2019-12-06 15:25:13 +0900 [trace]: #0 flush_thread_running
2019-12-06 15:25:13 +0900 [trace]: #0 flush_thread: try flushing
2019-12-06 15:25:13 +0900 [trace]: #0 dequeueing a chunk instance=47224034617900
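
A note on reading the trace above: `staged_size` is printed as nil because the debugging diff logs `@staged_size`, while the condition itself reads `@stage_size`, and an unset Ruby instance variable evaluates to nil. The stage size is therefore not visible in the log, but it can be bounded from the logged values. The following sketch only redoes that arithmetic; the free-standing `storable?` helper is a hypothetical restatement of the comparison shown in the diff, not Fluentd code.

```ruby
# Restates the comparison from the diff as a plain function (hypothetical helper).
def storable?(total_limit_size, stage_size, queue_size)
  total_limit_size > stage_size + queue_size
end

total_limit_size = 12_582_912 # total_limit_size 12m, as logged
queue_before     = 12_146_850 # queue_size while the write was blocked
queue_after      = 9_110_160  # queue_size after one chunk was purged

# clause=false with queue_before means stage_size was at least this large.
min_stage_when_blocked = total_limit_size - queue_before

# Bytes released from the queue by the purge seen in the log.
freed_by_purge = queue_before - queue_after

puts "stage_size was at least #{min_stage_when_blocked} bytes while blocked"
puts "purging the chunk freed #{freed_by_purge} bytes"
puts "storable again: #{storable?(total_limit_size, min_stage_when_blocked, queue_after)}"

# An instance variable that was never assigned reads as nil, which is
# why the misspelled @staged_size shows up as nil in the trace.
puts Object.new.instance_variable_get(:@staged_size).inspect
```

The freed amount is close to the configured 3m chunk_limit_size, which is consistent with a single chunk having been written and purged before the blocked write was retried.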

Or other situations?

Maybe, yes. The previous log was caused by index writes freezing the Elasticsearch cluster. For example, when an exhausted Elasticsearch cluster does not accept incoming bulk requests, Fluentd’s flush thread cannot make progress. Flushing then stalls and buffer#storable? keeps returning false.
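
The stalled case described above can be sketched as follows. This is a simplified illustration, not Fluentd's code: `RejectingDestination` is a hypothetical stand-in for a cluster that refuses every bulk request, the flush attempt and the writer's wait are collapsed into one loop for brevity, and the deadline exists only so that the sketch terminates.

```ruby
# Hypothetical destination that refuses every bulk request.
class RejectingDestination
  def bulk(_chunk)
    raise IOError, "bulk request rejected"
  end
end

queued_chunks_limit = 2
queue = ["chunk-1", "chunk-2"] # the queue is already at its limit
destination = RejectingDestination.new

# Simplified capacity check based on chunk count (an assumption of this sketch).
storable = -> { queue.size < queued_chunks_limit }

attempts = 0
flush_failures = 0
now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
deadline = now.call + 0.2 # only here so the example stops

while !storable.call && now.call < deadline
  attempts += 1
  begin
    destination.bulk(queue.first)
    queue.shift # reached only when the flush succeeds
  rescue IOError
    flush_failures += 1 # the chunk stays queued, so no space is freed
  end
  sleep 0.02
end

puts "attempts=#{attempts} failures=#{flush_failures} storable=#{storable.call}"
```

Every attempt fails, the queue never shrinks, and the capacity check stays false for as long as the destination keeps rejecting requests.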

I used the following configuration to reproduce this issue.

<source>
  @type tail
  path /home/hhatake/GitHub/dummer/dummer.log
  pos_file dummer.log.pos
  read_from_head true
  tag raw.tailing
  <parse>
    @type ltsv
  </parse>
</source>
<match raw.**>
  @type copy
  <store>
    @type elasticsearch
    type_name _doc
    # with_transporter_log true
    @log_level trace
    <buffer tag>
      @type memory
      chunk_limit_size 3m
      total_limit_size 12m
      queued_chunks_limit_size 2
      flush_thread_count 1
      flush_interval 1s
      flush_mode interval
      retry_max_times 8
      retry_wait 1s
      retry_max_interval 10s
      overflow_action block
    </buffer>
  </store>
  <store>
    @type flowcounter_simple
  </store>
</match>
<system>
  @log_level trace
</system>