ruby-kafka: Infinite reprocessing of messages from Snappy compressed producer
We have a message producer written in Go and a consumer written in Ruby. It appears that enabling snappy compression on the producer causes messages to be infinitely reprocessed in the Ruby consumer.
The example below works fine on 0.4.3, and a git bisect
suggests the regression was introduced by 7acbd7b. I’ve tried to reduce the problem to the smallest reproducible version (consumer groups are not necessary), though I have not tried to recreate it with a Ruby producer.
- Ruby: 2.4.2
- Kafka: 0.11.0.1
- ruby-kafka: 0.5.0 (and master@6a599c71)
Steps to reproduce
In the code below, Kafka is running locally in Docker at 172.18.0.3:9092; you may need to change the broker list to match your setup.
log: https://gist.github.com/gaffneyc/f2de66eceb7a4c2f9967c0ba4acda402
go get -u github.com/Shopify/sarama
go run producer.go
ruby consumer.rb
producer.go
// go get -u github.com/Shopify/sarama
// go run producer.go
package main

import (
	"strconv"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_10_2_0
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true

	// Enabling compression appears to cause the problem
	cfg.Producer.Compression = sarama.CompressionSnappy

	brokers := []string{"172.18.0.3:9092"}
	producer, err := sarama.NewSyncProducer(brokers, cfg)
	if err != nil {
		panic(err)
	}

	// This only appears to happen with several messages on the topic
	for i := 0; i < 30; i++ {
		_, _, err = producer.SendMessage(&sarama.ProducerMessage{
			Topic: "signals",
			Key:   sarama.StringEncoder("abcdefgh12"),
			Value: sarama.StringEncoder(strconv.Itoa(i)),
		})
		if err != nil {
			panic(err)
		}
	}
}
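For anyone who wants to try reproducing this with a Ruby producer (which, as noted above, I have not tested), a sketch of the equivalent using ruby-kafka's own producer with Snappy compression. This assumes the ruby-kafka and snappy gems are installed; the KAFKA_BROKERS environment variable is my own convention here, not something the library requires.

```ruby
# Values "0".."29" with the same key, mirroring producer.go
messages = 30.times.map { |i| i.to_s }

if ENV["KAFKA_BROKERS"] # only attempt delivery when a broker is configured
  require "kafka" # ruby-kafka gem

  kafka = Kafka.new(seed_brokers: ENV["KAFKA_BROKERS"].split(","))

  # compression_codec: :snappy requires the snappy gem
  producer = kafka.producer(compression_codec: :snappy)

  messages.each do |value|
    producer.produce(value, topic: "signals", key: "abcdefgh12")
  end

  producer.deliver_messages
  producer.shutdown
end
```

If the bug also reproduces with this producer, that would rule out sarama's wire format as the trigger and point at decompression handling on the consumer side.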
consumer.rb
# Place this in the project directory
$:.unshift File.expand_path("../lib", __FILE__)
require "ruby-kafka"
kafka = Kafka.new(seed_brokers: "172.18.0.3:9092")
trap("INT") { kafka.close; exit(0) }
kafka.each_message(topic: "signals") do |msg|
  puts msg.inspect
end
Expected outcome
Messages are processed once, and the consumer waits for the next available message.
Actual outcome
Messages are reprocessed in an infinite loop. For a topic with a large number of messages, it appears that only a subset is processed.
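One plausible mechanism (an assumption on my part, not a confirmed diagnosis): with the 0.10/v1 message format, a Snappy-compressed wrapper message carries the offset of its last inner message, and the inner messages store offsets relative to the first one. If a consumer treats those relative offsets as absolute, its next fetch resumes from a stale (near-zero) offset and the same batch repeats forever, which would match both symptoms above. A sketch of the arithmetic a consumer is expected to perform:

```ruby
# Illustration only, not ruby-kafka's actual code. Given the wrapper
# message's offset (the absolute offset of the LAST inner message) and the
# inner messages' relative offsets, recover the absolute offsets.
def absolute_offsets(wrapper_offset, relative_offsets)
  base = wrapper_offset - relative_offsets.last
  relative_offsets.map { |r| base + r }
end

# 30 messages in one compressed set, wrapper offset 29:
offsets = absolute_offsets(29, (0..29).to_a)
next_fetch = offsets.last + 1 # the consumer should resume at 30
```

Skipping the `base` adjustment would leave the consumer asking for offset 0 (or another already-consumed offset) on every fetch.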
About this issue
- Original URL
- State: closed
- Created 7 years ago
- Comments: 23 (1 by maintainers)
Awesome! Thank you for getting a fix in there and building ruby-kafka in the first place.