kafka-go: WriteMessages returns "Unexpected EOF" when configured with SASL/PLAIN

Describe the bug: when writing a message to the Kafka server, the writer reports “Unexpected EOF”.

Both of the error reports are about io or net.

I can’t solve it


Kafka Version: Kafka server: 0.10.22; kafka-go: 0.4.23


To Reproduce

package utils

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

// Client bundles the kafka-go reader and writer created for a single topic
// together with the context that is passed to their read and write calls.
type Client struct {
	// r is the reader Sub consumes messages from.
	r      *kafka.Reader
	// w is the writer Pub publishes messages with.
	w      *kafka.Writer
	// ctx is handed to ReadMessage and WriteMessages for this topic.
	ctx    context.Context
	// cancel cancels ctx; Pause and Close call it.
	cancel context.CancelFunc
}

// Kafka holds the per-topic clients, the set of topic names reported by the
// broker, and the callback that receives consumed message values.
type Kafka struct {
	// Clients maps a topic name to its reader/writer pair. Pause removes the
	// entry for a topic and Resume installs a fresh one.
	Clients  map[string]*Client
	// Topics is the set of topic names; NewKafka fills it from the broker's
	// partition list, so it may contain topics that have no entry in Clients.
	Topics   map[string]struct{}
	// CallBack is started by Sub in its own goroutine and is given the channel
	// on which received message values are delivered.
	CallBack func(message chan []byte)
	// Lock is taken by Pause while it replaces a topic's client.
	Lock     sync.Mutex
}

// NewKafkaDialer builds a kafka.Dialer that connects over TLS and
// authenticates with SASL/PLAIN using the credentials in Conf.Kafka.
//
// The CA certificate is read from "<working dir>/utils/ca.cert". If the
// working directory cannot be determined, or the file cannot be read or
// parsed, the problem is logged once and the dialer is still returned, with
// RootCAs left nil.
func NewKafkaDialer() *kafka.Dialer {
	conf := Conf

	tlsConf := &tls.Config{
		// NOTE(review): InsecureSkipVerify disables server certificate
		// verification, so RootCAs is not consulted while this is true. It is
		// kept to preserve existing behavior; consider removing it so the CA
		// loaded below is actually enforced.
		InsecureSkipVerify: true,
	}
	if pool := loadKafkaCertPool(); pool != nil {
		tlsConf.RootCAs = pool
	}

	return &kafka.Dialer{
		ClientID: "clientId",
		TLS:      tlsConf,
		SASLMechanism: plain.Mechanism{
			Username: conf.Kafka.UserName,
			Password: conf.Kafka.Password,
		},
		Timeout:   time.Second * 10,
		DualStack: true,
	}
}

