kafka-go: Writer error "unexpected EOF" with v0.4.2 + SASL + TLS

Describe the bug When I try to write a message, I receive an “unexpected EOF” error.

Kafka Version I’m using the version available from confluent.cloud, so I think it’s the latest.

To Reproduce

package main

import (
	"context"
	"crypto/tls"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

func main() {
	ctx := context.Background()
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{
			"xxx.gcp.confluent.cloud:9092",
		},
		Topic: "test",
		Dialer: &kafka.Dialer{
			Timeout:   10 * time.Second,
			DualStack: true,
			TLS:       &tls.Config{},
			SASLMechanism: plain.Mechanism{
				Username: "yyy",
				Password: "zzz",
			},
		},
		BatchSize: 1,
		Logger: kafka.LoggerFunc(func(format string, args ...interface{}) {
			log.Printf("Kafka writer: "+format, args...)
		}),
	})
	for {
		err := w.WriteMessages(ctx, kafka.Message{
			Value: []byte(time.Now().Format(time.RFC3339)),
		})
		if err != nil {
			panic(err)
		}
		log.Println("message sent")
		time.Sleep(1 * time.Second)
	}
}

The call to w.WriteMessages returns an error.

Expected behavior I shouldn’t receive any error.

Additional context I’m using v0.4.2 with TLS + SASL auth.

It was working fine with 0.3.8 and 0.4.0. 0.4.1 has a panic.

I think it’s actually the same issue as https://github.com/segmentio/kafka-go/issues/490, but I don’t understand why that one was closed; the issue is not fixed. Someone replied there (https://github.com/segmentio/kafka-go/issues/490#issuecomment-676238806), but that’s not true, right?

About this issue

  • State: closed
  • Created 4 years ago
  • Reactions: 11
  • Comments: 17 (1 by maintainers)

Most upvoted comments

I ran into this behavior again while trying to hook up a new broker in Confluent Cloud. All attempts at sending a message with a Writer resulted in an unexpected EOF error. Eventually, I isolated it to a struct field in Transport. We are using kafka-go v0.4.27 and connect according to the Confluent documentation (SASL Plain / API Key & Secret).

Per the example documentation, we started out using (and had the errors mentioned above):

sharedTransport := &kafka.Transport{
	SASL: mechanism,
}

I went through the Confluent documentation and also took a look at the default config they supply. The clue that tipped me off was that they require TLS 1.2 minimum. Adding that to our Transport struct as below solved the issue for us:

sharedTransport := &kafka.Transport{
	SASL: mechanism,
	// If we don't include this we get unexpected EOF errors
	TLS: &tls.Config{
		MinVersion: tls.VersionTLS12,
	},
}
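For anyone who wants to check this setting in isolation, here is a minimal, standard-library-only sketch. The helper names `newBrokerTLSConfig` and `hasExplicitTLS12Floor` are hypothetical and not part of kafka-go; the returned config is meant to go into the `TLS` field of `kafka.Transport` as in the snippet above. Note that the failing snippet sets no `TLS` field at all, so the thread alone does not show whether it is the presence of a TLS config or the `MinVersion` value that makes the difference.

```go
package main

import (
	"crypto/tls"
	"fmt"
)

// newBrokerTLSConfig is a hypothetical helper that returns a TLS config
// with an explicit TLS 1.2 minimum, matching the snippet above.
func newBrokerTLSConfig() *tls.Config {
	return &tls.Config{
		MinVersion: tls.VersionTLS12,
	}
}

// hasExplicitTLS12Floor reports whether cfg sets MinVersion to TLS 1.2 or
// newer. A nil config or an unset MinVersion (zero value) returns false,
// because in that case the minimum is left to the Go runtime's default.
func hasExplicitTLS12Floor(cfg *tls.Config) bool {
	return cfg != nil && cfg.MinVersion >= tls.VersionTLS12
}

func main() {
	fmt.Println(hasExplicitTLS12Floor(newBrokerTLSConfig())) // true
	fmt.Println(hasExplicitTLS12Floor(&tls.Config{}))        // false
}
```

A check like this could run once at startup, before the Transport is handed to a Writer, so a missing TLS setting is caught before the first write fails.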

Thanks for verifying @pierrre, and thanks everyone for your reports!

Any fix there?

Probably fixed by #541. I will test it; closing the issue for now.

@swarupdonepudi thank you for noticing this! But the issue is still the same. Here is my new code:

package main

import (
	"context"
	"crypto/tls"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

func main() {
	ctx := context.Background()
	w := &kafka.Writer{
		Addr:  kafka.TCP("xxx.gcp.confluent.cloud:9092"),
		Topic: "test",
		Transport: &kafka.Transport{
			TLS: &tls.Config{},
			SASL: plain.Mechanism{
				Username: "yyy",
				Password: "zzz",
			},
		},
		BatchSize: 1,
		Logger: kafka.LoggerFunc(func(format string, args ...interface{}) {
			log.Printf("Kafka writer: "+format, args...)
		}),
	}
	for {
		err := w.WriteMessages(ctx, kafka.Message{
			Value: []byte(time.Now().Format(time.RFC3339)),
		})
		if err != nil {
			panic(err)
		}
		log.Println("message sent")
		time.Sleep(1 * time.Second)
	}
}

PS: while testing on a different Kafka cluster, I was able to access the error log on the broker side:

[2020-08-31 14:02:52,081] INFO [SocketServer brokerId=1] Failed authentication with /1.2.3.4 (Invalid SASL/PLAIN response: expected 3 tokens, got 4) (org.apache.kafka.common.network.Selector)

I’m not sure, but it could be related to this code: https://github.com/segmentio/kafka-go/blob/3d2413fa36e84d1f459dc86f88e5ad7652996c52/sasl/plain/plain.go#L23 However, I don’t know where the third \x00 is being written from 🤷
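To make the "expected 3 tokens, got 4" message concrete, here is a small sketch of the SASL/PLAIN payload as described in RFC 4616: an optional authorization identity, the username, and the password, separated by two NUL bytes. The function names are hypothetical, and splitting on NUL is a simplification of whatever parsing the broker really does. The trailing NUL in the second case is only one illustrative way to end up with four tokens, not a diagnosis of where the extra byte comes from here.

```go
package main

import (
	"bytes"
	"fmt"
)

// plainInitialResponse builds a SASL/PLAIN payload in the RFC 4616 layout:
// authzid NUL authcid NUL passwd. The authzid may be empty.
func plainInitialResponse(authzid, username, password string) []byte {
	return []byte(authzid + "\x00" + username + "\x00" + password)
}

// countTokens splits the payload on NUL bytes and returns the number of
// pieces. This is a simplified stand-in for the broker's parser.
func countTokens(payload []byte) int {
	return len(bytes.Split(payload, []byte{0}))
}

func main() {
	// Well-formed payload: two separators, three tokens.
	ok := plainInitialResponse("", "yyy", "zzz")
	fmt.Println(countTokens(ok)) // 3

	// Hypothetical malformed payload: one extra NUL yields a fourth token.
	bad := append(plainInitialResponse("", "yyy", "zzz"), 0)
	fmt.Println(countTokens(bad)) // 4
}
```

Dumping the bytes actually sent during authentication and counting the NULs this way would show whether the extra separator is in the payload itself or is added somewhere around it.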