franz-go: Client stops consuming certain partitions
Hi,
I am using the franz-go library to consume messages from an Azure Event Hub topic. I have two consumers per region, with the Event Hub in the central region. My consumer is based on the [goroutine_per_partition_consuming](https://github.com/twmb/franz-go/blob/master/examples/goroutine_per_partition_consuming/manual_commit/main.go) example with manual committing, but I am not blocking rebalances on polls. I am also using per-partition “flow control” to stop fetching partitions that have a backlog of records to process, so that the polling goroutine does not block while waiting to send records to a worker.
What I am seeing is that fetches sometimes stop returning records for a partition, but I haven't been able to pinpoint why. I am wondering if it is a race condition between the pause/resume calls and rebalancing. I am hoping that by sharing my code, you'll be able to tell whether I am doing something wrong or whether there is a race condition in the library causing this odd behavior. Please note that a few interfaces were introduced in the code below for unit testing purposes.
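For context, the client is created with roughly the options below before the coordinator callbacks are added. The namespace, environment variable, group, and topic names are placeholders rather than my real configuration, and `baseOpts` is just a helper name for this sketch; the Event Hubs Kafka endpoint is reached over TLS on port 9093 with SASL PLAIN and the connection string.

```go
package franz

import (
	"crypto/tls"
	"os"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/plain"
)

// baseOpts builds the connection/group options. The namespace, group, and topic
// below are placeholders; the real values come from configuration.
func baseOpts() []kgo.Opt {
	return []kgo.Opt{
		// Event Hubs exposes its Kafka endpoint on port 9093 behind TLS.
		kgo.SeedBrokers("my-namespace.servicebus.windows.net:9093"),
		kgo.DialTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12}),
		// Event Hubs authenticates Kafka clients via SASL PLAIN with the connection string.
		kgo.SASL(plain.Auth{
			User: "$ConnectionString",
			Pass: os.Getenv("EVENTHUB_CONNECTION_STRING"),
		}.AsMechanism()),
		kgo.ConsumerGroup("my-consumer-group"),
		kgo.ConsumeTopics("my-topic"),
	}
}
```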
```go
package franz

import (
	"context"
	"sync"

	"github.com/XXX/log"
	"github.com/twmb/franz-go/pkg/kgo"
)

// newCoordinator will create a new coordinator that will deliver events to the specified [RecordListener]
func newCoordinator(logger log.Logger, l RecordListener) *coordinator {
	const defaultBufferSize = 10
	return newCoordinatorWithBufferSize(logger, l, defaultBufferSize)
}

func newCoordinatorWithBufferSize(logger log.Logger, l RecordListener, size int) *coordinator {
	return &coordinator{
		logger:     logger,
		listener:   l,
		mu:         &sync.RWMutex{},
		workers:    make(map[TopicPartition]*worker),
		bufferSize: size,
		// we will pause fetching for a partition if the channel for a worker is at least 50% capacity
		pauseThreshold: 0.50 * float32(size),
	}
}
// a coordinator is used for managing a Kafka consumer group member and sending records to workers it is managing
type coordinator struct {
	logger log.Logger
	// a listener is the application code that a worker will deliver a batch of records to
	listener RecordListener
	// mu is used to guard the workers map since it is accessed in the run loop and via the consumer group management
	// callbacks
	mu      *sync.RWMutex
	workers map[TopicPartition]*worker
	// bufferSize is used to size the channel that is used to send records to a worker
	bufferSize int
	// pauseThreshold is the number of items in the records channel over which we will pause fetching data for this
	// [TopicPartition]/worker
	pauseThreshold float32
}

// onPartitionsAssigned is called when a group is joined, after partitions are assigned and before fetches for those
// partitions begin. We use it to create workers for the topic/partitions we have been assigned.
// NOTE - this function is not called concurrently with revoke or lost, but it can be called at any time
// while we are polling or processing records.
func (c *coordinator) onPartitionsAssigned(ctx context.Context, client *kgo.Client, assigned map[string][]int32) {
	if len(assigned) == 0 {
		return
	}
	c.handlePartitionsAssigned(ctx, client, assigned)
}
func (c *coordinator) handlePartitionsAssigned(ctx context.Context, client *kgo.Client, assigned map[string][]int32) {
	// this wait group is used to ensure the workers are running before returning
	var wg sync.WaitGroup
	defer wg.Wait()
	for topic, partitions := range assigned {
		for _, partition := range partitions {
			assignment := TopicPartition{topic, partition}
			w := c.newWorker(c.logger, client, assignment)
			c.mu.Lock()
			c.workers[assignment] = w
			c.mu.Unlock()
			wg.Add(1)
			go func() {
				wg.Done()
				w.run(ctx)
			}()
		}
	}
}

func (c *coordinator) newWorker(clog log.Logger, client *kgo.Client, assignment TopicPartition) *worker {
	return &worker{
		logger:     clog,
		client:     client,
		listener:   c.listener,
		assignment: assignment,
		stop:       make(chan struct{}),
		stopped:    make(chan struct{}),
		// we make it a buffered channel so that the [coordinator.run] loop is not blocked while waiting
		// for a worker to pick up the records
		records: make(chan []*kgo.Record, c.bufferSize),
		// we want to resume fetching for this partition if the *used* channel capacity is <= 25%
		resumeThreshold: 0.25 * float32(c.bufferSize),
	}
}
// onPartitionsLost is called once this group member has partitions lost or revoked. We use it to stop any workers
// whose partitions we are losing. Since the worker is doing the committing, the coordinator will send a stop signal
// to each worker and wait until each is stopped before proceeding. This blocking action is required so that the
// rebalance doesn't proceed until we have committed our work.
func (c *coordinator) onPartitionsLost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) {
	if len(lost) == 0 {
		return
	}
	c.handlePartitionsLost(lost)
}

func (c *coordinator) handlePartitionsLost(lost map[string][]int32) {
	var wg sync.WaitGroup
	defer wg.Wait()
	for topic, partitions := range lost {
		for _, partition := range partitions {
			assignment := TopicPartition{topic, partition}
			c.mu.Lock()
			w := c.workers[assignment]
			delete(c.workers, assignment)
			c.mu.Unlock()
			close(w.stop)
			wg.Add(1)
			go func(a TopicPartition) {
				<-w.stopped
				wg.Done()
			}(assignment)
		}
	}
}
func (c *coordinator) run(ctx context.Context, cl *kgo.Client) {
	for {
		f := cl.PollFetches(ctx)
		if f.IsClientClosed() {
			return
		}
		f.EachError(func(topic string, part int32, err error) {
			c.logger.WithError(err).ErrorContext(ctx, "Problem occurred while fetching records")
		})
		f.EachPartition(func(p kgo.FetchTopicPartition) {
			// the library occasionally returns zero records for a partition. Need to look into this,
			// as it seems like a bug.
			if len(p.Records) == 0 {
				return
			}
			tp := TopicPartition{p.Topic, p.Partition}
			c.mu.RLock()
			w, ok := c.workers[tp]
			c.mu.RUnlock()
			if ok {
				c.sendRecordsToWorker(tp, cl, w, p)
			}
		})
	}
}

func (c *coordinator) sendRecordsToWorker(tp TopicPartition, client *kgo.Client, w *worker, f kgo.FetchTopicPartition) {
	w.records <- f.Records
	if _, pause := c.shouldPause(w); pause {
		// pause fetching more records for this partition until the worker makes progress. NOTE
		// that the Franz client buffers records internally, so even after we pause this partition, we
		// may get a few batches delivered for this worker. This is the reason we need a buffered channel with
		// a larger capacity, so we don't block delivering records to the worker
		client.PauseFetchPartitions(map[string][]int32{tp.Topic: {tp.Partition}})
	}
}

func (c *coordinator) shouldPause(w *worker) (int, bool) {
	l := len(w.records)
	return l, float32(l) > c.pauseThreshold
}
type worker struct {
	logger log.Logger
	// the Topic and Partition that this worker is handling records for
	assignment TopicPartition
	client     *kgo.Client
	listener   RecordListener
	// stop is used by the coordinator to ask a worker to stop what it is doing, and exit
	stop chan struct{}
	// stopped will be closed when this worker has stopped, i.e. terminated
	stopped chan struct{}
	records chan []*kgo.Record
	// this is the number of items in the records channel under which we will resume fetching data for this
	// [TopicPartition]
	resumeThreshold float32
}

func (w *worker) run(ctx context.Context) {
	defer func() {
		// we drain the records channel in the unlikely event that the fetch loop is blocked trying to put more
		// records into this channel
		drainChannel(w.records)
		close(w.stopped)
	}()
	for {
		select {
		case <-w.stop:
			return
		case recs := <-w.records:
			toCommit := w.listener.OnRecords(ctx, w.assignment, w.stop, recs)
			if err := w.client.CommitRecords(ctx, toCommit...); err != nil {
				w.logger.WithError(err).ErrorContext(ctx, "Problem committing records")
			}
			if w.shouldResume() {
				// NOTE that a worker that doesn't have much of a backlog will emit "extra" resumes, which Franz
				// treats as no-ops.
				w.client.ResumeFetchPartitions(map[string][]int32{w.assignment.Topic: {w.assignment.Partition}})
			}
		}
	}
}

func (w *worker) shouldResume() bool {
	return float32(len(w.records)) <= w.resumeThreshold
}
func drainChannel(ch <-chan []*kgo.Record) {
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				return
			}
		default:
			return
		}
	}
}
```
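For reference, the helper types used above live in the same `franz` package and look roughly like this. My real definitions carry a bit more (including the interfaces added for unit testing), but these are the shapes the coordinator and worker depend on:

```go
// TopicPartition identifies the topic/partition a worker is responsible for.
type TopicPartition struct {
	Topic     string
	Partition int32
}

// RecordListener is the application code a worker delivers a batch of records to.
// OnRecords returns the records that are safe to commit once processing is done.
type RecordListener interface {
	OnRecords(ctx context.Context, tp TopicPartition, stop <-chan struct{}, recs []*kgo.Record) []*kgo.Record
}
```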
The consumer group callbacks and commit settings are wired into the client options like this:

```go
franzOpts = append(franzOpts,
	kgo.OnPartitionsAssigned(c.onPartitionsAssigned),
	kgo.OnPartitionsRevoked(c.onPartitionsLost),
	kgo.OnPartitionsLost(c.onPartitionsLost),
	// we are managing committing ourselves
	kgo.DisableAutoCommit(),
)
```
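And this is roughly how the coordinator is wired into the client and started; `runConsumer` and `baseOpts` are illustrative names from the sketches above, and the real service has more error handling and shutdown logic:

```go
func runConsumer(ctx context.Context, logger log.Logger, listener RecordListener) error {
	c := newCoordinator(logger, listener)

	franzOpts := baseOpts()
	franzOpts = append(franzOpts,
		kgo.OnPartitionsAssigned(c.onPartitionsAssigned),
		kgo.OnPartitionsRevoked(c.onPartitionsLost),
		kgo.OnPartitionsLost(c.onPartitionsLost),
		// we are managing committing ourselves
		kgo.DisableAutoCommit(),
	)

	cl, err := kgo.NewClient(franzOpts...)
	if err != nil {
		return err
	}

	// closing the client makes PollFetches report IsClientClosed, which ends the run loop
	go func() {
		<-ctx.Done()
		cl.Close()
	}()

	c.run(ctx, cl)
	return nil
}
```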
About this issue
- State: closed
- Created a year ago
- Comments: 27 (13 by maintainers)
Please update to the latest version and let me know if it continues to happen.