// loadKafkaCertPool reads and parses the CA certificate used by
// NewKafkaDialer. It returns nil, after logging the root cause, when any step
// fails.
func loadKafkaCertPool() *x509.CertPool {
	pwd, err := os.Getwd()
	if err != nil {
		Logger.Error("kafka client get working directory failed : " + err.Error())
		return nil
	}
	certBytes, err := ioutil.ReadFile(path.Join(pwd, "/utils/ca.cert"))
	if err != nil {
		Logger.Error("kafka client read cert file failed : " + err.Error())
		return nil
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(certBytes) {
		Logger.Error("kafka client failed to parse root certificate")
		return nil
	}
	return pool
}

// NewKafka connects to the first broker listed in Conf.Kafka.Broker, reads
// the partition list to learn which topics exist, and creates one
// reader/writer pair for every entry in topics.
//
// callback is stored as the Kafka's CallBack and is later started by Sub.
//
// It returns nil when the broker cannot be dialed, when the partition list
// cannot be read, or when any requested topic does not exist on the broker.
// All topics are validated before any reader or writer is created, so a
// failed call leaves no reader or writer behind.
func NewKafka(topics []string, callback func(message chan []byte)) *Kafka {
	brokers := strings.Split(Conf.Kafka.Broker, ",")
	dialer := NewKafkaDialer()

	conn, err := dialer.DialContext(context.Background(), "tcp", brokers[0])
	if err != nil {
		// conn is unusable here; calling ReadPartitions on it would panic.
		Logger.Error("kafka dial failed : " + err.Error())
		return nil
	}
	partitions, err := conn.ReadPartitions()
	// The connection is only needed for this metadata lookup.
	if cerr := conn.Close(); cerr != nil {
		Logger.Error("kafka close metadata connection failed : " + cerr.Error())
	}
	if err != nil {
		Logger.Error("kafka read partitions failed : " + err.Error())
		return nil
	}

	known := make(map[string]struct{}, len(partitions))
	for _, partition := range partitions {
		known[partition.Topic] = struct{}{}
	}

	// Validate everything up front so we never return nil after having
	// already created readers and writers for earlier topics.
	for _, topic := range topics {
		if _, ok := known[topic]; !ok {
			Logger.Error("kafka topic not found : " + topic)
			return nil
		}
	}

	clients := make(map[string]*Client, len(topics))
	for _, topic := range topics {
		r := kafka.NewReader(kafka.ReaderConfig{
			Brokers:        brokers,
			Topic:          topic,
			GroupID:        "server",
			MinBytes:       10e2,
			MaxBytes:       10e5,
			Dialer:         dialer,
			CommitInterval: 1 * time.Second,
		})
		w := kafka.NewWriter(kafka.WriterConfig{
			Brokers:  brokers,
			Topic:    topic,
			Balancer: &kafka.Hash{},
			Dialer:   dialer,
		})
		ctx, cancel := context.WithCancel(context.Background())
		clients[topic] = &Client{
			r:      r,
			w:      w,
			ctx:    ctx,
			cancel: cancel,
		}
	}

	return &Kafka{
		Clients:  clients,
		Topics:   known,
		CallBack: callback,
	}
}

// Pub writes msg as a single message value to topic.
//
// Unknown topics are ignored silently. A topic that is known to the broker
// but currently has no client (it was never requested from NewKafka, or it is
// in the middle of a Pause) is reported through Logger and the message is
// dropped instead of dereferencing a nil client. Write errors are logged.
//
// NOTE(review): k.Clients is read here without holding k.Lock, as in the
// original code, while Pause mutates it under the lock — confirm whether Pub
// may run concurrently with Pause.
func (k *Kafka) Pub(topic string, msg []byte) {
	if _, ok := k.Topics[topic]; !ok {
		return
	}

	c, ok := k.Clients[topic]
	if !ok || c == nil {
		Logger.Error("kafka publish failed : no client for topic " + topic)
		return
	}

	if err := c.w.WriteMessages(c.ctx, kafka.Message{Value: msg}); err != nil {
		Logger.Error("kafka publish failed : " + err.Error())
	}
}

// Sub starts consuming every topic in topics and forwards each message value
// to a shared buffered channel, which is handed to k.CallBack running in its
// own goroutine.
//
// Nothing is started when any topic is unknown, when a topic has no client,
// or when no callback is configured. Each topic is read by its own goroutine;
// a goroutine logs and exits on the first read error (including cancellation
// of the client's context by Pause or Close). The channel is never closed.
func (k *Kafka) Sub(topics []string) {
	if k.CallBack == nil {
		Logger.Error("kafka subscribe failed : no callback configured")
		return
	}

	// Resolve every client before starting anything so a bad topic cannot
	// leave a partially started subscription behind.
	clients := make([]*Client, 0, len(topics))
	for _, t := range topics {
		if _, ok := k.Topics[t]; !ok {
			return
		}
		c, ok := k.Clients[t]
		if !ok || c == nil {
			Logger.Error("kafka subscribe failed : no client for topic " + t)
			return
		}
		clients = append(clients, c)
	}

	ch := make(chan []byte, 10000)
	for _, c := range clients {
		// One goroutine per topic: a single goroutine looping over topics
		// would block forever on the first topic's read loop.
		go func(c *Client) {
			for {
				m, err := c.r.ReadMessage(c.ctx)
				if err != nil {
					Logger.Error("kafka subscribe failed : " + err.Error())
					return
				}
				ch <- m.Value
			}
		}(c)
	}
	go k.CallBack(ch)
}

// Pause stops the client of topic, waits five seconds, and then calls Resume
// to install a fresh client.
//
// Unknown topics and topics that currently have no client (for example
// because another Pause is already in progress) are ignored. The old reader
// and writer are closed so their connections are released. k.Lock is held
// only while k.Clients is modified and while Resume runs, not during the
// sleep.
func (k *Kafka) Pause(topic string) {
	if _, ok := k.Topics[topic]; !ok {
		return
	}

	// Detach the client under the lock; tear it down outside the lock.
	k.Lock.Lock()
	c, ok := k.Clients[topic]
	delete(k.Clients, topic)
	k.Lock.Unlock()
	if !ok || c == nil {
		return
	}

	c.cancel()
	if err := c.r.Close(); err != nil {
		Logger.Error("failed to close reader:" + err.Error())
	}
	if err := c.w.Close(); err != nil {
		Logger.Error("failed to close writer:" + err.Error())
	}

	time.Sleep(time.Second * 5)

	// Resume writes to k.Clients without locking, so hold the lock for it.
	k.Lock.Lock()
	defer k.Lock.Unlock()
	k.Resume(topic)
}

// Resume creates a new reader/writer pair for topic and stores it in
// k.Clients. Unknown topics are ignored.
//
// The reader is configured like the one NewKafka creates, including the one
// second CommitInterval, and the reader and writer share a single dialer. If
// a client is already registered for topic it is cancelled and closed before
// being replaced.
//
// Resume does not take k.Lock itself because Pause calls it while holding the
// lock. NOTE(review): other callers presumably need to hold k.Lock as well —
// confirm.
func (k *Kafka) Resume(topic string) {
	if _, ok := k.Topics[topic]; !ok {
		return
	}

	// Release whatever client is about to be overwritten.
	if old, ok := k.Clients[topic]; ok && old != nil {
		old.cancel()
		if err := old.r.Close(); err != nil {
			Logger.Error("failed to close reader:" + err.Error())
		}
		if err := old.w.Close(); err != nil {
			Logger.Error("failed to close writer:" + err.Error())
		}
	}

	brokers := strings.Split(Conf.Kafka.Broker, ",")
	dialer := NewKafkaDialer()

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        brokers,
		Topic:          topic,
		GroupID:        "server",
		MinBytes:       10e2,
		MaxBytes:       10e5,
		Dialer:         dialer,
		CommitInterval: 1 * time.Second,
	})
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  brokers,
		Topic:    topic,
		Balancer: &kafka.Hash{},
		Dialer:   dialer,
	})
	ctx, cancel := context.WithCancel(context.Background())
	k.Clients[topic] = &Client{
		r:      r,
		w:      w,
		ctx:    ctx,
		cancel: cancel,
	}
}

