kafka-go: WriteMessages returns "Unexpected EOF" when configured with SASL/PLAIN
Describe the bug: When the writer sends a message to the Kafka server, it reports “Unexpected EOF”.
Both error reports point to the io or net layer.
I have not been able to resolve it.
Kafka Version: Kafka server: 0.10.22; kafka-go: 0.4.23
To Reproduce
package utils
import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"os"
"path"
"strings"
"sync"
"time"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
)
// Client bundles the kafka-go reader and writer created for one topic,
// together with the cancellable context used for their calls.
type Client struct {
// r is the reader used by Sub to consume the topic.
r *kafka.Reader
// w is the writer used by Pub to publish to the topic.
w *kafka.Writer
// ctx is passed to ReadMessage and WriteMessages for this topic.
ctx context.Context
// cancel cancels ctx; it is called by Pause and Close.
cancel context.CancelFunc
}
// Kafka holds the per-topic clients and the callback that receives consumed
// messages.
type Kafka struct {
// Clients maps a topic name to its reader/writer pair.
Clients map[string]*Client
// Topics is the set of topic names reported by the broker in NewKafka.
// It can contain topics that have no entry in Clients.
Topics map[string]struct{}
// CallBack is started in its own goroutine by Sub and receives the
// channel on which consumed message values are delivered.
CallBack func(message chan []byte)
// Lock is held by Pause while it replaces a topic's client.
Lock sync.Mutex
}
// NewKafkaDialer builds the kafka-go dialer shared by readers and writers.
//
// The dialer uses TLS with the CA certificate read from
// "<working directory>/utils/ca.cert" and authenticates with SASL/PLAIN using
// the credentials in Conf.Kafka. Failures to locate or parse the certificate
// are logged and the dialer is still returned, with an empty root pool.
func NewKafkaDialer() *kafka.Dialer {
	conf := Conf

	clientCertPool := x509.NewCertPool()

	pwd, err := os.Getwd()
	if err != nil {
		// Previously ignored; the read below then resolves against "".
		Logger.Error("kafka client get working directory failed : " + err.Error())
	}

	certBytes, err := ioutil.ReadFile(path.Join(pwd, "/utils/ca.cert"))
	if err != nil {
		Logger.Error("kafka client read cert file failed : " + err.Error())
	} else if !clientCertPool.AppendCertsFromPEM(certBytes) {
		// Only attempt to parse when the read succeeded, so a missing file
		// is not additionally reported as a parse failure.
		Logger.Error("kafka client failed to parse root certificate")
	}

	return &kafka.Dialer{
		ClientID:  "clientId",
		Timeout:   10 * time.Second,
		DualStack: true,
		TLS: &tls.Config{
			RootCAs: clientCertPool,
			// NOTE(review): InsecureSkipVerify disables certificate and
			// host name verification, so RootCAs above has no effect and
			// the connection is open to man-in-the-middle attacks. Kept
			// to preserve existing behavior; remove once the broker
			// certificate validates against ca.cert.
			InsecureSkipVerify: true,
		},
		SASLMechanism: plain.Mechanism{
			Username: conf.Kafka.UserName,
			Password: conf.Kafka.Password,
		},
	}
}
// NewKafka connects to the first configured broker, verifies that every
// requested topic exists, and creates one reader/writer pair per topic.
//
// topics are the topics to create clients for; callback is stored and later
// started by Sub. It returns nil when the broker cannot be reached, when the
// partition metadata cannot be read, or when any requested topic is unknown
// to the broker.
func NewKafka(topics []string, callback func(message chan []byte)) *Kafka {
	conf := Conf
	brokers := strings.Split(conf.Kafka.Broker, ",")
	dialer := NewKafkaDialer()

	conn, err := dialer.DialContext(context.Background(), "tcp", brokers[0])
	if err != nil {
		Logger.Error("kafka dial failed : " + err.Error())
		// conn is nil here; continuing would panic on ReadPartitions.
		return nil
	}
	// The connection is only needed for the metadata lookup below.
	defer func() {
		if cerr := conn.Close(); cerr != nil {
			Logger.Error("kafka close metadata connection failed : " + cerr.Error())
		}
	}()

	partitions, err := conn.ReadPartitions()
	if err != nil {
		Logger.Error("kafka read partitions failed : " + err.Error())
		return nil
	}

	known := make(map[string]struct{}, len(partitions))
	for _, partition := range partitions {
		known[partition.Topic] = struct{}{}
	}

	// Validate every topic before creating any client, so an unknown topic
	// does not leave already-created readers and writers behind.
	for _, topic := range topics {
		if _, ok := known[topic]; !ok {
			return nil
		}
	}

	kafkaClients := make(map[string]*Client, len(topics))
	for _, topic := range topics {
		r := kafka.NewReader(kafka.ReaderConfig{
			Brokers:        brokers,
			Topic:          topic,
			GroupID:        "server",
			MinBytes:       10e2,
			MaxBytes:       10e5,
			Dialer:         dialer,
			CommitInterval: 1 * time.Second,
		})
		w := kafka.NewWriter(kafka.WriterConfig{
			Brokers:  brokers,
			Topic:    topic,
			Balancer: &kafka.Hash{},
			Dialer:   dialer,
		})
		ctx, cancel := context.WithCancel(context.Background())
		kafkaClients[topic] = &Client{
			r:      r,
			w:      w,
			ctx:    ctx,
			cancel: cancel,
		}
	}

	return &Kafka{
		Clients:  kafkaClients,
		Topics:   known,
		CallBack: callback,
		Lock:     sync.Mutex{},
	}
}
// Pub publishes msg to topic using that topic's writer.
//
// Topics unknown to the broker are ignored silently. A known topic that has
// no client (it was never requested in NewKafka, or it is currently removed
// by Pause) is reported through Logger and the message is dropped. Write
// errors are logged, not returned.
func (k *Kafka) Pub(topic string, msg []byte) {
	if _, ok := k.Topics[topic]; !ok {
		return
	}

	// k.Topics lists every topic on the broker, so membership there does not
	// guarantee a client exists; dereferencing a missing entry would panic.
	// NOTE(review): this map read is not synchronized with Pause/Resume,
	// which mutate k.Clients — confirm callers serialize these calls.
	c := k.Clients[topic]
	if c == nil {
		Logger.Error("kafka publish failed : no client for topic " + topic)
		return
	}

	if err := c.w.WriteMessages(c.ctx, kafka.Message{Value: msg}); err != nil {
		Logger.Error("kafka publish failed : " + err.Error())
	}
}
// Sub starts consuming the given topics and hands the consumed message
// values to k.CallBack through a shared buffered channel.
//
// If any topic has no client, Sub returns without starting anything. One
// goroutine is started per topic; each stops when its reader returns an
// error (including cancellation of the client's context). The channel is
// never closed.
func (k *Kafka) Sub(topics []string) {
	// Resolve the clients once, up front. Checking k.Clients rather than
	// k.Topics matters because k.Topics also lists broker topics for which
	// no client was created, and capturing the pointer keeps the read loop
	// from dereferencing a map entry that Pause has since deleted.
	clients := make([]*Client, 0, len(topics))
	for _, t := range topics {
		c := k.Clients[t]
		if c == nil {
			return
		}
		clients = append(clients, c)
	}

	ch := make(chan []byte, 10000)

	// One goroutine per topic: a single goroutine looping over the topics
	// would block forever on the first reader and never reach the others.
	for _, c := range clients {
		go func(c *Client) {
			for {
				m, err := c.r.ReadMessage(c.ctx)
				if err != nil {
					Logger.Error("kafka subscribe failed : " + err.Error())
					return
				}
				ch <- m.Value
			}
		}(c)
	}

	go k.CallBack(ch)
}
// Pause stops the client of topic, waits five seconds, and then recreates it
// through Resume. k.Lock is held for the whole call, so concurrent Pause
// calls run one after another.
//
// Topics that are unknown or have no client are ignored.
func (k *Kafka) Pause(topic string) {
	k.Lock.Lock()
	defer k.Lock.Unlock()

	if _, ok := k.Topics[topic]; !ok {
		return
	}
	// k.Topics also lists broker topics without a client; guard against
	// dereferencing a missing entry.
	c := k.Clients[topic]
	if c == nil {
		return
	}

	// Cancelling the context only aborts in-flight calls. The reader and
	// writer must be closed as well, otherwise the old reader stays alive
	// next to the one Resume creates and its connections are never released.
	c.cancel()
	if err := c.r.Close(); err != nil {
		Logger.Error("failed to close reader:" + err.Error())
	}
	if err := c.w.Close(); err != nil {
		Logger.Error("failed to close writer:" + err.Error())
	}
	delete(k.Clients, topic)

	time.Sleep(time.Second * 5)

	// Resume does not take k.Lock itself, so calling it here is safe.
	k.Resume(topic)
}
// Resume creates a fresh reader/writer pair for topic and stores it in
// k.Clients. Topics unknown to the broker are ignored.
//
// Resume does not acquire k.Lock; Pause calls it while holding the lock.
// If a client is already registered for topic it is shut down before being
// replaced.
func (k *Kafka) Resume(topic string) {
	if _, ok := k.Topics[topic]; !ok {
		return
	}

	// Do not orphan a still-registered client: overwriting the map entry
	// would leave its reader and writer open with no way to close them.
	if old := k.Clients[topic]; old != nil {
		old.cancel()
		if err := old.r.Close(); err != nil {
			Logger.Error("failed to close reader:" + err.Error())
		}
		if err := old.w.Close(); err != nil {
			Logger.Error("failed to close writer:" + err.Error())
		}
	}

	// Build the broker list and the dialer once; the dialer reads the CA
	// certificate from disk each time it is constructed.
	brokers := strings.Split(Conf.Kafka.Broker, ",")
	dialer := NewKafkaDialer()

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		Topic:    topic,
		GroupID:  "server",
		MinBytes: 10e2,
		MaxBytes: 10e5,
		Dialer:   dialer,
		// Same commit interval as the readers created in NewKafka, so a
		// resumed reader behaves like the one it replaces.
		CommitInterval: 1 * time.Second,
	})
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  brokers,
		Topic:    topic,
		Balancer: &kafka.Hash{},
		Dialer:   dialer,
	})

	ctx, cancel := context.WithCancel(context.Background())
	k.Clients[topic] = &Client{
		r:      r,
		w:      w,
		ctx:    ctx,
		cancel: cancel,
	}
}
// Close cancels every client's context and closes its reader and writer.
// Close errors are logged and do not stop the remaining clients from being
// closed.
//
// NOTE(review): k.Clients is iterated without holding k.Lock — confirm that
// Close is not called concurrently with Pause or Resume.
func (k *Kafka) Close() {
	for _, c := range k.Clients {
		c.cancel()
		if err := c.r.Close(); err != nil {
			Logger.Error("failed to close reader:" + err.Error())
		}
		// Writers hold broker connections too and must be closed as well.
		if err := c.w.Close(); err != nil {
			Logger.Error("failed to close writer:" + err.Error())
		}
	}
}
Expected behavior
Additional context: Because Kafka is not a priority queue, I have to pause some low-priority topics and resume them after a short time.
About this issue
- Original URL
- State: closed
- Created 3 years ago
- Comments: 55 (8 by maintainers)
@lxxwilliam @tong3jie I believe I may have found something related to this issue. Would you be able to try out https://github.com/rhansen2/kafka-go/tree/transport-saslv0 and let me know if it helps?
Thanks!
Hi, I have the same error with kafka-go v0.4.28 and Kafka server version 0.10.2. The error also comes from readFull. If I use the Writer, I get the error: unexpected EOF
But if I use Conn, it works.
With version 0.4.0 and the older versions, it works. The error seems to come with this commit: https://github.com/segmentio/kafka-go/commit/81bd03ced29aa1fdc37e54ab79ae53884e0a8c21