kafka-go: the kafka reader got an unknown error reading partition

First issue

We are frequently getting a reader error, roughly 500k times per day:

the kafka reader got an unknown error reading partition 9 of SOME_TOPIC at offset 3: read tcp IP_ADDRESS:46406->IP_ADDRESS:9093: i/o timeout

https://github.com/segmentio/kafka-go/blob/a4890bd956b5658ca57e964dd28381c0ee4fd617/reader.go#L1366-L1375

Kafka-go 0.4.16

Kafka 2.5.0

Second issue

The error occurs when the Kafka reader commits a message after it has been processed successfully; the message is then re-consumed by another replica.

"msg": "debezium.Consumer: failed to commit message: write tcp IP_ADDRESS:49610->IP_ADDRESS:9093: use of closed network connection"

We receive "Successfully handled message" and, a while afterward, get "failed to commit message".

About this issue

  • State: open
  • Created 3 years ago
  • Comments: 16 (7 by maintainers)

Most upvoted comments

#989 has been merged, let me know if you are still experiencing the issue on the latest version of kafka-go!

@mostafa, the issue has been identified, and allowing the read timeout to be configured will indeed fix it, so that users of the library can adjust the read timeout to their needs.

At the moment it is hard-coded to 10 seconds, and increasing it resolves the issue.

So I am waiting for my PR to be approved and merged.

Also, I am not sure what the code below is meant to do:

	const safetyTimeout = 60 * time.Second // changed from 10 to 60
	deadline := time.Now().Add(safetyTimeout)
	conn.SetReadDeadline(deadline)

It overwrites the deadline that was set earlier, and it cannot be configured through MaxWait in the Reader configuration. Can we remove the code above? See the commented-out code below:

func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
	r.stats.fetches.observe(1)
	r.stats.offset.observe(offset)

	t0 := time.Now()
	conn.SetReadDeadline(t0.Add(r.maxWait))

	batch := conn.ReadBatchWith(ReadBatchConfig{
		MinBytes:       r.minBytes,
		MaxBytes:       r.maxBytes,
		IsolationLevel: r.isolationLevel,
	})
	highWaterMark := batch.HighWaterMark()

	t1 := time.Now()
	r.stats.waitTime.observeDuration(t1.Sub(t0))

	var msg Message
	var err error
	var size int64
	var bytes int64

	//const safetyTimeout = 10 * time.Second
	//deadline := time.Now().Add(safetyTimeout)
	//conn.SetReadDeadline(deadline)

	for {
		//if now := time.Now(); deadline.Sub(now) < (safetyTimeout / 2) {
		//deadline = now.Add(safetyTimeout)
		//conn.SetReadDeadline(deadline)
		//}

		if msg, err = batch.ReadMessage(); err != nil {
			batch.Close()
			break
		}

		n := int64(len(msg.Key) + len(msg.Value))
		r.stats.messages.observe(1)
		r.stats.bytes.observe(n)

		if err = r.sendMessage(ctx, msg, highWaterMark); err != nil {
			batch.Close()
			break
		}

		offset = msg.Offset + 1
		r.stats.offset.observe(offset)
		r.stats.lag.observe(highWaterMark - offset)

		size++
		bytes += n
	}

	conn.SetReadDeadline(time.Time{})

	t2 := time.Now()
	r.stats.readTime.observeDuration(t2.Sub(t1))
	r.stats.fetchSize.observe(size)
	r.stats.fetchBytes.observe(bytes)
	return offset, err
}