franz-go: Kafka read error with SASL auth: TOPIC_AUTHORIZATION_FAILED: Not authorized to access topics: [Topic authorization failed.] + GROUP_AUTHORIZATION_FAILED: Not authorized to access group: Group authorization failed

I have a service that continuously writes to (and then reads from) the same topic every 5 seconds.
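The writer side is nothing special; it's roughly this shape (a sketch of the pattern, not the actual code, using the same kafka alias for kgo as in the snippets below):

// Sketch only: produce one record to the topic every 5 seconds and
// surface any produce error. cl, ctx, kafkaTopic, and contextLogger
// come from the surrounding setup.
tick := time.NewTicker(5 * time.Second)
defer tick.Stop()
for range tick.C {
	rec := &kafka.Record{Topic: kafkaTopic, Value: []byte("ping")}
	if err := cl.ProduceSync(ctx, rec).FirstErr(); err != nil {
		contextLogger.Error().Err(err).Msg("Produce failed")
	}
}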

I have two different clients using franz-go, one with TLS auth and one with SASL via msk_iam auth.

My writer seems to be fine, but my reader errors every few hours. The errors are always the same:

TOPIC_AUTHORIZATION_FAILED: Not authorized to access topics: [Topic authorization failed.]

I get around 200k of these errors over the course of 4 minutes, and then everything resumes fine again.

My reader code was based heavily on this example: https://github.com/twmb/franz-go/blob/master/examples/goroutine_per_partition_consuming/autocommit_marks/main.go

This is a snippet of the code:

for {
	fetches := r.service.PollRecords(ctx, 1000)
	err := fetches.Err0()
	if err != nil {
		if errors.Is(err, kafka.ErrClientClosed) {
			contextLogger.Error().Err(err).Msg("Client closed")
			return err
		}
		if errors.Is(err, context.Canceled) {
			contextLogger.Error().Err(err).Msg("Context cancelled closed")
			return err
		}
	}
	fetches.EachError(func(topic string, partition int32, err error) {
		contextLogger.Error().
			Err(err).
			Str("Topic", topic).
			Int32("Partition", partition).
			Msg("Cannot fetch message")
	})
	fetches.EachPartition(func(fetchedTopicPartition kafka.FetchTopicPartition) {
		tp := topicPartition{
			topic:     fetchedTopicPartition.Topic,
			partition: fetchedTopicPartition.Partition,
		}

		r.consumers[tp].records <- fetchedTopicPartition
	})

	r.service.AllowRebalance()
}
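For what it's worth, the authorization errors surface through EachError as kerr errors, so they can be matched explicitly if you want to treat them differently from other fetch errors (a sketch, assuming github.com/twmb/franz-go/pkg/kerr is imported):

fetches.EachError(func(topic string, partition int32, err error) {
	// kerr exposes the broker error codes as singleton errors, so
	// errors.Is can pick out the two failures from this issue.
	if errors.Is(err, kerr.TopicAuthorizationFailed) ||
		errors.Is(err, kerr.GroupAuthorizationFailed) {
		contextLogger.Warn().Err(err).Msg("Authorization failure, will retry")
		return
	}
	contextLogger.Error().
		Err(err).
		Str("Topic", topic).
		Int32("Partition", partition).
		Msg("Cannot fetch message")
})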

And this is how the client is instantiated:

	franzGoReader := &FranzGoReader{
		consumers: make(map[topicPartition]*franzGoPartitionReader),
		log:       contextLogger,
	}

	kafkaAddr := strings.Split(brokersEndpoint, ",")
	clientOpts := []kafka.Opt{
		kafka.SeedBrokers(kafkaAddr...),
		kafka.ConsumeTopics(kafkaTopic),
		kafka.ConsumerGroup(consumerGroup),
		// default is CooperativeStickyBalancer
		kafka.Balancers(kafka.RangeBalancer()),
		kafka.ConsumeResetOffset(kafka.NewOffset().AtStart()),
		kafka.OnPartitionsAssigned(franzGoReader.assigned),
		kafka.OnPartitionsRevoked(franzGoReader.revoked),
		kafka.OnPartitionsLost(franzGoReader.lost),
		kafka.AutoCommitMarks(),
		kafka.AutoCommitInterval(3 * time.Second),
		kafka.BlockRebalanceOnPoll(),
		kafka.WithLogger(&franzGoLogger{
			log: contextLogger,
		}),
	}
	if useAuth {
		clientOpts = append(clientOpts, kafka.DialTLSConfig(&tls.Config{}))

		sess, err := session.NewSession()
		if err != nil {
			contextLogger.Error().Err(err).Msg("Could not get aws session")
			return nil, err
		}
		clientOpts = append(clientOpts, kafka.SASL(aws.ManagedStreamingIAM(func(ctx context.Context) (aws.Auth, error) {
			val, err := sess.Config.Credentials.GetWithContext(ctx)
			if err != nil {
				return aws.Auth{}, err
			}
			return aws.Auth{
				AccessKey:    val.AccessKeyID,
				SecretKey:    val.SecretAccessKey,
				SessionToken: val.SessionToken,
				UserAgent:    "kafka-health-monitor",
			}, nil
		})))
	}

	cl, err := kafka.NewClient(clientOpts...)
	if err != nil {
		contextLogger.Error().Err(err).Msg("Could not create FranzGo client")
		return nil, err
	}
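Not shown in the issue: because BlockRebalanceOnPoll is set, shutdown has to unblock any rebalance the poll loop may be holding. A sketch of a graceful stop, assuming r.service is the kgo client used in the poll loop above:

func (r *FranzGoReader) Close(ctx context.Context) {
	// Flush marked-but-uncommitted offsets before leaving the group.
	if err := r.service.CommitMarkedOffsets(ctx); err != nil {
		r.log.Error().Err(err).Msg("Final commit failed")
	}
	// CloseAllowingRebalance unblocks a rebalance held by
	// BlockRebalanceOnPoll and then closes the client.
	r.service.CloseAllowingRebalance()
}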

partitions assigned callback:

func (r *FranzGoReader) assigned(_ context.Context, cl *kafka.Client, assignedTopicPartitions map[string][]int32) {
	contextLogger := r.log.With().Logger()
	contextLogger.Debug().Msg("Partitions assigned")

	for topic, partitions := range assignedTopicPartitions {
		for _, partition := range partitions {
			contextLoggerPartitionReader := contextLogger.With().
				Str("Topic", topic).
				Int32("Partition", partition).
				Logger()
			partitionReader := &franzGoPartitionReader{
				service:   cl,
				log:       contextLoggerPartitionReader,
				topic:     topic,
				partition: partition,

				quit:    make(chan struct{}),
				done:    make(chan struct{}),
				records: make(chan kafka.FetchTopicPartition, 5),
			}
			r.consumers[topicPartition{topic, partition}] = partitionReader
			go partitionReader.consume()
		}
	}
}
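The consume goroutine itself isn't shown above; modeled on the linked autocommit_marks example, it looks roughly like this (processRecord is a hypothetical per-record handler). With AutoCommitMarks, only records passed to MarkCommitRecords become eligible for the next autocommit:

func (pr *franzGoPartitionReader) consume() {
	defer close(pr.done)
	pr.log.Debug().Msg("Starting consume")
	for {
		select {
		case <-pr.quit:
			pr.log.Debug().Msg("Quitting consume")
			return
		case fetched := <-pr.records:
			for _, record := range fetched.Records {
				processRecord(record) // hypothetical handler
			}
			pr.service.MarkCommitRecords(fetched.Records...)
		}
	}
}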

partitions lost callback:

func (r *FranzGoReader) lost(_ context.Context, _ *kafka.Client, topicPartitionsLost map[string][]int32) {
	contextLogger := r.log.With().Logger()
	contextLogger.Debug().Msg("Partitions lost")

	r.killPartitionReaders(topicPartitionsLost)
}

func (r *FranzGoReader) killPartitionReaders(topicPartitions map[string][]int32) {
	contextLogger := r.log.With().Logger()
	contextLogger.Debug().Msg("Kill partitionReaders")

	wg := &sync.WaitGroup{}
	defer wg.Wait()

	for topic, partitions := range topicPartitions {
		for _, partition := range partitions {
			wg.Add(1)

			tp := topicPartition{topic, partition}
			partitionReader := r.consumers[tp]
			delete(r.consumers, tp)
			go func() {
				partitionReader.close()
				wg.Done()
			}()
		}
	}
}

func (pr *franzGoPartitionReader) close() {
	contextLogger := pr.log.With().Logger()
	contextLogger.Debug().Msg("Closing partitionReader")

	close(pr.quit)
	contextLogger.Debug().Msg("Waiting for work to finish")
	<-pr.done
}
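The revoked callback isn't included in the snippets either; following the linked example, it typically stops the affected partition readers and flushes marked offsets before the rebalance completes (a sketch, reusing the helpers above):

func (r *FranzGoReader) revoked(ctx context.Context, cl *kafka.Client, topicPartitionsRevoked map[string][]int32) {
	contextLogger := r.log.With().Logger()
	contextLogger.Debug().Msg("Partitions revoked")

	r.killPartitionReaders(topicPartitionsRevoked)
	if err := cl.CommitMarkedOffsets(ctx); err != nil {
		contextLogger.Error().Err(err).Msg("Commit during revoke failed")
	}
}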

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 21 (11 by maintainers)

Most upvoted comments

This is released in v1.9.1
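Picking up the fix is a normal module bump:

go get github.com/twmb/franz-go@v1.9.1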

Awesome, good to hear. I’ll wait another day or two and assume that no response (or good response) means it’s good, and a reply that it’s bad indicates things are still bugged

@twmb yeah I deployed it the other day (soon after I responded to you) and I haven’t seen any errors yet! Was waiting a bit longer to call it “done” though since in the past it would randomly go a while without errors as well. But so far so good.