kafka-go: Unable to read messages from MSK Serverless cluster
While attempting to switch from an AWS MSK Provisioned cluster to an MSK Serverless cluster, writing messages to topics works, but reading from a topic fails.
Kafka Version
github.com/segmentio/kafka-go v0.4.34
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.0.0-20220809022639-fcb5875e8e6a
AWS is very vague about the version of Kafka they are running in the Serverless setup, but their examples use the client library intended for 2.8.1. (Consuming messages with that Java client works.)
To Reproduce
package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	signer "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
	awsCfg "github.com/aws/aws-sdk-go-v2/config"
	kafka "github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
)

func main() {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT)
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		sig := <-signals
		fmt.Println("Got signal: ", sig)
		cancel()
	}()

	bootstrapServers := []string{"<kafka-bootstrap-server-url>"}
	topic := "example-topic"

	cfg, _ := awsCfg.LoadDefaultConfig(ctx)
	creds, _ := cfg.Credentials.Retrieve(ctx)
	m := &aws_msk_iam_v2.Mechanism{
		Signer:      signer.NewSigner(),
		Credentials: creds,
		Region:      "us-east-1",
		SignTime:    time.Now(),
		Expiry:      time.Minute * 15,
	}

	config := kafka.ReaderConfig{
		Brokers: bootstrapServers,
		GroupID: "test-consumer-group-1",
		Topic:   topic,
		// Partition: 0,
		MaxWait: 50000 * time.Millisecond,
		Dialer: &kafka.Dialer{
			Timeout:       50 * time.Second,
			DualStack:     true,
			SASLMechanism: m,
			TLS: &tls.Config{
				MinVersion: tls.VersionTLS12,
			},
		},
	}

	r := kafka.NewReader(config)
	fmt.Println("Consumer configuration: ", config)
	defer func() {
		err := r.Close()
		if err != nil {
			fmt.Println("Error closing consumer: ", err)
			return
		}
		fmt.Println("Consumer closed")
	}()

	for {
		m, err := r.ReadMessage(ctx)
		// m, err := r.FetchMessage(ctx)
		if err != nil {
			fmt.Printf("Error reading message: %+v\n", err)
			break
		}
		fmt.Printf("Received message from %s-%d [%d]: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}
Expected Behavior
Messages should be received from the topic.
Observed Behavior
Running the example fails.
When providing a GroupID in the ReaderConfig, the error is:
Error reading message: [42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details
Log output:
entering loop for consumer group, test-consumer-group-1
Failed to join group test-consumer-group-1: [42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details
[42] Invalid Request: this most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker, se the broker logs for more details
Without a GroupID, and with a Partition set, the error is:
Error reading message: unexpected EOF
Log output:
initializing kafka reader for partition 0 of example-topic starting at offset first offset
kafka reader failed to read lag of partition 0 of example-topic: EOF
Additional Context
The topic has been created beforehand, and messages have been written to that topic.
I have also tried most of the possible combinations of TLSConfig.MinVersion and MaxVersion available, but to no avail.
About this issue
- State: closed
- Created 2 years ago
- Comments: 19 (10 by maintainers)
^ I had the same issue, and the hint "network connection failure" helped me. It turns out I was missing the TLS config (even though I had set InsecureSkipVerify to true). Thanks, @kikyomits!

@mtkopone Sorry for jumping into the discussion. I have an MSK cluster (not Serverless, though) on version 2.8.1, and it works OK with the following versions. So 2.8.1 itself shouldn't be a problem.
The error is EOF, which is likely to happen when you have a network connection failure. I had this error when I contributed to the aws_msk_iam_v2 module development.
My gut feeling is that the MSK port you are connecting to is worth double-checking. For IAM authentication, you need to connect via port 9098 for access from within AWS and port 9198 for public access. AWS documents these ports. Also, please double-check that you enabled IAM authentication when you created the MSK cluster.
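One way to rule out the network and TLS causes described above, before involving kafka-go at all, is a standalone probe that opens a TCP connection to the IAM port and completes a TLS handshake. The following is only a minimal sketch using the Go standard library. The helper names brokerAddr and probeTLS and the MSK_BROKER_HOST environment variable are made up for this example and are not part of kafka-go or any AWS SDK. It only checks reachability and TLS; it does not perform SASL/IAM authentication.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net"
	"os"
	"time"
)

// IAM authentication ports as quoted in the comment above.
const (
	iamPortPrivate = "9098" // access from within AWS
	iamPortPublic  = "9198" // public access
)

// brokerAddr (hypothetical helper) joins a broker hostname with the
// IAM port for the chosen access path.
func brokerAddr(host string, public bool) string {
	port := iamPortPrivate
	if public {
		port = iamPortPublic
	}
	return net.JoinHostPort(host, port)
}

// probeTLS (hypothetical helper) opens a TCP connection to addr and
// completes a TLS handshake. It does not attempt SASL/IAM authentication.
func probeTLS(addr string, timeout time.Duration) error {
	dialer := &net.Dialer{Timeout: timeout}
	conn, err := tls.DialWithDialer(dialer, "tcp", addr, &tls.Config{
		MinVersion: tls.VersionTLS12,
	})
	if err != nil {
		return fmt.Errorf("TLS probe of %s failed: %w", addr, err)
	}
	return conn.Close()
}

func main() {
	// MSK_BROKER_HOST is an assumed variable name: a broker hostname without port.
	host := os.Getenv("MSK_BROKER_HOST")
	if host == "" {
		fmt.Println("set MSK_BROKER_HOST to a broker hostname (without port)")
		return
	}
	addr := brokerAddr(host, false) // pass true to try the public-access port
	if err := probeTLS(addr, 10*time.Second); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("TCP connect and TLS handshake succeeded:", addr)
}
```

If this probe fails, the problem is probably in networking (security groups, routing, wrong port) or TLS rather than in the reader configuration. If it succeeds, the IAM mechanism and the consumer-group requests are the next things to look at.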