nats-server: Consumers stops receiving messages

Defect

Versions of nats-server and affected client libraries used:

Nats server version

[83] 2021/09/04 18:51:12.239432 [INF] Starting nats-server
[83] 2021/09/04 18:51:12.239488 [INF]   Version:  2.4.0
[83] 2021/09/04 18:51:12.239494 [INF]   Git:      [219a7c98]
[83] 2021/09/04 18:51:12.239496 [DBG]   Go build: go1.16.7
[83] 2021/09/04 18:51:12.239517 [INF]   Name:     NBVE7O7DMRAZ63STC7Z644KHF5HJ6QQUGLZVGDIKEG32CFL2J6O2456M
[83] 2021/09/04 18:51:12.239533 [INF]   ID:       NBVE7O7DMRAZ63STC7Z644KHF5HJ6QQUGLZVGDIKEG32CFL2J6O2456M
[83] 2021/09/04 18:51:12.239605 [DBG] Created system account: "$SYS"

Go client version: v1.12.0

OS/Container environment:

GKE Kubernetes. Running nats js HA cluster. Deployed via nats helm chart.

Steps or code to reproduce the issue:

Stream configuration:

apiVersion: jetstream.nats.io/v1beta1
kind: Stream
metadata:
  name: agent
spec:
  name: agent
  subjects: ["data.*"]
  storage: file
  maxAge: 1h
  replicas: 3
  retention: interest
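
For reference, a minimal sketch of the same stream definition through the Go client (the CRD above is applied by the JetStream controller; the nats.go field names below are my own mapping and are shown only for illustration):

// Sketch only: the Stream resource above expressed as an AddStream call,
// assuming an established *nats.Conn named nc and the values from the YAML.
js, _ := nc.JetStream()
_, err := js.AddStream(&nats.StreamConfig{
	Name:      "agent",
	Subjects:  []string{"data.*"},
	Storage:   nats.FileStorage,
	MaxAge:    1 * time.Hour,       // maxAge: 1h
	Replicas:  3,
	Retention: nats.InterestPolicy, // retention: interest
})
if err != nil {
	log.Printf("AddStream: %v", err)
}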

There are two consumers of this stream. Each runs as a queue subscriber in one of two services, each with 2 pod replicas. Note that I don't care if a message is not processed, which is why AckNone is set.


// 2 pods for service A.
js.QueueSubscribe(
	"data.received",
	"service1_queue",
	func(msg *nats.Msg) {},
	nats.DeliverNew(),
	nats.AckNone(),
)

// 2 pods for service B.
s.js.QueueSubscribe(
	"data.received",
	"service2_queue",
	func(msg *nats.Msg) {},
	nats.DeliverNew(),
	nats.AckNone(),
)
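
My understanding is that each QueueSubscribe call above implicitly creates (or binds to) a durable push consumer named after the queue group, roughly like the following sketch (the explicit AddConsumer form is only illustrative):

// Sketch only: approximately the consumer the first QueueSubscribe sets up.
inbox := nats.NewInbox()
_, err := js.AddConsumer("agent", &nats.ConsumerConfig{
	Durable:        "service1_queue",      // queue name doubles as the durable name
	DeliverSubject: inbox,                 // push delivery subject
	DeliverGroup:   "service1_queue",      // queue group shared by the 2 pods
	DeliverPolicy:  nats.DeliverNewPolicy, // nats.DeliverNew()
	AckPolicy:      nats.AckNonePolicy,    // nats.AckNone()
	FilterSubject:  "data.received",
})
if err != nil {
	log.Printf("AddConsumer: %v", err)
}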

Expected result:

Consumer receives messages.

Actual result:

Stream stats after a few days:

agent                  │ File    │ 3         │ 28,258   │ 18 MiB  │ 0    │ 84      │ nats-js-0, nats-js-1*, nats-js-2

Consumers stats:

service1_queue │ Push │ None       │ 0.00s    │ 0           │ 0           │ 0           │ 60,756    │ nats-js-0, nats-js-1*, nats-js-2
service2_queue │ Push │ None       │ 0.00s    │ 0           │ 0           │ 8,193 / 28% │ 60,843    │ nats-js-0, nats-js-1*, nats-js-2

  1. None of the nats-server pods logs any errors indicating a problem.
  2. The unprocessed message count for the second consumer stays the same and does not decrease.
  3. The only fix that helped was changing the second consumer's Raft leader with nats consumer cluster step-down, but after some time the problem comes back (see the sketch after this list).
  4. There are active connections to the server, verified with nats server report connections.
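
A minimal sketch of the step-down workaround done programmatically instead of via the CLI (assuming the JetStream API subject layout $JS.API.CONSUMER.LEADER.STEPDOWN.<stream>.<consumer> and an already connected nc):

// Sketch: ask the second consumer's Raft group to elect a new leader,
// equivalent in spirit to: nats consumer cluster step-down agent service2_queue
resp, err := nc.Request("$JS.API.CONSUMER.LEADER.STEPDOWN.agent.service2_queue", nil, 2*time.Second)
if err != nil {
	log.Printf("step-down request failed: %v", err)
} else {
	log.Printf("step-down response: %s", resp.Data)
}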

/cc @kozlovic @derekcollison

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 38 (19 by maintainers)


Most upvoted comments

In my case that was not the issue. I tested with the latest Go client v1.12.1 on both the publisher and the subscriber, the server is running v2.4.0, and the issue was still happening.

Information for Consumer DB > call_db created 2021-09-06T14:11:26+01:00

Configuration:

        Durable Name: call_db
    Delivery Subject: _INBOX.3aXsLI9XmM6LsZkiynMv69
      Filter Subject: db.clients
        Deliver Next: true
          Ack Policy: Explicit
            Ack Wait: 1m0s
       Replay Policy: Instant
     Max Ack Pending: 40
        Flow Control: false

Cluster Information:

                Name: nats
              Leader: nats-2
             Replica: nats-1, current, seen 0.03s ago

State:

   Last Delivered Message: Consumer sequence: 481038 Stream sequence: 484177
     Acknowledgment floor: Consumer sequence: 481038 Stream sequence: 484177
         Outstanding Acks: 0 out of maximum 40
     Redelivered Messages: 0
     Unprocessed Messages: 1
Information for Consumer DB > error_db created 2021-09-06T14:11:26+01:00

Configuration:

        Durable Name: error_db
    Delivery Subject: _INBOX.3aXsLI9XmM6LsZkiynMv69
      Filter Subject: db.errors
        Deliver Next: true
          Ack Policy: Explicit
            Ack Wait: 1m0s
       Replay Policy: Instant
     Max Ack Pending: 40
        Flow Control: false

Cluster Information:

                Name: nats
              Leader: nats-2
             Replica: nats-1, current, seen 0.03s ago

State:

   Last Delivered Message: Consumer sequence: 1915 Stream sequence: 484177
     Acknowledgment floor: Consumer sequence: 1915 Stream sequence: 484177
         Outstanding Acks: 0 out of maximum 40
     Redelivered Messages: 0
     Unprocessed Messages: 1

And the stream info:


Information for Stream DB created 2021-09-06T14:11:23+01:00

Configuration:

             Subjects: db.*
     Acknowledgements: true
            Retention: File - Interest
             Replicas: 2
       Discard Policy: Old
     Duplicate Window: 2m0s
     Maximum Messages: unlimited
        Maximum Bytes: 1.0 GiB
          Maximum Age: 8h0m0s
 Maximum Message Size: 5.8 MiB
    Maximum Consumers: unlimited


Cluster Information:

                 Name: nats
               Leader: nats-1
              Replica: nats-2, current, seen 0.06s ago

State:

             Messages: 352
                Bytes: 51 KiB
             FirstSeq: 325 @ 2021-09-06T13:11:26 UTC
              LastSeq: 492,161 @ 2021-09-06T17:43:09 UTC
     Deleted Messages: 491485
     Active Consumers: 2

One thing I saw is that the number of Messages in the stream starts to increase (right now it is just 352, but it grows from time to time) while the consumer stops consuming them, leaving the queue subscribers starving for new messages. The messages are not delivered at all.
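
One way to watch for this condition is to poll the stream and consumer state from the Go client; a minimal sketch (assuming a JetStreamContext js and the DB / call_db names from the output above):

// Sketch: detect the stall by watching pending grow while delivered stays flat.
for range time.Tick(30 * time.Second) {
	si, err := js.StreamInfo("DB")
	if err != nil {
		log.Printf("StreamInfo: %v", err)
		continue
	}
	ci, err := js.ConsumerInfo("DB", "call_db")
	if err != nil {
		log.Printf("ConsumerInfo: %v", err)
		continue
	}
	log.Printf("stream msgs=%d delivered stream seq=%d pending=%d ack pending=%d",
		si.State.Msgs, ci.Delivered.Stream, ci.NumPending, ci.NumAckPending)
}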

go client 1.12.1

/usr/local/etc/nats-server-post.conf

port: 4222
net: 192.168.109.177
http: 192.168.109.177:4223	# monitoring

server_name: "nats-server-post"

# logging options
debug:         true # enable on reload
trace:         false # enable on reload

logtime:       true # enable on reload
log_file:         "/var/log/nats-server-post.log"

pid_file:         "/var/run/nats-server-post.pid" # change on reload

#max_control_line: 512 # change on reload
#ping_interval:    5 # change on reload
#ping_max:         1 # change on reload
#write_deadline:   "3s" # change on reload
#max_payload:      1024 # change on reload

jetstream {
  max_mem = 10mb
  max_file = 900mb
  store_dir = "/var/nats"
}
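
Given the small max_file limit above, one way to check JetStream usage against those limits from the Go client is the AccountInfo call; a minimal sketch (js here is the JetStreamContext returned by natsConnect below, and log_to_file is the same helper used in that code):

// Sketch: compare current JetStream usage with the max_mem = 10mb / max_file = 900mb limits.
ai, err := js.AccountInfo()
if err != nil {
    log_to_file(LOG_INFO, "nats js.AccountInfo error: %v", err)
} else {
    log_to_file(LOG_INFO, "jetstream usage: memory=%d store=%d streams=%d consumers=%d",
        ai.Memory, ai.Store, ai.Streams, ai.Consumers)
}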

It happened tonight: a reset occurred and the server was transferred to another of the provider's virtual machines.

Then it started again, but without receiving messages.

Maybe it happens when the JetStream store file gets corrupted.

I hope this helps.

func main() {
    check_option()
    log_init()
    go capture_signals()
    db = db_connect(opt.db)
    db_read_sensor("")
    js = natsConnect()
    go natsPullSubscribe_sensor()
    go natsSubscribe_vehicleData()
    go natsPullSubscribe_track()
    go natsPullSubscribe_dbcode()
    go natsPullSubscribe_dbcode_slow()

    select{}
}



func natsConnect() (nats.JetStreamContext) {

    var err error
    nc, err = nats.Connect(
                    opt.broker,
                    nats.Name(NATS_CLIENT_NAME),
                    nats.RetryOnFailedConnect(true),
                    nats.MaxReconnects(-1),
                    nats.ReconnectWait(3 * time.Second),
                    nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
                        if !nc.IsClosed() {
                            fmt.Printf("nats disconnected due to: %s\n", err)
                            log_to_file(LOG_WARN, "nats disconnected due to: %s", err)
                        }
                    }),
                    nats.ReconnectHandler(func(nc *nats.Conn) {
                        fmt.Printf("nats reconnected [%s]", nc.ConnectedUrl())
                        log_to_file(LOG_INFO, "nats reconnected [%s]", nc.ConnectedUrl())
                    }),
                    nats.ClosedHandler(func(nc *nats.Conn) {
                        if !nc.IsClosed() {
                            fmt.Printf("nats Exiting: no servers available\n")
                            log_to_file(LOG_INFO, "nats Exiting: no servers available")
                        } else {
                            fmt.Printf("nats Exiting\n")
                            log_to_file(LOG_INFO, "nats Exiting")
                        }
                    }))

    if err != nil {
        fmt.Printf("nats nats.Connect() err: %v\n", err)
        log_to_file(LOG_INFO, "nats nats.Connect() err: %v", err)
    }
    //defer ncPost.Close()

    js, err := nc.JetStream()
    if err != nil {
        fmt.Printf("nats nc.JetStram error: %v\n", err)
        log_to_file(LOG_INFO, "nats nc.JetStram error: %v", err)
    }

    stream, err := js.StreamInfo(NATS_STREAM_NAME)
    if err != nil {
        fmt.Printf("nats js.StreamInfo error: %v\n", err)
        log_to_file(LOG_INFO, "nats js.StreamInfo error: %v", err)
    }

    if stream == nil {
        fmt.Printf("nats creating stream %q\n", NATS_STREAM_NAME)
        log_to_file(LOG_INFO, "nats creating stream %q", NATS_STREAM_NAME)
        _, err = js.AddStream(&nats.StreamConfig{
            Name:           NATS_STREAM_NAME,
            Subjects:       []string{
                                    NATS_SUBJECT_TRACK,
                                    NATS_SUBJECT_SENSOR,
                                    NATS_SUBJECT_VEHICLEDATA,
                                    NATS_SUBJECT_DBCODE,
                                    NATS_SUBJECT_DBCODE_SLOW,
                                    },
            NoAck:          false,
            //Retention:      nats.LimitsPolicy,    // to multiple consumers
            Retention:      nats.WorkQueuePolicy,   // to delete messages as they are acked
            Duplicates:     15 * time.Minute,
            Storage:        nats.FileStorage,
            Discard:        nats.DiscardOld,
            MaxAge:         7 * 24 * time.Hour,
            MaxConsumers:   -1,
            MaxMsgSize:     -1,
            Replicas:       1,
            MaxMsgs:        -1,
            MaxBytes:       419430400,   // 104857600 = 100mb
        })

        if err != nil {
            fmt.Printf("nats js.AddStream error: %v\n", err)
            log_to_file(LOG_INFO, "nats js.AddStream error: %v", err)
            os.Exit(1)
        }
    }

    return js
}


func natsPullSubscribe_sensor() {

    for {
        sub, err := js.PullSubscribe(
                            NATS_SUBJECT_SENSOR,
                            NATS_DURABLE_SENSOR,
                            nats.ManualAck(),
                            nats.DeliverAll(),
                            nats.MaxDeliver(20000),
                            nats.BindStream(NATS_STREAM_NAME),
                            )

        if err != nil {
            log_to_file(LOG_INFO, "NATS_PULL_SENSOR: js.PullSubscribe() err: %v", err)
        }

        for {

            if !sub.IsValid() {
                log_to_file(LOG_INFO, "NATS_PULL_SENSOR: sub invalid")
                break
            }

            msgs, err := sub.Fetch(200, nats.MaxWait(6 * time.Second))

            if err != nil {
                log_to_file(LOG_INFO, "NATS_SENSOR_PULL: sub.Fetch() err: %v", err)
                if err == nats.ErrTimeout {
                    //time.Sleep(2000 * time.Millisecond)  //TODO: Borrar linea
                    continue
                }

                break
            }

            process_sensors_msgs(msgs)
        }

        time.Sleep(1 * time.Second)
        sub.Unsubscribe()
        log_to_file(LOG_INFO, "NATS_PULL_SENSOR: new js.PullSubscribe")
    }

    return
}


func natsSubscribe_vehicleData() {

    js.Subscribe(NATS_SUBJECT_VEHICLEDATA, func(msg *nats.Msg) {
        log_to_file(LOG_INFO, "NATS_SUB_VEHICLEDATA: subject:%s, msg:%s", msg.Subject, msg.Data)
        process_vehicleData_msg(msg)

        //err = m.Ack()
        //if err != nil {
        //    fmt.Printf("Error on Ack: %v\n", err)
       // }

    },
    nats.Durable(NATS_DURABLE_VEHICLEDATA),
    nats.ManualAck(),
    nats.DeliverAll(),
    //nats.AckWait(3 * time.Second),
    nats.MaxDeliver(20000),
    nats.BindStream(NATS_STREAM_NAME),
    )

    return
}


func natsPullSubscribe_track() {

    for {
        sub, err := js.PullSubscribe(
                                NATS_SUBJECT_TRACK,
                                NATS_DURABLE_TRACK,
                                nats.ManualAck(),
                                nats.DeliverAll(),
                                nats.MaxDeliver(10000),
                                nats.BindStream(NATS_STREAM_NAME),
                                )

        if err != nil {
            log_to_file(LOG_INFO, "NATS_PULL_TRACK: js.PullSubscribe() err: %v", err)
        }

        for {
            msgs, err := sub.Fetch(1)
            if err != nil {
                log_to_file(LOG_INFO, "NATS_TRACK_PULL: sub.Fetch() err: %v", err)
                if err == nats.ErrTimeout {
                    //time.Sleep(2000 * time.Millisecond)  //TODO: Borrar linea
                    continue
                }

                break
            }

            for _, msg := range msgs {
                log_to_file(LOG_INFO, "NATS_PULL_TRACK: subject:%s, msg:%s", msg.Subject, msg.Data)
                process_track_msg(msg)
            }
        }

        time.Sleep(1 * time.Second)
        sub.Unsubscribe()
        log_to_file(LOG_INFO, "NATS_PULL_TRACK: new js.PullSubscribe")
    }

    return
}


func natsPullSubscribe_dbcode() {

    for {
        sub, err := js.PullSubscribe(
                                NATS_SUBJECT_DBCODE,
                                NATS_DURABLE_DBCODE,
                                nats.ManualAck(),
                                nats.DeliverAll(),
                                nats.MaxDeliver(10000),
                                nats.BindStream(NATS_STREAM_NAME),
                                )

        if err != nil {
            log_to_file(LOG_INFO, "NATS_PULL_DBCODE: js.PullSubscribe() err: %v", err)
        }

        for {
            msgs, err := sub.Fetch(1)

            if err != nil {
                log_to_file(LOG_INFO, "NATS_DBCODE_PULL: sub.Fetch() err: %v", err)
                if err == nats.ErrTimeout {
                    //time.Sleep(2000 * time.Millisecond)  //TODO: Borrar linea
                    continue
                }
                break
            }

            for _, msg := range msgs {
                log_to_file(LOG_INFO, "NATS_PULL_DBCODE: subject:%s, msg:%s", msg.Subject, msg.Data)
                process_dbcode_msg(msg)
            }
        }

        time.Sleep(1 * time.Second)
        sub.Unsubscribe()
        log_to_file(LOG_INFO, "NATS_PULL_DBCODE: new js.PullSubscribe")
    }

    return
}

@derekcollison sorry again, but if you have a fast consumer (remove the sleep in the subscribe handler) the error is triggered again.

Versions of nats-server and affected client libraries used:

nats-server v2.5.0, nats client v1.12.1

Steps or code to reproduce the issue:

This time I have a fast consumer connecting to a stream which already has ~50k messages. When the consumers start (15 of them this time), the receive rate is high (fast consumers with no sleep plus a lot of messages to process), but after a while the receive rate drops to zero.

Publish some messages to the stream without consuming them, then connect the consumers.

package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	c2, err := nats.Connect("nats://192.168.1.200:4222")
	if err != nil {
		panic(err)
	}
	js2, err := c2.JetStream()
	if err != nil {
		panic(err)
	}
	recivedMsg := new(int64)
	publishedMsg := new(int64)
	go func() {
		t := time.NewTicker(5 * time.Second)
		var receivedOldMsg int64

		var publishedOldMsg int64

		for range t.C {
			receivedMsg := atomic.LoadInt64(recivedMsg)
			publishedMsg := atomic.LoadInt64(publishedMsg)
			fmt.Println("received", receivedMsg-receivedOldMsg, "published", publishedMsg-publishedOldMsg)
			receivedOldMsg = receivedMsg
			publishedOldMsg = publishedMsg
		}
	}()
	_, err = js2.AddStream(&nats.StreamConfig{
		Name:         "cluster",
		Subjects:     []string{"cluster.*"},
		Retention:    nats.InterestPolicy,
		Discard:      nats.DiscardOld,
		MaxAge:       8 * time.Hour,
		Storage:      nats.FileStorage,
		MaxConsumers: 0,
		MaxMsgs:      0,
		MaxBytes:     1073741824,

		MaxMsgSize: 6048576,

		Replicas: 2,
	})
	if err != nil {
		fmt.Println(err)
	}
	inbox := nats.NewInbox()
	_, err = js2.AddConsumer("cluster", &nats.ConsumerConfig{
		Durable:        "test_cluster_created",
		DeliverGroup:   "test_cluster_created",
		DeliverSubject: inbox,
		DeliverPolicy:  nats.DeliverAllPolicy,
		FilterSubject:  "cluster.created",
		AckPolicy:      nats.AckExplicitPolicy,
		AckWait:        60000000000,
		MaxAckPending:  65535,
		MaxDeliver:     0,
		RateLimit:      6553500000,
		//MaxWaiting:     5000,
	})

	if err != nil {
		fmt.Println(err)
	}
	inbox = nats.NewInbox()

	_, err = js2.AddConsumer(
		"cluster",
		&nats.ConsumerConfig{
			Durable:        "test2_cluster_created",
			DeliverGroup:   "test2_cluster_created",
			DeliverSubject: inbox,
			DeliverPolicy:  nats.DeliverAllPolicy,
			FilterSubject:  "cluster.sapo",
			AckPolicy:      nats.AckExplicitPolicy,
			AckWait:        60000000000,
			MaxAckPending:  65535,
			MaxDeliver:     0,

			//MaxWaiting:     5000,
		},
	)
	if err != nil {
		fmt.Println(err)
	}

	for i := 0; i < 15; i++ {
		go func() {
			b := make([]byte, 6500)
			for i := 0; i < len(b); i++ {
				b[i] = byte(i % 256)
			}
			c, err := nats.Connect("nats://192.168.1.200:4222")
			if err != nil {
				panic(err)
			}
			js, err := c.JetStream()
			if err != nil {
				panic(err)
			}
			for {
				_, err = js.Publish("cluster.created", b)
				if err != nil {
					fmt.Println(err)
				}

				atomic.AddInt64(publishedMsg, 1)
			}
		}()
	}

	time.Sleep(60 * time.Second)
	for i := 0; i < 15; i++ {
		time.Sleep(500 * time.Millisecond)
		go func() {

			c, err := nats.Connect("nats://192.168.1.200:4222", nats.ErrorHandler(func(c *nats.Conn, s *nats.Subscription, e error) {

				if err != nil {
					fmt.Println("slow", err)
				}

			}), nats.DisconnectErrHandler(func(c *nats.Conn, e error) {
				if err != nil {
					fmt.Println("slow2", err)
				}

			}),
				nats.DisconnectHandler(func(c *nats.Conn) {
					fmt.Println("disconnected")
				}),
				nats.Timeout(5*time.Second),
			)
			if err != nil {
				panic(err)
			}
			js, err := c.JetStream()
			if err != nil {
				panic(err)
			}
		
			_, err = js.QueueSubscribe(
				"cluster.created",
				"test_cluster_created",
				func(msg *nats.Msg) {
					//time.Sleep(50 * time.Millisecond)
					msg.Ack()
					atomic.AddInt64(recivedMsg, 1)
				},
				nats.ManualAck(),
				nats.Bind("cluster", "test_cluster_created"),
			)
			if err != nil {
				panic(err)
			}

			_, err = js.QueueSubscribe("cluster.sapo", "test2_cluster_created",
				func(msg *nats.Msg) {
					//	msg.InProgress(nats.AckWait(45 * time.Second))
					//	time.Sleep(50 * time.Millisecond)
					msg.Ack()
					atomic.AddInt64(recivedMsg, 1)
				},
				nats.ManualAck(),
				nats.Bind("cluster", "test2_cluster_created"),
			)
			if err != nil {
				panic(err)
			}

		}()
	}

	time.Sleep(50 * time.Minute)
}

services:
  n1:
    container_name: n1
    image: nats:2.4.0-alpine
    entrypoint: /usr/local/bin/nats-server
    command: "--config /config/jetstream.conf --server_name S1"
    networks:
      - nats
    ports:
      - 4222:4222
    volumes:
      - ./config:/config
      - ./persistent-data/server-n1/:/data/nats-server/jetstream

  n2:
    container_name: n2
    image: nats:2.4.0-alpine
    entrypoint: /usr/local/bin/nats-server
    command: "--config /config/jetstream.conf --server_name S2"
    networks:
      - nats
    ports:
      - 4223:4222
    volumes:
      - ./config:/config
      - ./persistent-data/server-n2/:/data/nats-server/jetstream

  n3:
    container_name: n3
    image: nats:2.4.0-alpine
    entrypoint: /usr/local/bin/nats-server
    command: "--config /config/jetstream.conf --server_name S3"
    networks:
      - nats
    ports:
      - 4224:4222
    volumes:
      - ./config:/config
      - ./persistent-data/server-n3/:/data/nats-server/jetstream

networks:
  nats: {}

The config:

debug: true
trace: false

# Each server can connect to clients on the internal port 4222
# (mapped to external ports in our docker-compose)
port: 4222

# Persistent JetStream data store
jetstream = {
  # Each server persists messages within the docker container
  # at /data/nats-server (mounted as ./persistent-data/server-n…
  # in our docker-compose)
  store_dir: "/data/nats-server/"
}

# Cluster formation
cluster = {
  name: "JSC"
  listen: "0.0.0.0:4245"

  # Servers can connect to one another at
  # the following routes
  routes = [
    "nats://n1:4245"
    "nats://n2:4245"
    "nats://n3:4245"
  ]
}

accounts: {
  SYS: { users: [ {user: "admin", password: "admin"} ] },
}
system_account: SYS

Expected result:

Constant or nearly constant receive rate

Actual result:

received 7359 published 524
received 7565 published 932
received 6338 published 1597
received 3789 published 3036
received 2994 published 2995
received 3215 published 3213
received 2715 published 3411
received 0 published 5428
received 0 published 4084
received 0 published 1212
received 0 published 5039
received 5022 published 929
received 6718 published 515
received 7219 published 805
received 5491 published 2841
received 0 published 5572
received 0 published 5891
received 0 published 5086
received 0 published 5526
received 0 published 5554
received 0 published 5602
received 0 published 5697
received 0 published 4859
received 4043 published 2376
received 7001 published 559
received 7217 published 774
received 6508 published 1759
received 925 published 4550
received 1152 published 4181
received 1183 published 4401
received 1206 published 4524
received 1131 published 4187
received 1134 published 4260
received 1067 published 3999
received 1136 published 4307
received 3435 published 2871
received 7431 published 631
received 7376 published 747
received 6563 published 1651
received 7337 published 1342
received 5672 published 1833
received 0 published 5433
received 0 published 5587
received 0 published 5596
received 0 published 5971
received 0 published 5947
received 0 published 5500
received 1764 published 4305
received 7675 published 728
received 7739 published 769
received 7630 published 1337
received 7743 published 1081
received 8093 published 469
received 6415 published 1835
received 0 published 4383
received 0 published 1047
received 0 published 4644
received 0 published 4969
received 0 published 5040
received 0 published 2576
received 0 published 3291
received 0 published 4753
received 0 published 3306
received 0 published 5168
received 0 published 4604
received 0 published 4768
received 3615 published 3155
received 3158 published 3490
received 0 published 4845
received 0 published 4764
received 0 published 4221
received 0 published 4754
received 0 published 5155
received 0 published 4681
received 0 published 4870
received 0 published 4762
received 0 published 5535
received 0 published 5560
received 0 published 4908
received 0 published 4790
received 0 published 4897
received 0 published 4730
received 0 published 4589
received 0 published 4620
received 0 published 4551
received 0 published 4209
received 0 published 4515
received 0 published 4754
received 0 published 4612
received 0 published 5160
received 0 published 5116
received 0 published 5159
received 0 published 5013

In this case the receivers started receiving messages again after a while, but in most of my tests they never recovered.

Is it better to create a new issue?