ruby-kafka: Infinite reprocessing of messages from a Snappy-compressed producer

We have a message producer written in Go and a consumer written in Ruby. Enabling Snappy compression on the producer appears to cause messages to be reprocessed indefinitely by the Ruby consumer.

The example below works fine on 0.4.3, and a git bisect suggests the problem may have been introduced by 7acbd7b. I’ve tried to reduce the problem to the smallest reproducible version (consumer groups are not necessary), though I have not tried to recreate it with a Ruby producer.

  • Ruby: 2.4.2
  • Kafka: 0.11.0.1
  • ruby-kafka: 0.5.0 (and master@6a599c71)

Steps to reproduce

In the code below, Kafka is running locally in Docker at 172.18.0.3:9092; you may need to change the broker list to get it working in your environment.

log: https://gist.github.com/gaffneyc/f2de66eceb7a4c2f9967c0ba4acda402

go get -u github.com/Shopify/sarama
go run producer.go
ruby consumer.rb
producer.go
// go get -u github.com/Shopify/sarama
// go run producer.go
package main

import (
  "strconv"

  "github.com/Shopify/sarama"
)

func main() {
  cfg := sarama.NewConfig()
  cfg.Version = sarama.V0_10_2_0
  cfg.Producer.Return.Successes = true
  cfg.Producer.Return.Errors = true

  // Enabling compression appears to cause the problem
  cfg.Producer.Compression = sarama.CompressionSnappy

  brokers := []string{"172.18.0.3:9092"}
  producer, err := sarama.NewSyncProducer(brokers, cfg)
  if err != nil {
    panic(err)
  }
  // Flush and release the producer's connections on exit
  defer producer.Close()

  // This only appears to happen with several messages on the topic
  for i := 0; i < 30; i++ {
    _, _, err = producer.SendMessage(&sarama.ProducerMessage{
      Topic: "signals",
      Key:   sarama.StringEncoder("abcdefgh12"),
      Value: sarama.StringEncoder(strconv.Itoa(i)),
    })
    if err != nil {
      panic(err)
    }
  }
}
consumer.rb
# Place this file in the ruby-kafka project root so the local lib/ is loaded
$:.unshift File.expand_path("../lib", __FILE__)
require "ruby-kafka"

kafka = Kafka.new(seed_brokers: "172.18.0.3:9092")
trap("INT") { kafka.close; exit(0) }

kafka.each_message(topic: "signals") do |msg|
  puts msg.inspect
end

Expected outcome

Each message is processed once, and the consumer then waits for the next available message.

Actual outcome

Messages are reprocessed in an infinite loop. For a topic with a large number of messages, only a subset appears to be processed.
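One way to confirm the loop without scanning the output by eye is to track the highest offset seen per topic/partition and flag any message whose offset fails to advance. The sketch below is a hypothetical helper (`ReplayDetector` is not part of ruby-kafka); it only assumes that each fetched message exposes `topic`, `partition` and `offset`, and it keeps its state in memory for a single process run.

```ruby
# Hypothetical debugging helper, not part of ruby-kafka.
# Flags a message as a replay when its offset is not greater than the
# highest offset already seen for the same topic/partition.
class ReplayDetector
  def initialize
    # [topic, partition] => highest offset seen so far
    @last_offsets = {}
  end

  # Returns true if the offset did not advance (i.e. already processed),
  # otherwise records the offset and returns false.
  def replay?(topic, partition, offset)
    key = [topic, partition]
    last = @last_offsets[key]
    return true if last && offset <= last

    @last_offsets[key] = offset
    false
  end
end
```

In consumer.rb, creating one detector before `kafka.each_message` and calling `detector.replay?(msg.topic, msg.partition, msg.offset)` inside the block, printing a warning when it returns true, should make the repeated offsets visible as soon as the first pass over the topic completes.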

About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 23 (1 by maintainers)

Most upvoted comments

Awesome! Thank you for getting a fix in there and building ruby-kafka in the first place.