franz-go: Kafka read error with SASL auth: TOPIC_AUTHORIZATION_FAILED: Not authorized to access topics: [Topic authorization failed.] + GROUP_AUTHORIZATION_FAILED: Not authorized to access group: Group authorization failed
I have a service that continuously writes to (and then reads from) the same topic every 5 seconds.
I have two different clients using franz-go: one with TLS auth and one with SASL via MSK IAM auth.
My writer seems to be fine, but my reader errors every few hours. The errors are always the same:
TOPIC_AUTHORIZATION_FAILED: Not authorized to access topics: [Topic authorization failed.]
I get around 200k of these errors over the course of 4 minutes, and then everything resumes fine again.
My reader code was based heavily on this example: https://github.com/twmb/franz-go/blob/master/examples/goroutine_per_partition_consuming/autocommit_marks/main.go
This is a snippet of the poll loop:
for {
	fetches := r.service.PollRecords(ctx, 1000)
	err := fetches.Err0()
	if err != nil {
		// Only client-closed and context-cancelled are treated as fatal;
		// everything else is surfaced per partition by EachError below.
		if errors.Is(err, kafka.ErrClientClosed) {
			contextLogger.Error().Err(err).Msg("Client closed")
			return err
		}
		if errors.Is(err, context.Canceled) {
			contextLogger.Error().Err(err).Msg("Context cancelled")
			return err
		}
	}
	fetches.EachError(func(topic string, partition int32, err error) {
		contextLogger.Error().
			Err(err).
			Str("Topic", topic).
			Int32("Partition", partition).
			Msg("Cannot fetch message")
	})
	fetches.EachPartition(func(fetchedTopicPartition kafka.FetchTopicPartition) {
		tp := topicPartition{
			topic:     fetchedTopicPartition.Topic,
			partition: fetchedTopicPartition.Partition,
		}
		// Hand the batch to the goroutine that owns this partition.
		r.consumers[tp].records <- fetchedTopicPartition
	})
	// Polling blocks rebalances (BlockRebalanceOnPoll); allow them again
	// once the fetched batches have been handed off.
	r.service.AllowRebalance()
}
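In case it helps with triage, the two error codes can be matched explicitly against franz-go's kerr sentinels inside the EachError callback. A minimal sketch (logFetchError is a hypothetical helper; zerolog is the logger used throughout, and kafka is the kgo alias as above):

import (
	"errors"

	"github.com/rs/zerolog"
	"github.com/twmb/franz-go/pkg/kerr"
)

// logFetchError is a hypothetical helper for the EachError callback above.
// franz-go surfaces broker error codes as kerr sentinel errors, so
// errors.Is can distinguish the two authorization failures.
func logFetchError(contextLogger zerolog.Logger, topic string, partition int32, err error) {
	event := contextLogger.Error().
		Err(err).
		Str("Topic", topic).
		Int32("Partition", partition)
	switch {
	case errors.Is(err, kerr.TopicAuthorizationFailed):
		event.Msg("Broker denied access to the topic")
	case errors.Is(err, kerr.GroupAuthorizationFailed):
		event.Msg("Broker denied access to the consumer group")
	default:
		event.Msg("Cannot fetch message")
	}
}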
And this is how the client is instantiated:
franzGoReader := &FranzGoReader{
	consumers: make(map[topicPartition]*franzGoPartitionReader),
	log:       contextLogger,
}
kafkaAddr := strings.Split(brokersEndpoint, ",")
clientOpts := []kafka.Opt{
	kafka.SeedBrokers(kafkaAddr...),
	kafka.ConsumeTopics(kafkaTopic),
	kafka.ConsumerGroup(consumerGroup),
	// default is CooperativeStickyBalancer
	kafka.Balancers(kafka.RangeBalancer()),
	kafka.ConsumeResetOffset(kafka.NewOffset().AtStart()),
	kafka.OnPartitionsAssigned(franzGoReader.assigned),
	kafka.OnPartitionsRevoked(franzGoReader.revoked),
	kafka.OnPartitionsLost(franzGoReader.lost),
	kafka.AutoCommitMarks(),
	kafka.AutoCommitInterval(3 * time.Second),
	kafka.BlockRebalanceOnPoll(),
	kafka.WithLogger(&franzGoLogger{
		log: contextLogger,
	}),
}
if useAuth {
	clientOpts = append(clientOpts, kafka.DialTLSConfig(&tls.Config{}))
	sess, err := session.NewSession()
	if err != nil {
		contextLogger.Error().Err(err).Msg("Could not get aws session")
		return nil, err
	}
	clientOpts = append(clientOpts, kafka.SASL(aws.ManagedStreamingIAM(func(ctx context.Context) (aws.Auth, error) {
		val, err := sess.Config.Credentials.GetWithContext(ctx)
		if err != nil {
			return aws.Auth{}, err
		}
		return aws.Auth{
			AccessKey:    val.AccessKeyID,
			SecretKey:    val.SecretAccessKey,
			SessionToken: val.SessionToken,
			UserAgent:    "kafka-health-monitor",
		}, nil
	})))
}
cl, err := kafka.NewClient(clientOpts...)
if err != nil {
	contextLogger.Error().Err(err).Msg("Could not create FranzGo client")
	return nil, err
}
Partitions assigned callback:
func (r *FranzGoReader) assigned(_ context.Context, cl *kafka.Client, assignedTopicPartitions map[string][]int32) {
	contextLogger := r.log.With().Logger()
	contextLogger.Debug().Msg("Partitions assigned")
	for topic, partitions := range assignedTopicPartitions {
		for _, partition := range partitions {
			contextLoggerPartitionReader := contextLogger.With().
				Str("Topic", topic).
				Int32("Partition", partition).
				Logger()
			partitionReader := &franzGoPartitionReader{
				service:   cl,
				log:       contextLoggerPartitionReader,
				topic:     topic,
				partition: partition,
				quit:      make(chan struct{}),
				done:      make(chan struct{}),
				records:   make(chan kafka.FetchTopicPartition, 5),
			}
			r.consumers[topicPartition{topic, partition}] = partitionReader
			go partitionReader.consume()
		}
	}
}
Partitions lost callback:
func (r *FranzGoReader) lost(_ context.Context, _ *kafka.Client, topicPartitionsLost map[string][]int32) {
	contextLogger := r.log.With().Logger()
	contextLogger.Debug().Msg("Partitions lost")
	r.killPartitionReaders(topicPartitionsLost)
}
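The revoked callback (registered above but not shown) follows the linked example: it stops the same per-partition readers and then flushes marked offsets before the partitions move. Roughly:

// Sketch of the revoked callback, following the linked autocommit_marks
// example: stop the per-partition readers, then commit whatever has been
// marked so the next owner of these partitions does not re-read records
// we already processed.
func (r *FranzGoReader) revoked(ctx context.Context, cl *kafka.Client, topicPartitionsRevoked map[string][]int32) {
	contextLogger := r.log.With().Logger()
	contextLogger.Debug().Msg("Partitions revoked")
	r.killPartitionReaders(topicPartitionsRevoked)
	if err := cl.CommitMarkedOffsets(ctx); err != nil {
		contextLogger.Error().Err(err).Msg("Revoke commit failed")
	}
}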
func (r *FranzGoReader) killPartitionReaders(topicPartitions map[string][]int32) {
	contextLogger := r.log.With().Logger()
	contextLogger.Debug().Msg("Kill partitionReaders")
	wg := &sync.WaitGroup{}
	defer wg.Wait()
	for topic, partitions := range topicPartitions {
		for _, partition := range partitions {
			wg.Add(1)
			tp := topicPartition{topic, partition}
			partitionReader := r.consumers[tp]
			delete(r.consumers, tp)
			go func() {
				partitionReader.close()
				wg.Done()
			}()
		}
	}
}
func (pr *franzGoPartitionReader) close() {
	contextLogger := pr.log.With().Logger()
	contextLogger.Debug().Msg("Closing partitionReader")
	close(pr.quit)
	contextLogger.Debug().Msg("Waiting for work to finish")
	<-pr.done
}
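For completeness, the per-partition consume loop is essentially the one from the linked example; a sketch (the actual record handling is elided here):

// Sketch of the per-partition consume loop: process each batch handed
// over by EachPartition, then mark it. With AutoCommitMarks, marked
// records are committed on the next autocommit interval.
func (pr *franzGoPartitionReader) consume() {
	defer close(pr.done)
	contextLogger := pr.log.With().Logger()
	contextLogger.Debug().Msg("Starting partitionReader")
	for {
		select {
		case <-pr.quit:
			return
		case fetched := <-pr.records:
			// ... process fetched.Records ...
			pr.service.MarkCommitRecords(fetched.Records...)
		}
	}
}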
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Comments: 21 (11 by maintainers)
This is released in v1.9.1.
Awesome, good to hear. I’ll wait another day or two and assume that no response (or a good response) means it’s fixed, and a reply that it’s bad means things are still bugged.
@twmb yeah I deployed it the other day (soon after I responded to you) and I haven’t seen any errors yet! Was waiting a bit longer to call it “done” though since in the past it would randomly go a while without errors as well. But so far so good.