franz-go: Client stops consuming certain partitions

Hi,

I am using the Franz library to consume messages from an Azure Event Hub topic. I have 2 consumers per region, and the Event Hub in the central region. My consumer is based on the [goroutine_per_partition_consuming](https://github.com/twmb/franz-go/blob/master/examples/goroutine_per_partition_consuming/manual_commit/main.go) with manual committing, but I am not blocking a rebalance on polls. I am also using partition “flow control” to stop fetching partitions that have a backlog of records to process. I do this to ensure the polling go routine does not block waiting to send records to a worker.

What I am seeing is that sometimes the fetch no longer returns records for a partition, but I haven’t been able to pinpoint why. I am wondering if it is a race condition between the pause/resume and rebalancing. I am hoping by sharing my code, you’ll be able to pinpoint whether I am doing something wrong, or whether there is a race condition in the library that is causing this odd behavior. Please note that there are some interfaces introduced in the code below for unit testing purposes.

package franz

import (
	"context"
	"sync"

	"github.com/XXX/log"
	"github.com/twmb/franz-go/pkg/kgo"
)

// newCoordinator will create a new coordinator that will deliver events to the specified [RecordListener]
func newCoordinator(logger log.Logger, l RecordListener) *coordinator {
	const defaultBufferSize = 10
	return newCoordinatorWithBufferSize(logger, l, defaultBufferSize)
}

func newCoordinatorWithBufferSize(logger log.Logger, l RecordListener, size int) *coordinator {
	return &coordinator{
		logger:   logger,
		listener: l,

		mu:         &sync.RWMutex{},
		workers:    make(map[TopicPartition]*worker),
		bufferSize: size,
		// we will pause fetching for a partition if the channel for a worker is at least 50% capacity
		pauseThreshold: 0.50 * float32(size),
	}
}

// a coordinator is used for managing a Kafka consumer group member and sending records to workers it is managing
type coordinator struct {
	logger log.Logger

	// a listener is the application code that a worker will deliver a batch of records to
	listener RecordListener

	// mu is used to guard the workers map since it is accessed in the run loop and via the consumer group management
	// callbacks
	mu      *sync.RWMutex
	workers map[TopicPartition]*worker

	// bufferSize is used to size the channel that is used to send records to a worker
	bufferSize int
	// pauseThreshold is the number of items in the records channel over which we will pause fetching data for this
	// [TopicPartition]/worker
	pauseThreshold float32
}

// onPartitionsAssigned is called when a group is joined after partitions are assigned before fetches for those
// partitions begin. we use it to create workers based upon the topic/partitions we have been assigned.
// NOTE - This function is not called concurrent with revoke or lost. This function can be called at any time
// while we are polling or processing records.
func (c *coordinator) onPartitionsAssigned(ctx context.Context, client *kgo.Client, assigned map[string][]int32) {
	if len(assigned) == 0 {
		return
	}
	c.handlePartitionsAssigned(ctx, client, assigned)
}

func (c *coordinator) handlePartitionsAssigned(ctx context.Context, client *kgo.Client, assigned map[string][]int32) {
	// this wait group is used to ensure the workers are running before returning
	var wg sync.WaitGroup
	defer wg.Wait()

	for topic, partitions := range assigned {
		for _, partition := range partitions {
			assignment := TopicPartition{topic, partition}

			w := c.newWorker(c.logger, client, assignment)

			c.mu.Lock()
			c.workers[assignment] = w
			c.mu.Unlock()
			wg.Add(1)

			go func() {
				wg.Done()
				w.run(ctx)
			}()
		}
	}
}

func (c *coordinator) newWorker(clog log.Logger, client *kgo.Client, assignment TopicPartition) *worker {
	return &worker{
		logger:     clog,
		client:     client,
		listener:   c.listener,
		assignment: assignment,

		stop:    make(chan struct{}),
		stopped: make(chan struct{}),
		// we make it a buffered channel so that the [coordinator.doRun] loop is not blocked while waiting
		// for a worker to pick up the records
		records: make(chan []*kgo.Record, c.bufferSize),

		// we want to resume fetching for this partition if the *used* channel capacity is <= 25%
		resumeThreshold: 0.25 * float32(c.bufferSize),
	}
}

// onPartitionsLost is called once this group member has partitions lost or revoked. We use it to stop any workers
// whose partitions we are losing. Since the worker is doing the committing, the coordinator will send a stop signal
// to each worker and wait until each is stopped before preceding. This blocking action is required so that the
// rebalance doesn't proceed until we have committed our work.
func (c *coordinator) onPartitionsLost(ctx context.Context, _ *kgo.Client, lost map[string][]int32) {
	if len(lost) == 0 {
		return
	}
	c.handlePartitionsLost(lost)
}

func (c *coordinator) handlePartitionsLost(lost map[string][]int32) {
	var wg sync.WaitGroup
	defer wg.Wait()

	for topic, partitions := range lost {
		for _, partition := range partitions {
			assignment := TopicPartition{topic, partition}

			c.mu.Lock()
			w := c.workers[assignment]
			delete(c.workers, assignment)
			c.mu.Unlock()

			close(w.stop)
			wg.Add(1)

			go func(a TopicPartition) {
				<-w.stopped

				wg.Done()
			}(assignment)
		}
	}
}

func (c *coordinator) run(ctx context.Context, cl *kgo.Client) {
	for {
		f := cl.PollFetches(ctx)
		if f.IsClientClosed() {
			return
		}

		f.EachError(func(topic string, part int32, err error) {
			c.logger.WithError(err).ErrorContext(ctx, "Problem occurred while fetching records")
		})

		f.EachPartition(func(p kgo.FetchTopicPartition) {
			// the library is giving zero records occasionally for a partition. Need to look into this to see
			// as it seems like a bug.
			if len(p.Records) == 0 {
				return
			}

			tp := TopicPartition{p.Topic, p.Partition}

			c.mu.RLock()
			w, ok := c.workers[tp]
			c.mu.RUnlock()
			if ok {
				c.sendRecordsToWorker(tp, cl, w, p)
			}
		})
	}
}

func (c *coordinator) sendRecordsToWorker(tp TopicPartition, client *kgo.Client, w *worker, f kgo.FetchTopicPartition) {
	w.records <- f.Records

	if _, pause := c.shouldPause(w); pause {
		// pause fetching more records for this partition until the worker finishes makes progress. NOTE
		// that the Franz client buffers records internally, so even after we pause this partition, we
		// may get a few batches delivered for this worker. This is the reason we needed a buffered channel with
		// a larger capacity; so we don't block delivering records to the worker
		client.PauseFetchPartitions(map[string][]int32{tp.Topic: {tp.Partition}})
	}
}

func (c *coordinator) shouldPause(w *worker) (int, bool) {
	l := len(w.records)
	return l, float32(l) > c.pauseThreshold
}

type worker struct {
	logger log.Logger

	// the Topic and Partition that this worker is handling records for
	assignment TopicPartition
	client     *kgo.Client
	listener   RecordListener

	// stop is used by the coordinator to ask a worker to stop what it is doing, and exit
	stop chan struct{}
	// stopped will be closed when this worker has stopped, i.e. terminated
	stopped chan struct{}
	records chan []*kgo.Record

	// this is the number of items in the records channel under which we will resume fetching data for this
	// [TopicPartition]
	resumeThreshold float32
}

func (w *worker) run(ctx context.Context) {
	defer func() {
		// we drain the records channel in the unlikely event that the fetch loop is blocked trying to put more
		// records into this channel
		drainChannel(w.records)
		close(w.stopped)
	}()

	for {
		select {
		case <-w.stop:
			return
		case recs := <-w.records:
			toCommit := w.listener.OnRecords(ctx, w.assignment, w.stop, recs)

			if err := w.client.CommitRecords(ctx, toCommit...); err != nil {
				w.logger.WithError(err).ErrorContext(ctx, "Problem committing records")
			}

			if w.shouldResume() {
				// NOTE that a worker that doesn't have much of a backlog will emit "extra" resumes which Franz
				// treats as no ops.
				w.client.ResumeFetchPartitions(map[string][]int32{w.assignment.Topic: {w.assignment.Partition}})
			}
		}
	}
}

func (w *worker) shouldResume() bool {
	return float32(len(w.records)) <= w.resumeThreshold
}

func drainChannel(ch <-chan []*kgo.Record) {
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				return
			}
		default:
			return
		}
	}
}
	franzOpts = append(franzOpts,
		kgo.OnPartitionsAssigned(c.onPartitionsAssigned),
		kgo.OnPartitionsRevoked(c.onPartitionsLost),
		kgo.OnPartitionsLost(c.onPartitionsLost),
		// we are managing committing ourselves
		kgo.DisableAutoCommit(),
	)

About this issue

  • Original URL
  • State: closed
  • Created a year ago
  • Comments: 27 (13 by maintainers)

Most upvoted comments

Please update to latest and let me know if it continues to happen