// Close cancels every client's context and closes both its reader and its
// writer, logging any close error. k.Lock is held for the duration so the
// iteration cannot overlap with Pause modifying k.Clients.
func (k *Kafka) Close() {
	k.Lock.Lock()
	defer k.Lock.Unlock()

	for _, c := range k.Clients {
		if c == nil {
			continue
		}
		c.cancel()
		if err := c.r.Close(); err != nil {
			Logger.Error("failed to close reader:" + err.Error())
		}
		// The writer holds its own connections and must be closed as well.
		if err := c.w.Close(); err != nil {
			Logger.Error("failed to close writer:" + err.Error())
		}
	}
}


Expected behavior


Additional context: because Kafka is not a priority queue, I must pause some low-priority topics and resume them after a short time.

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 55 (8 by maintainers)

Most upvoted comments

@lxxwilliam @tong3jie I believe I may have found something related to this issue. Would you be able to try out https://github.com/rhansen2/kafka-go/tree/transport-saslv0 and let me know if it helps?

Thanks!

Hi, I have the same error with kafka-go v0.4.28 and Kafka server version 0.10.2. The error also comes from readFull. If I use the Writer, I get the error: unexpected EOF

// Reproduction with the high-level Writer: SASL/PLAIN credentials and an
// empty TLS config are set on the Transport. The commenter reports that
// WriteMessages fails with "unexpected EOF" in this setup.
w := &kafka.Writer{
	Addr:  kafka.TCP("kafka:9093"),
	Topic: "topic",
	Transport: &kafka.Transport{
		SASL: plain.Mechanism{
			Username: "user",
			Password: "XXXXX",
		},
		TLS: &tls.Config{},
	},
}

// Write a single message; any error aborts the program.
err = w.WriteMessages(context.Background(),
	kafka.Message{
		Value: []byte("message"),
	},
)
if err != nil {
	log.Fatal("failed to write messages:", err)
}

But if I use Conn, it works.

// Working variant: the same SASL/PLAIN credentials, but set on a Dialer and
// used through a low-level Conn to the leader of partition 0.
partition := 0
dialer := &kafka.Dialer{
	Timeout:   10 * time.Second,
	DualStack: true,
	TLS: &tls.Config{
		// Unlike the Writer example, certificate verification is skipped here.
		InsecureSkipVerify: true,
	},
	SASLMechanism: plain.Mechanism{
		Username: "user",
		Password: "XXXXX",
	},
}

conn, err := dialer.DialLeader(context.Background(), "tcp", "kafka:9093", "topic", partition)
if err != nil {
	log.Fatal("failed to dial leader:", err)
}

// Bound the write to ten seconds, then send a single message.
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
_, err = conn.WriteMessages(
	kafka.Message{
		Value: []byte("message"),
	},
)
if err != nil {
	log.Error("failed to write messages : " + err.Error())
}

With version 0.4.0 and older versions, it works. The error seems to have been introduced by this commit: https://github.com/segmentio/kafka-go/commit/81bd03ced29aa1fdc37e54ab79ae53884e0a8c21