vector: Performance degradation in elasticsearch sink when using compression (gzip or zlib) after v0.34.0

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

[TL;DR] Since version 0.34.0 (i.e. after 0.33.1), the Elasticsearch sink performs very poorly when the compression option is enabled: throughput drops by about 50%, and CPU and memory usage are very high.

Recently I upgraded Vector from 0.27 to 0.36 and observed the performance degradation. I then ran a regression test across versions to pinpoint where the problem was introduced, and found that the Elasticsearch sink performance becomes very poor starting with 0.34.0.

After observing the performance degradation, I ran perf profiling on v0.33.1 vs v0.34.0 and found a clear difference between the two versions' flame graphs.

[Version v0.33.1] flame graph

[Version v0.34.0] flame graph

Starting with v0.34.0, the number of memset calls is very high in the GzEncoder path (I also confirmed that the same call stack appears when I use zlib compression instead of gzip).
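
For reference, here is a minimal standalone sketch (not Vector code) of the write pattern visible in the flame graph: many ~10-byte writes going straight into flate2's GzEncoder, compared with the same writes coalesced through a BufWriter. With the flate2 behaviour described in the issue linked below, the unbuffered loop is expected to spend most of its time in memset of the encoder's internal buffer.

use std::io::{BufWriter, Write};
use std::time::Instant;

use flate2::write::GzEncoder;
use flate2::Compression;

fn main() -> std::io::Result<()> {
    // Emulate the sink's write pattern: lots of tiny writes (~10 bytes each).
    let chunk = b"0123456789";
    let iterations = 1_000_000;

    // 1) Tiny writes going directly into the gzip encoder.
    let start = Instant::now();
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    for _ in 0..iterations {
        encoder.write_all(chunk)?;
    }
    encoder.finish()?;
    println!("unbuffered: {:?}", start.elapsed());

    // 2) The same writes, coalesced through a 10 KiB BufWriter first.
    let start = Instant::now();
    let mut buffered = BufWriter::with_capacity(
        10_240,
        GzEncoder::new(Vec::new(), Compression::default()),
    );
    for _ in 0..iterations {
        buffered.write_all(chunk)?;
    }
    buffered.into_inner()?.finish()?;
    println!("buffered:   {:?}", start.elapsed());

    Ok(())
}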

After this debugging, I turned compression off and observed that performance is almost the same as in the previous version.

But of course, memory usage is then higher because compression is disabled.

I think this problem is related to https://github.com/vectordotdev/vector/pull/18850 and https://github.com/rust-lang/flate2-rs/issues/395

Thank you.

Configuration

    [api]
      enabled = true
      address = "0.0.0.0:9600"
    [log_schema]
      host_key = "_vector_host"
      metadata_key = "_vector_metadata"
      source_type_key = "_vector_source"
      message_key = "_vector_message"
      timestamp_key = "_vector_timestamp"
    [sources.kafka]
      type = "kafka"
      bootstrap_servers = "***"
      topics = ["***"]
      group_id = "***"
      decoding.codec = "json"
      auto_offset_reset = "latest"
      fetch_wait_max_ms = 10000
      session_timeout_ms = 10000
      headers_key = "\"@metadata\".kafka.headers"
      key_field = "\"@metadata\".kafka.key"
      offset_key = "\"@metadata\".kafka.offset"
      partition_key = "\"@metadata\".kafka.partition"
      topic_key = "\"@metadata\".kafka.topic"
      [sources.kafka.librdkafka_options]
        "client.id" = "***"
        "max.poll.interval.ms" = "300000"
        "heartbeat.interval.ms" = "5000"
        "fetch.min.bytes" = "102400"
        "socket.receive.buffer.bytes" = "4096000"
    [sinks.to_es]
      type = "elasticsearch"
      inputs = ["kafka"]
      endpoint = "***"
      encoding.except_fields = ["_vector_source", "_vector_timestamp", "@metadata"]
      mode = "bulk"
      bulk.action = "index"
      bulk.index = "***"
      id_key = "\"@metadata\".documentId"
      bulk.max_bytes = 7680000
      compression = "gzip"
      request.timeout_secs = 7200
      request.rate_limit_duration_secs = 1
      request.rate_limit_num = 15
      buffer.max_events = 5000
      request_retry_partial = true


    [sources.internal_metrics]
      type = "internal_metrics"
      scrape_interval_secs = 15
      namespace = "vector"

    [sinks.internal_metrics_to_prometheus]
      inputs = ["internal_metrics"]
      type = "prometheus_exporter"
      address = "0.0.0.0:9601"
      default_namespace = "default"

Version

v0.34.0

Debug Output

No response

Example Data

No response

Additional Context

No response

References

No response

About this issue

  • State: closed
  • Created 4 months ago
  • Reactions: 2
  • Comments: 15 (10 by maintainers)

Most upvoted comments

@Byron This is just a draft version of the patch to verify the idea. It should be moved to Compressor.

diff --git a/src/sinks/elasticsearch/encoder.rs b/src/sinks/elasticsearch/encoder.rs
index cdee78120..95221d845 100644
--- a/src/sinks/elasticsearch/encoder.rs
+++ b/src/sinks/elasticsearch/encoder.rs
@@ -1,4 +1,4 @@
-use std::{io, io::Write};
+use std::{io, io::BufWriter, io::Write};
 
 use serde::Serialize;
 use vector_buffers::EventCount;
@@ -72,6 +72,7 @@ impl Encoder<Vec<ProcessedEvent>> for ElasticsearchEncoder {
     ) -> std::io::Result<(usize, GroupedCountByteSize)> {
         let mut written_bytes = 0;
         let mut byte_size = telemetry().create_request_count_byte_size();
+        let mut buffered_writer = BufWriter::with_capacity(10_240, writer);
         for event in input {
             let log = {
                 let mut event = Event::from(event.log);
@@ -81,7 +82,7 @@ impl Encoder<Vec<ProcessedEvent>> for ElasticsearchEncoder {
                 event.into_log()
             };
             written_bytes += write_bulk_action(
-                writer,
+                &mut buffered_writer,
                 event.bulk_action.as_str(),
                 &event.index,
                 &self.doc_type,
@@ -89,14 +90,15 @@ impl Encoder<Vec<ProcessedEvent>> for ElasticsearchEncoder {
                 &event.id,
             )?;
             written_bytes +=
-                as_tracked_write::<_, _, io::Error>(writer, &log, |mut writer, log| {
-                    writer.write_all(&[b'\n'])?;
-                    serde_json::to_writer(&mut writer, log)?;
-                    writer.write_all(&[b'\n'])?;
+                as_tracked_write::<_, _, io::Error>(&mut buffered_writer, &log, |mut buffered_writer, log| {
+                    buffered_writer.write_all(&[b'\n'])?;
+                    serde_json::to_writer(&mut buffered_writer, log)?;
+                    buffered_writer.write_all(&[b'\n'])?;
                     Ok(())
                 })?;
         }
 
+        buffered_writer.flush()?;
         Ok((written_bytes, byte_size))
     }
 }

I think the solution makes sense, though, as you suggest, it should be implemented in the Compressor so that all sinks can benefit.
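
To illustrate that direction, here is a rough sketch of a Compressor-level version of the same idea (the type and method names are hypothetical, not Vector's actual Compressor API): the flate2 encoder is wrapped in a BufWriter once, inside the shared compressor, so every sink that writes small chunks through it gets the coalescing without per-sink changes.

use std::io::{self, BufWriter, Write};

use flate2::write::{GzEncoder, ZlibEncoder};
use flate2::Compression;

// Hypothetical stand-in for a shared compressor: each flate2 encoder is
// wrapped in a BufWriter so the many tiny writes coming from the per-sink
// encoders are coalesced before they reach flate2's internal buffer.
enum BufferedCompressor {
    None(Vec<u8>),
    Gzip(BufWriter<GzEncoder<Vec<u8>>>),
    Zlib(BufWriter<ZlibEncoder<Vec<u8>>>),
}

impl BufferedCompressor {
    fn gzip(level: Compression) -> Self {
        // 10 KiB, matching the buffer size used in the draft patch above.
        Self::Gzip(BufWriter::with_capacity(10_240, GzEncoder::new(Vec::new(), level)))
    }

    fn zlib(level: Compression) -> Self {
        Self::Zlib(BufWriter::with_capacity(10_240, ZlibEncoder::new(Vec::new(), level)))
    }

    fn finish(self) -> io::Result<Vec<u8>> {
        match self {
            Self::None(buf) => Ok(buf),
            Self::Gzip(w) => w.into_inner()?.finish(),
            Self::Zlib(w) => w.into_inner()?.finish(),
        }
    }
}

impl Write for BufferedCompressor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self {
            Self::None(w) => w.write(buf),
            Self::Gzip(w) => w.write(buf),
            Self::Zlib(w) => w.write(buf),
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        match self {
            Self::None(w) => w.flush(),
            Self::Gzip(w) => w.flush(),
            Self::Zlib(w) => w.flush(),
        }
    }
}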

Without a buffered writer, Vector calls the encoder's writer::write for every small chunk of data (10 bytes on average). With flate2's 32 KB inner buffer, that causes a resize with memset on every write call.

Thanks for the hint! I think I've put the pieces together now. The problem is that the internal buffer is created with 32 KB capacity; however, with each small write it is memset to its full capacity, just to be truncated to what was actually written right after.

I will report this over at flate2 in the hope that it is fixable. Even so, the issue this uncovered, together with a fix that improves performance beyond the previous version, seems like a net positive.
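
(Rough arithmetic from the numbers quoted above, for scale: at roughly 10 bytes per write and 32 KB zeroed per call, on the order of 3 KB is memset for every byte of payload written, which is consistent with memset dominating the v0.34.0 flame graph.)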

Hey, maybe it's worth adding a buffered writer around the *Encoder writers? I tried a 10 KB buffer and performance improved a lot, similar to v0.33.0.

[Flame graph: Vector at revision b3889bcea]

[Flame graph: Vector at revision b3889bcea with buffered writer]