google-cloud-node: PubSub deadlock with flushQueue / ConnectionPool.isConnected() always false
Environment details
- OS: Debian Jessie
- Node.js version: 8.5.0
- google-cloud-node version: 0.14.2
Steps to reproduce
I think there is an issue with queue flushing and reconnecting in the PubSub client. Here's a minimal example that reproduces it (locally, against the latest version of the PubSub emulator):
```js
const pubsub = require('@google-cloud/pubsub')();

let last = Date.now();
// Keep a reference so the SIGINT handler detaches the listeners from the same
// object they were attached to (pubsub.subscription() returns a new object).
let subscription;

function onError(err) {
  console.log('ON ERROR', err);
}

function onMessage(message) {
  console.log('ON MESSAGE', message.attributes, (Date.now() - last) / 1000);
  last = Date.now();
  message.ack();
}

const topicName = `my-topic-${Math.floor(Math.random() * 10000)}`;
const subscriptionName = `my-subscription-${Math.floor(Math.random() * 10000)}`;

pubsub.createTopic(topicName, (err) => {
  if (err) throw err;
  const topic = pubsub.topic(topicName);
  const publisher = topic.publisher();

  topic.createSubscription(subscriptionName, (err) => {
    if (err) throw err;
    subscription = pubsub.subscription(subscriptionName);
    subscription.on('error', onError);
    subscription.on('message', onMessage);

    // Publish one message right away...
    publisher.publish(Buffer.from('x'.repeat(4000)), { batch: 'first' }, (err, messageId) => {
      console.log('DONE publishing immediately', err, messageId);
    });

    // ...and a second one two seconds later.
    setTimeout(() => {
      publisher.publish(Buffer.from('x'.repeat(4000)), { batch: 'second' }, (err, messageId) => {
        console.log('DONE publishing after timeout', err, messageId);
      });
    }, 2000);
  });
});

process.on('SIGINT', () => {
  if (subscription) {
    subscription.removeListener('message', onMessage);
    subscription.removeListener('error', onError);
  }
  process.exit();
});
```
Output:
```
DONE publishing immediately null 229
ON MESSAGE { batch: 'first' } 1.36
DONE publishing after timeout null 230
ON MESSAGE { batch: 'second' } 1.988
ON MESSAGE { batch: 'second' } 10.008
ON MESSAGE { batch: 'second' } 10.006
```
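As you can see, the second message is redelivered over and over again, roughly every 10 seconds, which matches the default 10-second ack deadline of a Pub/Sub subscription.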
Here is what I have found so far. In the following line, `this.connectionPool.isConnected()` always returns `false` for me:
https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/subscription.js#L261
Digging deeper, I found that in the following line `connection.isConnected` is always `undefined` for me, hence `ConnectionPool.isConnected()` always returns `false`:
https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/connection-pool.js#L364
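The pool-level check can be modelled roughly as follows. This is a hypothetical sketch, not the library's source: `FakeConnectionPool` and `addConnection` are illustrative names, and it assumes the pool keeps its connections in a `Map` and reports itself connected as soon as any connection has a truthy `isConnected` flag. If nothing ever assigns that flag, every lookup yields `undefined` and the pool can never report `true`.

```javascript
// Hypothetical, simplified model of ConnectionPool.isConnected().
// Assumption: the pool counts as connected if at least one connection has a
// truthy isConnected flag. This is an illustration, not the library's code.
class FakeConnectionPool {
  constructor() {
    this.connections = new Map();
  }

  addConnection(id, connection) {
    this.connections.set(id, connection);
  }

  isConnected() {
    for (const connection of this.connections.values()) {
      // An unset flag is undefined, i.e. falsy, so it never counts.
      if (connection.isConnected) {
        return true;
      }
    }
    return false;
  }
}

// Two connections whose isConnected flag is never assigned.
const pool = new FakeConnectionPool();
pool.addConnection('a', {});
pool.addConnection('b', {});
console.log(pool.isConnected()); // false

// As soon as one connection sets the flag, the pool reports true.
pool.connections.get('a').isConnected = true;
console.log(pool.isConnected()); // true
```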
Since `connectionPool.isConnected()` always returns `false`, the first `ack()` call ends up calling `setFlushTimeout_()` here:
https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/subscription.js#L263
That timeout does run `this.flushQueues_()` exactly once (so the first message is actually acknowledged), but because `this.flushQueues_()` does not reset `this.flushTimeoutHandle_`, every subsequent call to `this.setFlushTimeout_()` bails out early here:
https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/subscription.js#L980-L982
So I end up with two questions:
- Shouldn’t `this.flushQueues_()` always unset `this.flushTimeoutHandle_` after the queues have been flushed? Otherwise `this.setFlushTimeout_()` can trigger `this.flushQueues_()` only once during the lifetime of the process, and all subsequent calls have no effect. I assume this is a bug.
- Why is `connection.isConnected` always `undefined`? Is that another bug in the `connectionPool.isConnected()` method?
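The one-shot flush behaviour, and the fix proposed in the first question, can be modelled in a few lines. This is a hypothetical sketch, not the library's code: `AckFlusher`, the `resetHandle` switch and `makeManualScheduler` are illustrative names; only `setFlushTimeout_`, `flushQueues_` and `flushTimeoutHandle_` come from the issue. A manual scheduler stands in for `setTimeout` so the flushes run deterministically.

```javascript
// Hypothetical, simplified model of the subscription's ack flushing.
// Not the library's source; names other than setFlushTimeout_, flushQueues_
// and flushTimeoutHandle_ are made up for illustration.
class AckFlusher {
  // resetHandle: false models the reported behaviour, true the proposed fix.
  // schedule defaults to setTimeout; a manual scheduler can be injected.
  constructor({ resetHandle, schedule = setTimeout }) {
    this.resetHandle = resetHandle;
    this.schedule = schedule;
    this.ackQueue = [];
    this.acked = [];
    this.flushTimeoutHandle_ = null;
  }

  ack(ackId) {
    this.ackQueue.push(ackId);
    this.setFlushTimeout_();
  }

  setFlushTimeout_() {
    // Bail out if a flush is (believed to be) already scheduled.
    if (this.flushTimeoutHandle_) {
      return;
    }
    this.flushTimeoutHandle_ = this.schedule(() => this.flushQueues_(), 0);
  }

  flushQueues_() {
    if (this.resetHandle) {
      // Proposed fix: clear the handle so the next ack can schedule a flush.
      this.flushTimeoutHandle_ = null;
    }
    this.acked.push(...this.ackQueue);
    this.ackQueue = [];
  }
}

// Queues callbacks and runs them only when runAll() is called.
function makeManualScheduler() {
  const tasks = [];
  const schedule = (fn) => tasks.push(fn); // returns the new length (truthy)
  const runAll = () => {
    while (tasks.length) tasks.shift()();
  };
  return { schedule, runAll };
}

for (const resetHandle of [false, true]) {
  const { schedule, runAll } = makeManualScheduler();
  const flusher = new AckFlusher({ resetHandle, schedule });
  flusher.ack('first');
  runAll();
  flusher.ack('second'); // without the reset, this ack is never flushed
  runAll();
  console.log(resetHandle ? 'with reset:' : 'without reset:', flusher.acked);
}
```

Without the reset, only the first ack is ever flushed, which mirrors the endless redelivery of the second message in the output above; with the reset, both acks are flushed.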
About this issue
- State: closed
- Created 7 years ago
- Comments: 20 (7 by maintainers)
Nice catch! I'll update the version number tonight and hopefully we can finally put this to bed 😃. Thanks again for all your help.
Just for the record: I figured out that `grpc@1.4.1` was the first grpc release where your patch works well. Anyway, thanks for fixing the issue! Can't wait to finally upgrade to the new pubsub client 😉 …
@ctavan this fix should now be published. Thanks for the help.
Argh, OK. I had messed up my yarn and `grpc` versions. I finally made sure that only `grpc@1.6.0` is installed; it is pulled in through both the `grpc@^1.3.1` dependency of `@google-cloud/common-grpc@0.4.0` and the `grpc@^1.2` dependency of `google-gax@0.13.0`.

So with `grpc@1.6.0` and your branch from #2627, the issue described here is effectively fixed.

Nonetheless, I believe the `grpc` dependency version needs to be bumped. The problems I was still seeing came from older `grpc` versions that are still inside the specified range: my `yarn.lock` had pinned one of them, so it was never updated and broke the whole thing. Concretely, it did not work with `grpc@1.3.2` (`client.waitForReady is not a function`) and it does work with `grpc@1.6.0`.
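As a quick local guard until the dependency range is raised, one could compare the installed `grpc` version against the first release reported in this thread to work with the patch (`1.4.1`; `1.3.2` failed, `1.6.0` worked). This is a hypothetical sketch: `isAtLeast` and `MIN_GRPC` are illustrative names, and the comparison only handles plain `MAJOR.MINOR.PATCH` strings, not pre-release tags.

```javascript
// Hypothetical helper: true if `version` >= `minimum`, comparing
// MAJOR.MINOR.PATCH numerically. Pre-release tags are not supported.
function isAtLeast(version, minimum) {
  const a = version.split('.').map(Number);
  const b = minimum.split('.').map(Number);
  for (let i = 0; i < 3; i++) {
    if (a[i] !== b[i]) {
      return a[i] > b[i];
    }
  }
  return true; // versions are equal
}

// First grpc release reported in this thread to work with the patch.
const MIN_GRPC = '1.4.1';

for (const v of ['1.3.2', '1.4.1', '1.6.0']) {
  console.log(v, isAtLeast(v, MIN_GRPC) ? 'ok' : 'too old');
}
```

In a project, the installed version could typically be read with `require('grpc/package.json').version`; the durable fix, though, is raising the lower bound of the `grpc` range in the dependent packages so that a stale `yarn.lock` entry cannot satisfy it.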