kafka-go: Unable to read messages from MSK Serverless cluster

While switching from an AWS MSK Provisioned cluster to an MSK Serverless cluster, writing messages to topics works, but reading from a topic fails.

Kafka Version

github.com/segmentio/kafka-go v0.4.34
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20220809022639-fcb5875e8e6a

AWS is very vague about which version of Kafka runs in the Serverless setup, but their examples use the client library intended for 2.8.1. (Consuming messages with that Java client works.)

To Reproduce

package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	signer "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
	awsCfg "github.com/aws/aws-sdk-go-v2/config"
	kafka "github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
)

func main() {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT)
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		sig := <-signals
		fmt.Println("Got signal: ", sig)
		cancel()
	}()

	bootstrapServers := []string{"<kafka-bootstrap-server-url>"}
	topic := "example-topic"

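	// Fail early if the AWS config or credentials cannot be loaded,
	// so that an auth setup problem is not mistaken for a Kafka error.
	cfg, err := awsCfg.LoadDefaultConfig(ctx)
	if err != nil {
		fmt.Println("Error loading AWS config: ", err)
		return
	}
	creds, err := cfg.Credentials.Retrieve(ctx)
	if err != nil {
		fmt.Println("Error retrieving AWS credentials: ", err)
		return
	}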
	m := &aws_msk_iam_v2.Mechanism{
		Signer:      signer.NewSigner(),
		Credentials: creds,
		Region:      "us-east-1",
		SignTime:    time.Now(),
		Expiry:      time.Minute * 15,
	}

	config := kafka.ReaderConfig{
		Brokers: bootstrapServers,
		GroupID: "test-consumer-group-1",
		Topic:   topic,
		// Partition: 0,
		MaxWait: 50000 * time.Millisecond,
		Dialer: &kafka.Dialer{
			Timeout:       50 * time.Second,
			DualStack:     true,
			SASLMechanism: m,
			TLS: &tls.Config{
				MinVersion: tls.VersionTLS12,
			},
		},
	}

	r := kafka.NewReader(config)
	fmt.Println("Consumer configuration: ", config)

	defer func() {
		err := r.Close()
		if err != nil {
			fmt.Println("Error closing consumer: ", err)
			return
		}
		fmt.Println("Consumer closed")
	}()

	for {
		m, err := r.ReadMessage(ctx)
		// m, err := r.FetchMessage(ctx)
		if err != nil {
			fmt.Printf("Error reading message: %+v\n", err)
			break
		}
		fmt.Printf("Received message from %s-%d [%d]: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}

Expected Behavior

Messages should be received from the topic.

Observed Behavior

Running the example fails.

With GroupID set in the ReaderConfig, the error is:

Error reading message: [42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details

Log output:

entering loop for consumer group, test-consumer-group-1
Failed to join group test-consumer-group-1: [42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details
[42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details

Without a GroupID, and with Partition set, the error is:

Error reading message: unexpected EOF

Log output:

initializing kafka reader for partition 0 of example-topic starting at offset first offset
kafka reader failed to read lag of partition 0 of example-topic: EOF

Additional Context

The topic has been created beforehand, and messages have been written to that topic.

I have also tried most of the possible combinations of TLSConfig.MinVersion and MaxVersion available, but to no avail.

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 19 (10 by maintainers)

Most upvoted comments

@mtkopone Sorry for jumping into the discussion. I have an MSK cluster (not serverless, though) on Kafka 2.8.1 with the following module versions, and it works fine. So 2.8.1 itself shouldn't be the problem.

github.com/segmentio/kafka-go v0.4.34
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20220809022639-fcb5875e8e6a

The error is EOF, which most likely happens when there is a network connection failure. I ran into this error while contributing to the aws_msk_iam_v2 module.

My gut feeling is that you should double-check the MSK port you are connecting to. For IAM authentication, you need to connect via port 9098 for access from within AWS and port 9198 for public access; this is covered in the AWS documentation. Also, please double-check that you enabled IAM authentication when you created the MSK cluster.
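
A quick way to rule out the port mix-up is to check the bootstrap addresses before handing them to the reader. This is a rough sketch, not part of kafka-go: checkIAMPort is a hypothetical helper, the hostnames are placeholders, and the only facts it encodes are the two IAM ports mentioned in this comment (9098 from within AWS, 9198 for public access).

```go
package main

import (
	"fmt"
	"net"
)

// iamPorts lists the MSK ports used for IAM authentication,
// as described in the comment this sketch accompanies.
var iamPorts = map[string]string{
	"9098": "IAM auth, access from within AWS",
	"9198": "IAM auth, public access",
}

// checkIAMPort parses a "host:port" address and returns a description
// of the port if it is one of the IAM ports, or an error otherwise.
// Hypothetical helper for illustration.
func checkIAMPort(addr string) (string, error) {
	_, port, err := net.SplitHostPort(addr)
	if err != nil {
		return "", fmt.Errorf("parse %q: %w", addr, err)
	}
	desc, ok := iamPorts[port]
	if !ok {
		return "", fmt.Errorf("port %s is not an MSK IAM port (expected 9098 or 9198)", port)
	}
	return desc, nil
}

func main() {
	// Placeholder addresses; substitute the real bootstrap servers.
	for _, addr := range []string{
		"broker.example.com:9098",
		"broker.example.com:9092",
	} {
		desc, err := checkIAMPort(addr)
		if err != nil {
			fmt.Println(addr, "->", err)
			continue
		}
		fmt.Println(addr, "->", desc)
	}
}
```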

^ I had the same issue, and the hint about a network connection failure helped me. It turned out I was missing the TLS config (even though I had set InsecureSkipVerify to true). Thanks, @kikyomits!
