sarama: consumer group deadlock on close: consumer group shutdown hangs

Versions

Sarama Version: d84c59b2a2d87f185d91a1cc426a1f4d4e9365109fe0d96cbd2404c3a57c365a / release v1.22.0
Kafka Version: kafka_2.12-2.1.0.jar
Go Version: go version go1.12.1 linux/amd64

Configuration

What configuration values are you using for Sarama and Kafka?

Kafka: a single topic with only 1 partition and 2 consumers in a single consumer group

Logs

goroutine 64 [chan receive]:
github.com/Shopify/sarama.(*consumerGroup).Consume(0xc000159ea0, 0x14ed720, 0xc0000c4020, 0xc000436ae0, 0x1, 0x1, 0x14e40a0, 0xc0000ba3c0, 0x0, 0x0)
	github.com/Shopify/sarama/consumer_group.go:175 +0x38b
internal/session.(*Service).Start.func1(0xc0000cc230)
created by internal/session.(*Service).Start

goroutine 78 [semacquire]:
sync.runtime_SemacquireMutex(0xc000159ef4, 0x900000000)
	/usr/lib/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc000159ef0)
	/usr/lib/go/src/sync/mutex.go:134 +0x1e2
github.com/Shopify/sarama.(*consumerGroup).Close.func1()
	github.com/Shopify/sarama/consumer_group.go:121 +0x94
sync.(*Once).Do(0xc000159f00, 0xc0000dff08)
	/usr/lib/go/src/sync/once.go:44 +0xdf
github.com/Shopify/sarama.(*consumerGroup).Close(0xc000159ea0, 0x0, 0x0)
	github.com/Shopify/sarama/consumer_group.go:118 +0x7a
internal/session.(*Service).Stop(0xc0000cc230)

goroutine 77 [select]:
github.com/Shopify/sarama.(*consumerGroupSession).heartbeatLoop(0xc000428500)
	github.com/Shopify/sarama/consumer_group.go:701 +0x7e0
created by github.com/Shopify/sarama.newConsumerGroupSession
	github.com/Shopify/sarama/consumer_group.go:505 +0x41b

goroutine 76 [select]:
github.com/Shopify/sarama.(*offsetManager).mainLoop(0xc0003fdce0)
	github.com/Shopify/sarama/offset_manager.go:226 +0x1ef
github.com/Shopify/sarama.withRecover(0xc0001da8f0)
	github.com/Shopify/sarama/utils.go:45 +0x51
created by github.com/Shopify/sarama.newOffsetManagerFromClient
	github.com/Shopify/sarama/offset_manager.go:70 +0x3ae

Problem Description

Calling Close() on the consumer group hangs (deadlocks) while acquiring a mutex. The mutex is held by Consume(), which cannot return because it is waiting for a session to complete, even though no partitions were assigned to this consumer instance.

About this issue

  • State: closed
  • Created 5 years ago
  • Reactions: 8
  • Comments: 20 (1 by maintainers)

Most upvoted comments

@PapayaJuice yes, but the gods of concurrent programming will not praise me. The main idea is to tolerate the data race that happens in the bad case.

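// Sketch of the calling code (pseudocode; Consume arguments omitted).
done := make(chan struct{})
go func() {
    // Consume returns when a session ends, so call it in a loop
    // until shutdown has been requested.
    for !shutdown {
        cs.Consume(/* ctx, topics, handler */)
    }
    cleanup()
    close(done)
}()

// On shutdown: try to close the group, but wait at most 10 seconds.
CloseConsumerGroup(cs, 10*time.Second, done)
// Close the underlying client whether or not cs.Close() has returned.
kafkaConsumeGroupClient.Close()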

func CloseConsumerGroup(cs sarama.ConsumerGroup, timeout time.Duration, wait ...<-chan struct{}) {
	done := make(chan struct{})

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// If shutdown starts by closing the consumer group, cs.Close() may
	// hang waiting for a consumer group session to complete when no
	// partitions were assigned to this node. In that case cs.Close()
	// blocks until a cluster rebalance or a disconnect happens. Running
	// it in a goroutine means the caller may close the client while
	// cs.Close() is still running (a data race), but at least shutdown
	// will finish.
	go func() {
		if err := cs.Close(); err != nil {
			log.Logger().WithError(err).Info("close consumer group")
		}

		close(done)
	}()

	select {
	case <-ctx.Done():
		// consumer did not exit
		log.Logger().WithError(context.DeadlineExceeded).Info("close consumer group")
	case <-done:
		// cs.Close() returned; now wait for the external channels
		// (for example the consume goroutine closing its own `done`)
		for _, ch := range wait {
			select {
			case <-ctx.Done():
				// consumer did not exit
				log.Logger().WithError(context.DeadlineExceeded).Info("close consumer group: wait external chan")
				return
			case <-ch:
				// finished
			}
		}
	}
}

The idea is to run cs.Close() in a separate goroutine. If that call deadlocks, CloseConsumerGroup returns after the timeout and the caller then forcibly closes the client owned by the consumer group. In the good case, waiting in CloseConsumerGroup establishes a happens-before relationship between cs.Close() and the close of the client. In the bad case, the client is closed while cs.Close() is still stuck, which is a data race. Either way, cs.Close() no longer blocks the calling goroutine and shutdown continues.

The wait channels let CloseConsumerGroup also wait for the goroutine that runs the consumer group session loop. After cs.Consume() returns, that goroutine may still need to do some cleanup.