google-cloud-go: pubsub: subscriber stuck without error in case of lots of messages in the subscription

Client

PubSub

Describe Your Environment

Locally, but the same behaviour occurs on GKE.

I have a fairly simple subscriber (I tried both synchronous and asynchronous modes) that should continuously check for new messages and handle them in batches.

This is the code you can run to reproduce:

package main

import (
	"context"
	"os"
	"time"

	"cloud.google.com/go/pubsub"
	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	ctx := context.Background()
	projectID := os.Getenv("GCLOUD_PROJECT_ID")
	subscriptionID := os.Getenv("SUBSCRIPTION_ID")

	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	sub := client.Subscription(subscriptionID)
	// sub.ReceiveSettings.Synchronous = true
	// sub.ReceiveSettings.MaxOutstandingMessages = 0
	// sub.ReceiveSettings.MaxOutstandingBytes = 1000
	// sub.ReceiveSettings.MaxExtension = 10
	for {
		// Receive messages constantly
		log.Infof("check for new messages in subscription %v...", subscriptionID)
		tctx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()
		// Create a channel to handle messages to as they come in.
		cm := make(chan *pubsub.Message)

		messages := 0
		finished := false
		go func() {
			for {
				select {
				case msg := <-cm:
					log.Infof("received message: %v", string(msg.Data))
					messages++
					// make acks conditional
					defer func() {
						if finished {
							msg.Ack()
						} else {
							msg.Nack()
						}
					}()
				case <-tctx.Done():
					if messages > 0 {
						log.Info("here we do something, only if it succeeds we acknowledge the messages")
						finished = true
					}
					log.Infof("number of messages: %v", messages)
					messages = 0
					return
				}
			}
		}()

		// Receive blocks until the context is cancelled or an error occurs.
		err = sub.Receive(tctx, func(ctx context.Context, msg *pubsub.Message) {
			cm <- msg
		})
		if err != nil && status.Code(err) != codes.Canceled {
			log.Errorf("Receive: %v", err)
		}
		close(cm)
	}
}

Expected Behavior

This program continuously checks for new messages in the subscription. When new messages arrive, it picks up as many as possible within 5 seconds, processes them, and acknowledges them. If many messages arrive in the subscription, the program should still handle them by processing batches of messages one after another.

Actual Behavior

Everything works fine if there are only a few messages in the subscription. If I put load on it, say by publishing 2000 messages, the program gets stuck after logging "number of messages: X".

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 15 (6 by maintainers)

Most upvoted comments

I realized that my application doesn’t get stuck if I run the receiver synchronously by setting sub.ReceiveSettings.Synchronous = true. In that mode, the only thing that doesn’t work is deferring the acknowledgment. I want to defer it because I only want to acknowledge the messages (say, 1000 messages processed within the 5-second timeout) if and only if certain operations succeed on ALL of them. Can I achieve that somehow?

Definitely. I wanted to clarify why Pub/Sub doesn’t redeliver immediately if your subscription’s ack deadline is too high. Let me look into this more and I’ll try to have an update by tomorrow.