google-cloud-node: PubSub deadlock with flushQueue / ConnectionPool.isConnected() always false

Environment details

  • OS: Debian Jessie
  • Node.js version: 8.5.0
  • google-cloud-node version: 0.14.2

Steps to reproduce

I think there is an issue with queue flushing and reconnecting in the PubSub client. Here’s a minimal example that reproduces it (locally, with the latest version of the PubSub emulator):

const pubsub = require('@google-cloud/pubsub')();

let last = Date.now();

function onError(err) {
  console.log('ON ERROR', err);
}
function onMessage(message) {
  console.log('ON MESSAGE', message.attributes, (Date.now() - last) / 1000);
  last = Date.now();
  message.ack();
}

const topicName = `my-topic-${Math.floor(Math.random() * 10000)}`;
const subscriptionName = `my-subscription-${Math.floor(Math.random() * 10000)}`;

pubsub.createTopic(topicName, () => {
  const topic = pubsub.topic(topicName);
  const publisher = topic.publisher();
  topic.createSubscription(subscriptionName, () => {
    const subscription = pubsub.subscription(subscriptionName);
    subscription.on('error', onError);
    subscription.on('message', onMessage);

    // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
    publisher.publish(Buffer.from('x'.repeat(4000)), { batch: 'first' }, (err, messageId) => {
      console.log('DONE publishing immediately', err, messageId);
    });
    setTimeout(() => {
      publisher.publish(Buffer.from('x'.repeat(4000)), { batch: 'second' }, (err, messageId) => {
        console.log('DONE publishing after timeout', err, messageId);
      });
    }, 2000);
  });
});

process.on('SIGINT', () => {
  const subscription = pubsub.subscription(subscriptionName);
  subscription.removeListener('message', onMessage);
  subscription.removeListener('error', onError);

  process.exit();
});

Output:

DONE publishing immediately null 229
ON MESSAGE { batch: 'first' } 1.36
DONE publishing after timeout null 230
ON MESSAGE { batch: 'second' } 1.988
ON MESSAGE { batch: 'second' } 10.008
ON MESSAGE { batch: 'second' } 10.006

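As you can see, the second message is re-delivered over and over again (roughly every 10 seconds in the output above), even though onMessage calls message.ack() each time.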

What I could find out so far is the following:

In https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/subscription.js#L261 this.connectionPool.isConnected() always returns false for me.

Digging deeper, I found that in https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/connection-pool.js#L364 connection.isConnected is always undefined for me, so ConnectionPool.isConnected() always returns false.
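To illustrate the effect, here is a minimal sketch. It assumes the pool check simply looks for a truthy isConnected flag on any of its connections. The function name poolIsConnected and the Map of plain objects are my own stand-ins, not the library's code.

```javascript
// Hypothetical stand-in for the pool check described above; NOT the library's code.
function poolIsConnected(connections) {
  for (const connection of connections.values()) {
    // An undefined flag is falsy, so such a connection never counts as connected.
    if (connection.isConnected) {
      return true;
    }
  }
  return false;
}

// Connections on which the flag was never set (what I am observing).
const neverFlagged = new Map([['a', {}], ['b', {}]]);
// For comparison: one connection with the flag set.
const oneFlagged = new Map([['a', {}], ['b', { isConnected: true }]]);

console.log(poolIsConnected(neverFlagged));
console.log(poolIsConnected(oneFlagged));
```

If nothing ever sets the flag on a connection, a check of this shape can only return false.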

Since connectionPool.isConnected() always returns false, the first ack() call will call setFlushTimeout_() in: https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/subscription.js#L263

This does run this.flushQueues_() exactly once (so the first message is actually acknowledged). However, since this.flushQueues_() does not reset this.flushTimeoutHandle_, all subsequent calls to this.setFlushTimeout_() will bail out here: https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/subscription.js#L980-L982

So basically I end up with two questions:

  • Shouldn’t this.flushQueues_() make sure to always unset this.flushTimeoutHandle_ after the queues have been flushed? Otherwise this.setFlushTimeout_() can trigger this.flushQueues_() only once during the runtime of the process, and all subsequent calls will have no effect. I assume this is a bug.
  • Why is connection.isConnected always undefined? Is that another bug in the connectionPool.isConnected() method?
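The flush behaviour behind the first question can be reproduced with a simplified model. This is only a sketch under my own assumptions: the class FlushModel, the resetHandleAfterFlush switch and the manual timer are hypothetical and merely mirror the names used above. It is not the code from subscription.js.

```javascript
// Simplified, hypothetical model of the flush scheduling described above.
// It is NOT the library's code; the names only mirror the ones in this issue.
class FlushModel {
  constructor(resetHandleAfterFlush, schedule) {
    this.resetHandleAfterFlush = resetHandleAfterFlush;
    this.schedule = schedule; // stand-in for setTimeout
    this.flushTimeoutHandle = null;
    this.pendingAcks = [];
    this.flushed = [];
  }

  ack(id) {
    this.pendingAcks.push(id);
    this.setFlushTimeout();
  }

  setFlushTimeout() {
    if (this.flushTimeoutHandle) {
      return; // a flush is believed to be scheduled already
    }
    this.flushTimeoutHandle = this.schedule(() => this.flushQueues());
  }

  flushQueues() {
    this.flushed.push(...this.pendingAcks);
    this.pendingAcks = [];
    if (this.resetHandleAfterFlush) {
      this.flushTimeoutHandle = null; // the reset I am asking about
    }
  }
}

// Deterministic stand-in for setTimeout: callbacks are collected and run by hand.
function makeManualTimer() {
  const callbacks = [];
  const schedule = (fn) => {
    callbacks.push(fn);
    return callbacks.length; // always a truthy handle
  };
  const runAll = () => {
    while (callbacks.length) {
      callbacks.shift()();
    }
  };
  return { schedule, runAll };
}

function simulate(resetHandleAfterFlush) {
  const timer = makeManualTimer();
  const model = new FlushModel(resetHandleAfterFlush, timer.schedule);
  model.ack('first');
  timer.runAll();
  model.ack('second');
  timer.runAll();
  return { flushed: model.flushed, stuck: model.pendingAcks };
}

console.log(simulate(false)); // handle never reset
console.log(simulate(true)); // handle reset after each flush
```

In this model, without the reset the second ack stays in the queue forever, which matches the repeated re-delivery of the second message. With the reset, both acks are flushed.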

About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 20 (7 by maintainers)

Most upvoted comments

Nice catch! I’ll update the version number tonight and hopefully we can finally put this to bed 😃. Thanks again for all your help.

Just for the record: I figured out that grpc@1.4.1 was the first grpc release where your patch works well.

Anyway, thanks for fixing the issue! Can’t wait to finally upgrade to the new pubsub client 😉 …

@ctavan this fix should be published, thanks for the help.

Argh, ok. I had messed up the yarn and grpc versions. I finally managed to make sure that only grpc@1.6.0 is installed, which is pulled in through the grpc@^1.3.1 dependency of @google-cloud/common-grpc@0.4.0 and the grpc@^1.2 dependency of google-gax@0.13.0.

So with grpc@1.6.0 and your branch from #2627, the issue described here is effectively fixed.

Nonetheless, I believe the grpc dependency version range needs to be raised. The issues I was still seeing came from older grpc versions that satisfy the currently specified range: they were pinned by my yarn.lock file, so they were never updated, and they broke the whole thing.

So it did not work with grpc@1.3.2 (client.waitForReady is not a function) and it does work with grpc@1.6.0.
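For anyone hitting the same error, a feature check like the following would make the failure more obvious. This is only a sketch: the helper name assertWaitForReady is hypothetical, and the objects passed to it are plain stand-ins rather than real grpc clients.

```javascript
// Hypothetical guard; `client` is any object that may or may not expose waitForReady.
function assertWaitForReady(client) {
  if (typeof client.waitForReady !== 'function') {
    throw new Error(
      'client.waitForReady is not a function: the installed grpc version is probably too old'
    );
  }
  return true;
}

// Stand-ins for a client from an older and from a newer grpc release.
const oldStyleClient = {};
const newStyleClient = { waitForReady() {} };

try {
  assertWaitForReady(oldStyleClient);
} catch (err) {
  console.log('old client rejected:', err.message);
}
console.log('new client accepted:', assertWaitForReady(newStyleClient));
```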