nats.net: MessageConsumer does not properly reconnect
Defect
When a MessageConsumer is constructed using the simplified API, stopping and restarting the server leaves the consumer unable to receive any further messages. This looks like a bug in how the consumer recovers after a disconnect. Is reconnection supported in the simplification API, or should we use a different API for now if we need to be robust to disconnects and reconnects?
Versions of NATS.Client and nats-server:
NATS.Client: 1.0.7
nats-server: 2.9.20
OS/Container environment:
WSL (Ubuntu 22.04 LTS).
Steps or code to reproduce the issue:
Using a modified version of the SimplificationMessageConsumer example:

```csharp
using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using NATS.Client;
using NATS.Client.JetStream;

namespace NATSExamples
{
    internal static class MessageConsumerExample
    {
        private static readonly string STREAM = "consume-handler-stream";
        private static readonly string SUBJECT = "consume-handler-subject";
        private static readonly string CONSUMER_NAME = "consume-handler-consumer";
        private static readonly int STOP_COUNT = 5;
        private static readonly string SERVER = "nats://localhost:4222";

        public static void Main(String[] args)
        {
            Options opts = ConnectionFactory.GetDefaultOptions(SERVER);
            using (IConnection c = new ConnectionFactory().CreateConnection(opts))
            {
                IJetStreamManagement jsm = c.CreateJetStreamManagementContext();
                IJetStream js = c.CreateJetStreamContext();

                // set up the stream; in this modified example, data is published from the CLI
                // JsUtils.CreateOrReplaceStream(jsm, STREAM, SUBJECT);
                // in case the stream was here before, we want a completely new one
                try
                {
                    jsm.DeleteStream(STREAM);
                }
                catch (Exception)
                {
                    // the stream did not exist yet; nothing to delete
                }
                jsm.AddStream(StreamConfiguration.Builder()
                    .WithName(STREAM)
                    .WithSubjects(SUBJECT)
                    .Build());

                // get stream context, create consumer and get the consumer context
                IStreamContext streamContext;
                IConsumerContext consumerContext;
                try
                {
                    streamContext = c.CreateStreamContext(STREAM);
                    consumerContext = streamContext.CreateOrUpdateConsumer(
                        ConsumerConfiguration.Builder().WithDurable(CONSUMER_NAME).Build());
                }
                catch (Exception e)
                {
                    // possible exceptions
                    // - a connection problem
                    // - the stream or consumer did not exist
                    Console.WriteLine($"Setup failed: {e.Message}");
                    return;
                }

                CountdownEvent latch = new CountdownEvent(1);
                int count = 0;
                Stopwatch sw = Stopwatch.StartNew();
                EventHandler<MsgHandlerEventArgs> handler = (s, e) =>
                {
                    Console.WriteLine("Handler got a message...");
                    Thread.Sleep(1000); // simulate slow processing
                    e.Message.Ack();
                    // count was never incremented, so the latch could never be signaled
                    if (Interlocked.Increment(ref count) == STOP_COUNT)
                    {
                        latch.Signal();
                    }
                };

                using (IMessageConsumer consumer = consumerContext.Consume(handler))
                {
                    latch.Wait();
                    // once the consumer is stopped, the client will drain messages
                    Console.WriteLine("Stop the consumer...");
                    consumer.Stop(1000);
                    Thread.Sleep(1000); // enough for messages to drain after stop
                }
                Console.WriteLine("Done!");
            }
        }
    }
}
```
Start the example, then publish messages with the NATS CLI: `nats pub consume-handler-subject Hello-World`. After doing this a few times, stop the NATS server, restart it, and run the publish command a few more times.
Expected result:
I expected that after stopping and restarting the NATS server and publishing more messages, the message consumer would receive the newly published messages.
Actual result:
```text
Handler got a message...
Handler got a message...
Handler got a message...
PullStatusError, Connection: 30, Subscription: 2, ConsumerName:consume-handler-consumer, Status: Status 409 Server Shutdown
DisconnectedEvent, Connection: 30
ReconnectedEvent, Connection: 18
HeartbeatAlarm, Connection: 18, Subscription: 2, ConsumerName:consume-handler-consumer, lastStreamSequence: 3, lastConsumerSequence: 3
HeartbeatAlarm, Connection: 18, Subscription: 2, ConsumerName:consume-handler-consumer, lastStreamSequence: 3, lastConsumerSequence: 3
HeartbeatAlarm, Connection: 18, Subscription: 2, ConsumerName:consume-handler-consumer, lastStreamSequence: 3, lastConsumerSequence: 3
```
The consumer correctly receives the first few messages, and after the server restarts the connection appears to recover (a ReconnectedEvent is logged). However, messages published via the CLI after the restart never reach the consumer; instead, HeartbeatAlarm messages appear in the log roughly every 40 seconds. In other words, the MessageConsumer does not recover after the reconnect.
About this issue
- State: open
- Created a year ago
- Comments: 20
Is there any timeline for when this API might be updated?
First, let's clarify: this whole time I thought we were talking about pull consumers.
Regarding push consumers: as it turns out, I'm working on a push consumer example. Take a look at this code I put together: https://github.com/nats-io/java-nats-examples/tree/main/robust-push-subscription
The thing is, many times the subscription will be able to resume, but there are several variables to consider.
The example I'm building relies on the heartbeat alarm warning to recognize when the consumer is no longer receiving messages. This is the same way the simplification consumers will recognize a stalled consumer. Pull consumers do not recover the way push consumers do, because a pull request is lost when its server goes down. The simplification endless consumers will try to recognize this and make a fresh pull.
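To make the heartbeat-alarm idea concrete, here is a minimal, self-contained C# model of stall detection. It is not the NATS.Client implementation: `HeartbeatWatchdog`, the `missedBeats` threshold, and passing the current time in explicitly are assumptions made for illustration. Any inbound traffic, whether a message or an idle heartbeat, resets the timer; prolonged silence marks the consumer as stalled.

```csharp
using System;

namespace StallDetectionSketch
{
    // Hypothetical model of heartbeat-based stall detection (not NATS.Client code).
    // While a pull is open and idle, the server is assumed to send a heartbeat every
    // heartbeatInterval; silence for missedBeats intervals counts as a stall.
    public sealed class HeartbeatWatchdog
    {
        private readonly TimeSpan alarmAfter;
        private DateTime lastActivity;

        public HeartbeatWatchdog(TimeSpan heartbeatInterval, int missedBeats, DateTime start)
        {
            if (heartbeatInterval <= TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(heartbeatInterval));
            if (missedBeats < 1)
                throw new ArgumentOutOfRangeException(nameof(missedBeats));
            alarmAfter = TimeSpan.FromTicks(heartbeatInterval.Ticks * missedBeats);
            lastActivity = start;
        }

        // Any inbound traffic (a user message or an idle heartbeat) proves the
        // subscription is alive, so both reset the timer.
        public void OnActivity(DateTime now) { lastActivity = now; }

        // True once the silence has lasted at least missedBeats heartbeat intervals.
        public bool IsStalled(DateTime now) { return now - lastActivity >= alarmAfter; }
    }
}
```

For example, with a 5-second heartbeat and `missedBeats = 3`, `IsStalled` becomes true 15 seconds after the last message or heartbeat. Passing the time in as a parameter keeps the logic deterministic and easy to test; a real client would drive the check from a timer.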
There certainly is confusion in the docs. But when you make a raw pull, it is a one-time command telling the server to send messages within certain time, byte, and message-count limits. If that pull is cut off in the middle, there is no way for it to resume. The plan for simplification, which makes repeated use of raw pulls, is to recognize this situation and react by issuing a new pull to replace the failed one. Currently the plan is to wait for a [raw pull request] heartbeat error to know that the pull is not going to finish.
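The replace-the-failed-pull strategy described above can be sketched as a small simulation, again in plain C# with no NATS.Client dependency. `PullRequest`, `FakeServer`, `EndlessConsumer`, and the three-missed-heartbeats threshold are hypothetical. The sketch only models three points from the comment: a pull is one-shot, a server restart silently drops it, and the consumer's only recourse is to notice the missing heartbeats and issue a new pull.

```csharp
using System;
using System.Collections.Generic;

namespace RepullSketch
{
    // Hypothetical one-shot pull: the server fills it until a limit is reached.
    // It cannot be resumed once cut off; it can only be replaced.
    public sealed class PullRequest
    {
        public PullRequest(int id, int batch, long maxBytes, TimeSpan expires)
        {
            Id = id;
            Batch = batch;
            MaxBytes = maxBytes;
            Expires = expires;
        }

        public int Id { get; }
        public int Batch { get; }        // message-count limit
        public long MaxBytes { get; }    // byte limit (0 = no limit in this sketch)
        public TimeSpan Expires { get; } // time limit for the pull
    }

    // Simulated server holding at most one open pull. Restart() drops it, which is
    // what happens to a real pull request when its server goes down.
    public sealed class FakeServer
    {
        public PullRequest OpenPull { get; private set; } // null when no pull is open

        public void Accept(PullRequest pull) { OpenPull = pull; }

        public void Restart() { OpenPull = null; }

        // One heartbeat interval passes; a heartbeat is sent only for an open pull.
        public bool SendsHeartbeat() { return OpenPull != null; }
    }

    // Endless consumer that replaces its pull after too many missed heartbeats.
    public sealed class EndlessConsumer
    {
        private readonly FakeServer server;
        private readonly int missedBeatsBeforeAlarm;
        private int missedBeats;
        private int nextPullId = 1;

        public List<int> IssuedPullIds { get; } = new List<int>();

        public EndlessConsumer(FakeServer server, int missedBeatsBeforeAlarm)
        {
            this.server = server;
            this.missedBeatsBeforeAlarm = missedBeatsBeforeAlarm;
            IssuePull();
        }

        // Call once per heartbeat interval.
        public void Tick()
        {
            if (server.SendsHeartbeat())
            {
                missedBeats = 0; // the pull is still alive
                return;
            }
            if (++missedBeats >= missedBeatsBeforeAlarm)
            {
                // Heartbeat alarm: the old pull will never finish, so replace it.
                missedBeats = 0;
                IssuePull();
            }
        }

        private void IssuePull()
        {
            var pull = new PullRequest(nextPullId++, batch: 10, maxBytes: 0, expires: TimeSpan.FromSeconds(30));
            server.Accept(pull);
            IssuedPullIds.Add(pull.Id);
        }
    }
}
```

Driving it by hand with a threshold of 3: after `server.Restart()`, the first two `Tick()` calls only count missed heartbeats, and the third issues pull 2, which the restarted server then holds open. Nothing from the old pull is resumed; it is simply replaced.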
Your suggestion also seems to contradict the docs on automatic reconnection: https://docs.nats.io/using-nats/developer/connecting/reconnect. If new pulls happen repeatedly inside Consume, why don't they handle reconnection the same way another pull in the legacy API does?