azure-sdk-for-net: Service Bus subscriber stops processing messages

Describe the bug After 5000 messages are processed by the subscriber, the subscriber doesn’t accept any more messages. No exceptions are thrown by the subscriber.

The async message delegate no longer receives messages, and any sync/async operation blocks indefinitely.

Exception or Stack Trace No exceptions are thrown.

To Reproduce Execute the snippet below.

Code Snippet

        static readonly ConcurrentBag<Tuple<string, string>> messageRefs = new ConcurrentBag<Tuple<string, string>>();
        const bool MONITOR_VERBOSE = true;

        static void Main(string[] args)
        {
            /*
             * Subscription Details
             * Max delivery count: 10
             * Lock Duration: 2 minutes
             */

            var receiver = CreateSubscriptionClient();
            var options = new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                MaxConcurrentCalls = 5,
                AutoComplete = false,
            };

            receiver.RegisterMessageHandler(async (message, cancellation) =>
            {
                try
                {
                    messageRefs.Add(new Tuple<string, string>(message.MessageId, message.SystemProperties.LockToken));

                    // Simulated processing failure: CompleteAsync below is
                    // intentionally unreachable, so the message is never settled.
                    throw new Exception("something bad happens...");
                    await receiver.CompleteAsync(message.SystemProperties.LockToken);
                }
                catch (Exception)
                {
                    // We do not want to return the message immediately (receiver.AbandonAsync).
                    // Let the message lock (2 min) expire before the message is re-processed.
                }
            }, options);

            while (true) // monitor
            {
                Console.WriteLine($"Messages read: {messageRefs.Count}");
                if (MONITOR_VERBOSE)
                    Console.WriteLine(
                        String.Join(
                            Environment.NewLine,
                            messageRefs
                                .GroupBy(x => x.Item1)
                                .OrderByDescending(x => x.Count())
                                .Take(10)
                                .Select(x => $"Id: {x.Key} | Locks: {x.Count()}")));
                Thread.Sleep(5000);
            }
        }

Expected behavior

We are aware of the maximum of 5000 concurrent deliveries per subscriber entity when using the AMQP protocol. We are, however, expecting those deliveries to be released when the message lock expires.

Specifically, we want the lock (30 secs) to expire before the message becomes visible again. As highlighted by the previous snippet, after the lock expires the message is picked up again by the same subscriber, but the previous delivery is not released, eventually bringing the subscriber into a locked state.

Additional context The same behavior occurs with the WindowsAzure.ServiceBus library.

Setup (please complete the following information):

  • OS: Win10
  • Library version: 3.4.0

About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Comments: 32 (14 by maintainers)

Most upvoted comments

This is a client bug which impacts the server as well. Think about customers using a native AMQP library instead of our library, which sets the value to 5000. If they set the value to int.MaxValue and forget to acknowledge the message, the service will also end up keeping the whole state in memory (as is required by AMQP). I don’t want to propose a final solution right now, as it needs some discussion and thinking through.

Hi @nemakam, thanks for coming back on this one. Better late than never 😃

As explained in this thread, we initially and intentionally avoided releasing the messages, to leverage the peek-lock timeout before the message becomes visible again. That was a crude way to time retries over transient errors, using the out-of-the-box delivery-count increase and the automatic dead-lettering after n retries.

We then misinterpreted the docs on that 5000 limit. We eventually implemented our own logic for delayed retries and dead-lettering.
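For context, a delayed-retry scheme like the one described can be sketched roughly as follows: on failure, settle the original message (freeing its slot in the link's 5000-delivery window) and schedule a delayed copy instead of relying on lock expiry. This is only an illustrative sketch, not the poster's actual code; `sender`, `ProcessAsync`, and the `"retry-count"` property name are assumptions.

```csharp
receiver.RegisterMessageHandler(async (message, cancellation) =>
{
    try
    {
        await ProcessAsync(message); // assumed processing step
        await receiver.CompleteAsync(message.SystemProperties.LockToken);
    }
    catch (Exception)
    {
        var retries = message.UserProperties.TryGetValue("retry-count", out var r)
            ? (int)r : 0;

        if (retries >= 5)
        {
            // Give up: dead-letter explicitly instead of relying on MaxDeliveryCount.
            await receiver.DeadLetterAsync(message.SystemProperties.LockToken);
            return;
        }

        // Schedule a delayed copy, then settle the original to release its delivery.
        var retry = message.Clone();
        retry.UserProperties["retry-count"] = retries + 1;
        await sender.ScheduleMessageAsync(retry, DateTimeOffset.UtcNow.AddMinutes(2));
        await receiver.CompleteAsync(message.SystemProperties.LockToken);
    }
}, options);
```

Because every message is settled in the handler, the AMQP link's delivery state never accumulates, avoiding the 5000-delivery stall described in this issue.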

Regarding the point you made about protecting the server from “dumb” clients, I can give you my five cents as a library consumer. We would really have preferred the library to throw exceptions back at us, rather than having production systems lock up because of an “unintended” use of the library itself.

That would have saved us time and headaches trying to get to the bottom of it. I hope my feedback helped!

Cheers

@federicobarera Sorry for the extremely late response; somehow I lost visibility of this thread. This is the behavior of our implementation of AMQP. AMQP is a stateful link and keeps the state of all received messages in its cache, so it is always advisable to abandon/complete a message when you are done processing it. If you don’t, the state reaches its limit of 5000. This is, unfortunately, by design.

Fixes -

  1. Abandon / Complete / DeadLetter the message to free up the “amqp-delivery” on your receiver.
  2. Alternatively, recycle the receiver which will create a new link, and hence a new 5000 limit.
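Fix (1) amounts to always settling the message inside the handler. A minimal sketch of what that could look like with the issue's snippet (assuming a `ProcessAsync` placeholder for the real work):

```csharp
receiver.RegisterMessageHandler(async (message, cancellation) =>
{
    try
    {
        await ProcessAsync(message); // assumed processing step
        await receiver.CompleteAsync(message.SystemProperties.LockToken);
    }
    catch (Exception)
    {
        // Settling (rather than letting the lock expire) frees the
        // "amqp-delivery" on the link. AbandonAsync makes the message
        // immediately visible again and increments its delivery count,
        // so after MaxDeliveryCount attempts the service dead-letters it.
        await receiver.AbandonAsync(message.SystemProperties.LockToken);
    }
}, options);
```

The trade-off versus the original snippet is that abandoning returns the message immediately instead of after the lock duration, so it does not by itself provide a retry delay.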

Based on this discussion, I am going to say you are only on 1 connection and this has nothing to do with connection limit.

I’ll add a backlog item on the service team’s board to figure out a better solution, so that this becomes evident on the client. Two approaches for long-term fixes -

  1. Client realizes it has reached the 5000 window size and the next Receive() throws an exception.
  2. Alternatively, server will close the link when this happens.