stan.js: Error on large number of subscriptions

@aricart

I am trying to publish messages to 1 million subjects and subscribe to those subjects from another process. I get the following error:

NatsError: Subject must be supplied
    at Client.publish (/root/node_modules/nats/lib/nats.js:1022:13)
    at Message.ack (/root/node_modules/node-nats-streaming/lib/stan.js:804:24)
    at Message.maybeAutoAck (/root/node_modules/node-nats-streaming/lib/stan.js:791:10)
    at Object.callback (/root/node_modules/node-nats-streaming/lib/stan.js:690:11)
    at Client.processMsg (/root/node_modules/nats/lib/nats.js:957:11)
    at Client.processInbound (/root/node_modules/nats/lib/nats.js:885:14)
    at Socket.<anonymous> (/root/node_modules/nats/lib/nats.js:468:12)
    at emitOne (events.js:96:13)
    at Socket.emit (events.js:188:7)
    at readableAddChunk (_stream_readable.js:176:18)

Publisher code:

// Reproduction script (publisher side). Kept verbatim: the failure being
// reported depends on this exact publish pattern.
// Connects with the arguments 'test-cluster' and 'test' (presumably cluster ID
// and client ID - confirm against the node-nats-streaming docs) and raises
// maxPubAcksInflight to 1,100,000, i.e. above the number of messages sent below.
var stan = require('node-nats-streaming').connect('test-cluster', 'test',{maxPubAcksInflight: 1100000});
// Number of distinct subjects (foo0 .. foo999999); one message is published to each.
let total = 1000000;
stan.on('connect', function () {
  // All publishes are issued in one synchronous loop: no ack callback is passed
  // and nothing waits between calls, so up to `total` publishes are outstanding
  // at once.
  for(let i=0;i<total;i++)
  {
    stan.publish('foo'+i, 'Hello node-nats-streaming:' + i);
    // Progress indicator: print every 1000th index.
    if(i%1000 == 0)
      console.log(i);
    } // closes the for-loop body (the indentation is misleading)
});

Subscriber code:

// Reproduction script (subscriber side). Kept verbatim: the reported
// "Subject must be supplied" error is raised while this script auto-acks
// messages (see the stack trace in the report).
var stan = require('node-nats-streaming').connect('test-cluster', 'test3');

// Number of subscriptions to create, one per subject foo0 .. foo999999.
let total = 1000000;
// Holds every subscription object returned by stan.subscribe(), indexed by i.
let a = [];
// Running count of messages received across all subscriptions.
let j = 0;
stan.on('connect', function () {
  // Every subscribe request is issued in this single synchronous loop; nothing
  // waits for a subscription to be established before the next one is created.
  for(let i=0;i<total;i++)
  {
    // Fresh options per subscription: deliver all available messages, allow up
    // to 1,000,000 in flight, and use a distinct durable name per subject.
    var opts = stan.subscriptionOptions();
    opts.setDeliverAllAvailable();
    opts.setMaxInFlight(1000000);
    opts.setDurableName('my-durable' + i);
    // a[i] below is the subscription just pushed, because a.length === i here.
    a.push(stan.subscribe('foo'+i, opts));
    // NOTE(review): the commented-out lines look like a disabled attempt to
    // register the handlers only after the subscription's 'ready' event - confirm.
//    a[i].on('ready',function() {
      a[i].on('error', function (err) {
        // Regular function, so `this` is whatever the emitter binds
        // (presumably the subscription - confirm).
        console.log('subscription for ' + this.subject + " raised an error: " + err + '\n' + err.stack);
      });
      a[i].on('message', function (msg) {
        j += 1;
        // Log only every 1000th message to keep the output manageable.
        if(j%1000 == 0){
          console.log(j);
          console.log('Received a message [' + msg.getSequence() + '] ' + msg.getData());
        }
//      });
    });
  }
});

I am running NATS Streaming in a Docker container using: docker run -p 4222:4222 -ti nats-streaming --max_channels 1000000 --max_subs 1000000

Please help me figure out the issue.

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Comments: 17 (11 by maintainers)

Most upvoted comments

Yes, so here is what would make it “work”:

// Batched subscriber: shared state read and updated by subSome().
const stan = require('node-nats-streaming').connect('test-cluster', 'test3');
const total = 500000;        // overall number of subscriptions to create
const subsBatchSize = 1000;  // subscriptions created per subSome() call
let numSubs = 0;             // subscriptions created so far
const a = [];                // subscription objects, indexed by subject number
let j = 0;                   // messages received across all subscriptions

