nats.net: MessageConsumer does not properly reconnect

Defect

When constructing a MessageConsumer using the simplified API, turning the server off and on again leaves the consumer unable to receive any more messages. This looks like a bug in how the consumer reconnects after a disconnect. Is reconnection supported in the simplification API, or should we use a different API for now if we need to be robust to disconnects and reconnects?

Versions of NATS.Client and nats-server:

NATS.Client: 1.0.7, nats-server: 2.9.20

OS/Container environment:

WSL (Ubuntu 22.04 LTS).

Steps or code to reproduce the issue:

Using a modified version of the SimplificationMessageConsumer example:

using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using NATS.Client;
using NATS.Client.JetStream;

namespace NATSExamples
{
    internal static class MessageConsumerExample
    {
        private static readonly string STREAM = "consume-handler-stream";
        private static readonly string SUBJECT = "consume-handler-subject";
        private static readonly string CONSUMER_NAME = "consume-handler-consumer";
        private static readonly int STOP_COUNT = 5;

        private static readonly string SERVER = "nats://localhost:4222";

        public static void Main(String[] args)
        {
            Options opts = ConnectionFactory.GetDefaultOptions(SERVER);

            using (IConnection c = new ConnectionFactory().CreateConnection(opts))
            {
                IJetStreamManagement jsm = c.CreateJetStreamManagementContext();
                IJetStream js = c.CreateJetStreamContext();

                // set up the stream (messages are published separately with the NATS CLI)
                // JsUtils.CreateOrReplaceStream(jsm, STREAM, SUBJECT);
                // in case the stream was here before, we want a completely new one
                try
                {
                    jsm.DeleteStream(STREAM);
                }
                catch (Exception)
                {
                    // ignore: the stream did not exist yet
                }

                jsm.AddStream(StreamConfiguration.Builder()
                    .WithName(STREAM)
                    .WithSubjects(SUBJECT)
                    .Build());
                // get stream context, create consumer and get the consumer context
                IStreamContext streamContext;
                IConsumerContext consumerContext;
                try
                {
                    streamContext = c.CreateStreamContext(STREAM);
                    consumerContext = streamContext.CreateOrUpdateConsumer(ConsumerConfiguration.Builder().WithDurable(CONSUMER_NAME).Build());
                }
                catch (Exception)
                {
                    // possible exceptions
                    // - a connection problem
                    // - the stream or consumer did not exist
                    return;
                }

                CountdownEvent latch = new CountdownEvent(1);
                int count = 0;
                Stopwatch sw = Stopwatch.StartNew();
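                EventHandler<MsgHandlerEventArgs> handler = (s, e) =>
                {
                    Console.WriteLine("Handler got a message...");
                    Thread.Sleep(1000); // simulate slow processing
                    e.Message.Ack();
                    // count this message; release the latch once STOP_COUNT messages have been handled
                    if (Interlocked.Increment(ref count) == STOP_COUNT)
                    {
                        latch.Signal();
                    }
                };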

                using (IMessageConsumer consumer = consumerContext.Consume(handler))
                {
                    latch.Wait();
                    // once the consumer is stopped, the client will drain messages
                    Console.WriteLine("Stop the consumer...");
                    consumer.Stop(1000);
                    Thread.Sleep(1000); // enough for messages to drain after stop
                }

                Console.WriteLine("Done!");
            }
        }
    }
}

Start the example, then publish messages from the CLI with the command nats pub consume-handler-subject Hello-World. After doing this a few times, stop the NATS server, restart it, and run the same publish command a few more times.

Expected result:

I expected that after stopping and restarting the NATS server and publishing more messages, the message consumer would receive the newly published messages.

Actual result:

Handler got a message...
Handler got a message...
Handler got a message...
PullStatusError, Connection: 30, Subscription: 2, ConsumerName:consume-handler-consumer, Status: Status 409 Server Shutdown
DisconnectedEvent, Connection: 30
ReconnectedEvent, Connection: 18
HeartbeatAlarm, Connection: 18, Subscription: 2, ConsumerName:consume-handler-consumer, lastStreamSequence: 3, lastConsumerSequence: 3
HeartbeatAlarm, Connection: 18, Subscription: 2, ConsumerName:consume-handler-consumer, lastStreamSequence: 3, lastConsumerSequence: 3
HeartbeatAlarm, Connection: 18, Subscription: 2, ConsumerName:consume-handler-consumer, lastStreamSequence: 3, lastConsumerSequence: 3

The consumer receives the first few messages, and after the server restart it appears to reconnect, as the ReconnectedEvent shows. However, messages published via the CLI after the restart never reach the consumer. Instead, HeartbeatAlarm messages appear in the log roughly every 40 seconds. In other words, the MessageConsumer does not recover after the reconnect.

Most upvoted comments

Is there any timeline for when this API might be updated?

First, I had assumed this whole time that we were talking about pull consumers, so let's clarify.

  • There is push consumer, where the server “pushes” messages to the client.
  • There is pull consumer, where the client must request messages in batches that can be limited by time, count and bytes.
  • The simplification API's macro consuming methods use pull consumers under the covers.

Regarding push consumers: as it turns out, I'm working on a push consumer example. Take a look at this code I put together: https://github.com/nats-io/java-nats-examples/tree/main/robust-push-subscription

Often the subscription will be able to resume, but there are several variables to consider:

  1. What is the inactive threshold of the consumer, if it is ephemeral rather than durable?
  2. What type of storage does the stream use, file or memory?
  3. Was the server that went down the stream leader or the consumer leader?
  4. Was that server also the one the client was connected to?
  5. What is the replication factor of the stream?

The example I'm building relies on the heartbeat alarm warning to recognize when the consumer is no longer receiving messages. This is the same way the simplification consumers recognize a stalled consumer. Pull consumers do not recover like push consumers, because a pull request is lost when its server goes down. The endless simplification consumers will try to recognize this and make a fresh pull.
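The stall detection described above can be sketched as a small watchdog. This is a hypothetical helper, not the NATS.Client implementation: `HeartbeatWatchdog`, `idleHeartbeat`, and `missedAllowed` are illustrative names, and the sketch assumes the caller knows the heartbeat interval the pull was issued with and reports every message or heartbeat it sees. Time is passed in explicitly so the logic can be checked without a server.

```csharp
using System;

// Hypothetical helper, NOT part of NATS.Client: remembers when the last
// message or heartbeat arrived and reports a stall once more than
// missedAllowed heartbeat intervals pass with no activity at all.
public sealed class HeartbeatWatchdog
{
    private readonly TimeSpan _idleHeartbeat; // heartbeat interval of the pull (assumed known)
    private readonly int _missedAllowed;      // how many intervals may pass silently
    private DateTime _lastActivity;

    public HeartbeatWatchdog(TimeSpan idleHeartbeat, int missedAllowed, DateTime now)
    {
        if (idleHeartbeat <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(idleHeartbeat));
        if (missedAllowed < 1)
            throw new ArgumentOutOfRangeException(nameof(missedAllowed));
        _idleHeartbeat = idleHeartbeat;
        _missedAllowed = missedAllowed;
        _lastActivity = now;
    }

    // Call whenever a message or a heartbeat status arrives for the current pull.
    public void RecordActivity(DateTime now) => _lastActivity = now;

    // True once the silence is strictly longer than missedAllowed intervals.
    // A caller would then treat the current pull as lost and issue a new one.
    public bool IsStalled(DateTime now) =>
        now - _lastActivity > TimeSpan.FromTicks(_idleHeartbeat.Ticks * _missedAllowed);
}
```

A stall is declared only after several missed intervals rather than one, so a single delayed heartbeat does not trigger an unnecessary re-pull.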

There certainly is confusion in the docs. But a raw pull is a one-time command to the server to send messages within certain time, byte, and message-count limits. If that pull is cut off in the middle, there is no way for it to resume. The plan for simplification, which makes repeated use of raw pulls, is to recognize this situation and react by issuing a new pull to replace the failed one. Currently the plan is to wait for a [raw pull request] heartbeat error to know that the pull is not going to finish.
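The re-pull approach described above can be sketched as a loop of bounded, independent pulls, where a failed pull is abandoned and simply replaced by the next one. This is a hypothetical helper, not the simplification API's internals: `RepeatedPullLoop`, `fetch`, `handle`, `keepRunning`, `onPullFailed`, and `retryDelay` are illustrative names, and the loop is generic so it can be exercised without a server.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper, NOT part of NATS.Client: runs an "endless" consumer
// as a sequence of bounded raw pulls. Each pull is independent, so if one
// is lost (for example because the server restarted) the next iteration
// issues a fresh pull instead of waiting on the dead one.
public static class RepeatedPullLoop
{
    public static void Run<T>(
        Func<IList<T>> fetch,           // one bounded raw pull
        Action<T> handle,               // process (and ack) one message
        Func<bool> keepRunning,         // checked before every pull
        Action<Exception> onPullFailed, // report a failed pull
        TimeSpan retryDelay)            // pause before re-pulling after a failure
    {
        while (keepRunning())
        {
            IList<T> batch;
            try
            {
                batch = fetch();
            }
            catch (Exception ex)
            {
                // The pull failed (e.g. the connection dropped mid-request).
                // It cannot be resumed, so report it, back off, and pull again.
                onPullFailed(ex);
                Thread.Sleep(retryDelay);
                continue;
            }

            // Exceptions thrown by the handler are not caught here; they stop the loop.
            foreach (T msg in batch)
            {
                handle(msg);
            }
        }
    }
}
```

With the legacy API, `fetch` could be wired to a pull subscription, for example `() => sub.Fetch(10, 1000)` where `sub` comes from `js.PullSubscribe(...)`, and `handle` could process the message and then call `Ack()` on it. The sketch assumes that a pull interrupted by a disconnect either throws or returns early; which of these `Fetch` actually does during a reconnect is an assumption, not something this thread confirms.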

Your suggestion also seems to contradict the docs on automatic reconnection: https://docs.nats.io/using-nats/developer/connecting/reconnect. If new pulls are issued repeatedly inside Consume, why don't they handle reconnection the same way as issuing another pull with the legacy API?