// Start the first batch once the streaming connection is established.
stan.on('connect', () => {
  subSome();
});

/**
 * Creates the next batch of durable subscriptions and, once the underlying
 * NATS connection has been flushed, schedules the following batch.
 *
 * Reads and updates the enclosing script's state: `stan` (streaming
 * connection), `numSubs` (subscriptions created so far), `subsBatchSize`,
 * `total`, `a` (array of subscription objects) and `j` (received-message
 * counter). Takes no arguments and returns nothing.
 */
function subSome() {
  // Never go past `total`, even when it is not a multiple of the batch size.
  const max = Math.min(numSubs + subsBatchSize, total);
  // `i` is block-scoped; the previous undeclared `i` was an implicit global
  // (a ReferenceError in strict mode / ES modules).
  for (let i = numSubs; i < max; i++, numSubs++) {
    const opts = stan.subscriptionOptions();
    opts.setDeliverAllAvailable();
    opts.setDurableName('my-durable' + i);

    // Attach handlers to the subscription we just created instead of looking
    // it up again through a[i].
    const sub = stan.subscribe('foo' + i, opts);
    a.push(sub);

    sub.on('error', function (err) {
      // Regular function on purpose: `this` is whatever the emitter binds
      // (presumably the subscription, which exposes `subject` - confirm).
      console.log('subscription for ' + this.subject + " raised an error: " + err + '\n' + err.stack);
      process.exit(1);
    });
    sub.on('message', function (msg) {
      j += 1;
      // Log only every 1000th message to keep the output manageable.
      if (j % 1000 === 0) {
        console.log(j);
        console.log('Received a message [' + msg.getSequence() + '] ' + msg.getData());
      }
    });
  }

  // Wait for the flush callback before issuing the next batch, so subscribe
  // requests are sent in bounded batches rather than all at once.
  stan.nc.flush(function () {
    if (numSubs < total) {
      process.nextTick(subSome);
    }
  });
}

The publisher code:

// Batched publisher: shared state read and updated by sendSome().
const stan = require('node-nats-streaming').connect('test-cluster', 'test');
const total = 500000;       // overall number of messages to publish
const pubBatchSize = 1000;  // messages published per sendSome() call
let count = 0;              // messages handed to stan.publish() so far
let callbackCount = 0;      // publish callbacks received so far

// Start the first batch once the streaming connection is established.
stan.on('connect', () => {
  sendSome();
});

/**
 * Publishes the next batch of messages (one per subject `foo<i>`) and, once
 * every publish in the batch has been acknowledged, schedules the next batch.
 * When all `total` acknowledgements have arrived, closes the connection and
 * exits the process.
 *
 * Reads and updates the enclosing script's state: `stan` (streaming
 * connection), `count` (messages published so far), `callbackCount`
 * (acknowledgements received so far), `pubBatchSize` and `total`.
 * Takes no arguments and returns nothing.
 */
function sendSome() {
  // Never go past `total`: without the cap, a `total` that is not a multiple
  // of the batch size would publish extra messages and close the connection
  // while those extra publishes are still waiting for their acks.
  const max = Math.min(count + pubBatchSize, total);
  for (let i = count; i < max; i++, count++) {
    stan.publish('foo' + i, 'Hello node-nats-streaming:' + i, function (err, guid) {
      if (err) {
        console.log(i + " " + err);
        process.exit(1);
      }
      // `guid` is the second callback argument; uncomment to trace acks:
      // console.log("recv ack for guid " + guid);
      callbackCount++;
      if (callbackCount === total) {
        // Everything has been acknowledged: shut down cleanly.
        stan.close(function () {
          process.exit(0);
        });
      } else if (callbackCount === max) {
        // This batch is fully acknowledged; send the next one on a later tick.
        process.nextTick(sendSome);
      }
    });
    // Progress indicator: print every 1000th index.
    if (i % 1000 === 0) {
      console.log(i);
    }
  }
}

I ran this with a separate gnatsd to better monitor memory and CPU usage. I also made use of a feature in NATS Streaming on master that reduces the number of internal subscriptions the streaming server uses to receive application acks. NATS server:

gnatsd

NATS Streaming Server:

nats-streaming-server -mc 0 -msu 0 -m 8222 -ack_subs 10 -ns nats://localhost:4222

Still, this is a lot of subscriptions (and I have reduced it to 500,000 instead of 1M). You may want to split the load across different servers/processes.

I looked at the fix on the server and it looks OK to me. Let’s have @karteek-gamooga make sure that he connects to a server built from master (and not a running Docker version that is still 0.3.8) and see if he can still reproduce the issue.