google-cloud-node: PubSub deadlock with flushQueue / ConnectionPool.isConnected() always false
Environment details
- OS: Debian Jessie
- Node.js version: 8.5.0
- google-cloud-node version: 0.14.2
Steps to reproduce
I think there is an issue with queue flushing and reconnecting in the PubSub client. Here’s a minimal example to reproduce it (locally, with the latest version of the PubSub emulator):
```javascript
const pubsub = require('@google-cloud/pubsub')();

let last = Date.now();
// Keep a single reference so the SIGINT handler detaches the listeners that were
// actually registered (pubsub.subscription() returns a new object on every call).
let subscription;

function onError(err) {
  console.log('ON ERROR', err);
}

function onMessage(message) {
  // Log the attributes and the seconds elapsed since the previous delivery.
  console.log('ON MESSAGE', message.attributes, (Date.now() - last) / 1000);
  last = Date.now();
  message.ack();
}

const topicName = `my-topic-${Math.floor(Math.random() * 10000)}`;
const subscriptionName = `my-subscription-${Math.floor(Math.random() * 10000)}`;

pubsub.createTopic(topicName, (err) => {
  if (err) throw err;
  const topic = pubsub.topic(topicName);
  const publisher = topic.publisher();

  topic.createSubscription(subscriptionName, (err) => {
    if (err) throw err;
    subscription = pubsub.subscription(subscriptionName);
    subscription.on('error', onError);
    subscription.on('message', onMessage);

    // First message: published immediately.
    publisher.publish(Buffer.from('x'.repeat(4000)), { batch: 'first' }, (err, messageId) => {
      console.log('DONE publishing immediately', err, messageId);
    });

    // Second message: published after 2 seconds.
    setTimeout(() => {
      publisher.publish(Buffer.from('x'.repeat(4000)), { batch: 'second' }, (err, messageId) => {
        console.log('DONE publishing after timeout', err, messageId);
      });
    }, 2000);
  });
});

process.on('SIGINT', () => {
  if (subscription) {
    subscription.removeListener('message', onMessage);
    subscription.removeListener('error', onError);
  }
  process.exit();
});
```
Output:

```
DONE publishing immediately null 229
ON MESSAGE { batch: 'first' } 1.36
DONE publishing after timeout null 230
ON MESSAGE { batch: 'second' } 1.988
ON MESSAGE { batch: 'second' } 10.008
ON MESSAGE { batch: 'second' } 10.006
```
As you can see, the second message is redelivered over and over again, roughly every 10 seconds.
What I have found out so far is the following:

In https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/subscription.js#L261, `this.connectionPool.isConnected()` always returns `false` for me.

Digging deeper, I found that in https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/connection-pool.js#L364, `connection.isConnected` is always `undefined` for me, hence `ConnectionPool.isConnected()` always returns `false`.
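A minimal sketch of why an `undefined` flag makes the whole pool look disconnected. `poolIsConnected` and the plain-object connections are hypothetical stand-ins; the loop assumes the pool reports "connected" as soon as any one connection has a truthy `isConnected`, which may differ in detail from the linked source.

```javascript
// Hypothetical stand-ins for the pool's connection objects. In the affected
// setup the `isConnected` property is never assigned, so it stays undefined.
const connections = new Map([
  ['a', { id: 'a' }],
  ['b', { id: 'b' }],
]);

// Assumed shape of the pool-level check (not the library source):
// connected if at least one connection has a truthy `isConnected` flag.
function poolIsConnected(conns) {
  for (const connection of conns.values()) {
    if (connection.isConnected) {
      return true;
    }
  }
  return false;
}

console.log(poolIsConnected(connections)); // false: every flag is undefined

connections.get('a').isConnected = true;
console.log(poolIsConnected(connections)); // true once a flag is actually set
```

Because `undefined` is falsy, the check can never succeed unless something assigns the flag, so every `ack()` takes the "not connected" path.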
So, since `connectionPool.isConnected()` always returns `false`, the first `ack()` call will call `setFlushTimeout_()` in:
https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/subscription.js#L263
This does in fact run `this.flushQueues_()` exactly once (so the first message is actually acknowledged), but since `this.flushQueues_()` does not reset `this.flushTimeoutHandle_`, all subsequent calls to `this.setFlushTimeout_()` bail out here:
https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/subscription.js#L980-L982
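The bail-out can be modeled with a small, hypothetical class. Only the `flushQueues_`, `setFlushTimeout_` and `flushTimeoutHandle_` names come from the linked code; `BuggyFlushScheduler`, `flushCount` and the injectable `schedule` function are illustration-only assumptions, and a fake scheduler lets the timer fire on demand:

```javascript
// Simplified, hypothetical model of the flush scheduling described above.
class BuggyFlushScheduler {
  constructor(schedule = (fn, ms) => setTimeout(fn, ms)) {
    this.schedule = schedule;
    this.flushTimeoutHandle_ = null;
    this.flushCount = 0;
  }

  flushQueues_() {
    this.flushCount += 1; // pending acks would be sent here
    // BUG being modeled: flushTimeoutHandle_ is left set after flushing.
  }

  setFlushTimeout_() {
    if (this.flushTimeoutHandle_) {
      return; // the stale handle turns every later call into a no-op
    }
    this.flushTimeoutHandle_ = this.schedule(() => this.flushQueues_(), 1000);
  }
}

// Fake scheduler: store callbacks so the test can fire them by hand.
const pending = [];
const fakeSchedule = (fn) => {
  pending.push(fn);
  return pending.length; // any truthy value works as a handle
};

const q = new BuggyFlushScheduler(fakeSchedule);
q.setFlushTimeout_(); // ack of the first message schedules a flush
pending.shift()();    // timer fires: the first message is flushed
q.setFlushTimeout_(); // ack of the second message: bails out, nothing scheduled
console.log(q.flushCount, pending.length); // 1 0
```

The second ack never reaches the server, which matches the redelivery of `{ batch: 'second' }` in the output above.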
So basically I end up with two questions:
- Shouldn’t `this.flushQueues_()` make sure to always unset `this.flushTimeoutHandle_` after the queues have been flushed? Otherwise `this.setFlushTimeout_()` can effectively trigger `this.flushQueues_()` only once during the lifetime of the process; all subsequent calls have no effect. I assume this is a bug.
- Why is `connection.isConnected` always `undefined`? Is that another bug in the `connectionPool.isConnected()` method?
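A sketch of the change suggested in the first question, again on a hypothetical model rather than the actual `subscription.js`: `flushQueues_()` cancels any pending timer and resets `flushTimeoutHandle_`, so every later `ack()` can schedule a fresh flush. `FixedFlushScheduler`, `schedule`, `cancel` and `flushCount` are assumptions for illustration.

```javascript
// Hypothetical model with the handle reset applied.
class FixedFlushScheduler {
  constructor(
    schedule = (fn, ms) => setTimeout(fn, ms),
    cancel = (handle) => clearTimeout(handle)
  ) {
    this.schedule = schedule;
    this.cancel = cancel;
    this.flushTimeoutHandle_ = null;
    this.flushCount = 0;
  }

  flushQueues_() {
    // Proposed fix: clear any pending timer (harmless if it already fired)
    // and reset the handle so the next ack can schedule another flush.
    if (this.flushTimeoutHandle_) {
      this.cancel(this.flushTimeoutHandle_);
      this.flushTimeoutHandle_ = null;
    }
    this.flushCount += 1; // pending acks would be sent here
  }

  setFlushTimeout_() {
    if (this.flushTimeoutHandle_) {
      return; // a flush is already scheduled
    }
    this.flushTimeoutHandle_ = this.schedule(() => this.flushQueues_(), 1000);
  }
}

// Usage with a fake scheduler so the timers can be fired by hand.
const timers = [];
const fixed = new FixedFlushScheduler(
  (fn) => { timers.push(fn); return timers.length; },
  () => {} // nothing to cancel for the fake timers
);
fixed.setFlushTimeout_(); // ack of the first message
timers.shift()();         // first flush
fixed.setFlushTimeout_(); // ack of the second message schedules a new flush
timers.shift()();         // second flush
console.log(fixed.flushCount); // 2
```

Resetting the handle inside `flushQueues_()` covers both paths: the timer firing and a direct flush while a timer is still pending.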
About this issue
- State: closed
- Created 7 years ago
- Comments: 20 (7 by maintainers)
Nice catch! I’ll update the version number tonight and hopefully we can finally put this to bed 😃. Thanks again for all your help
Just for the record: I figured out that `grpc@1.4.1` was the first grpc release where your patch works well. Anyway, thanks for fixing the issue! Can’t wait to finally upgrade to the new pubsub client 😉 …
@ctavan this fix should be published, thanks for the help.
Argh, OK. I had messed up my yarn and `grpc` versions. I finally managed to make sure that only `grpc@1.6.0` is installed, which is pulled in through the `grpc@^1.3.1` dependency of `@google-cloud/common-grpc@0.4.0` and the `grpc@^1.2` dependency of `google-gax@0.13.0`.

So with `grpc@1.6.0` and your branch from #2627, the issue described here is effectively fixed.

Nonetheless, I believe the `grpc` dependency version needs to be updated. The issues I was still seeing came from older `grpc` versions that were still within the specified dependency range, were not updated because they were pinned by the `yarn.lock` file, and consequently broke the whole thing.

So it did not work with `grpc@1.3.2` (`client.waitForReady is not a function`), and it does work with `grpc@1.6.0`.
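Since older `grpc` releases inside the allowed range lack `client.waitForReady`, a dependent could feature-detect it and fail with a clear message instead of a `TypeError`. This is a hypothetical helper (`waitUntilReady` is not part of grpc or the PubSub client), and it assumes `waitForReady(deadline, callback)` accepts an absolute deadline in milliseconds since the epoch; the clients below are stubs.

```javascript
// Hypothetical helper, not part of grpc or @google-cloud/pubsub.
function waitUntilReady(client, timeoutMs, callback) {
  if (typeof client.waitForReady !== 'function') {
    // Per this thread, grpc@1.3.2 fails here with
    // "client.waitForReady is not a function"; report it explicitly instead.
    callback(new Error('grpc client has no waitForReady(); upgrade the grpc dependency'));
    return;
  }
  // Assumption: the deadline is absolute (ms since epoch), not a relative timeout.
  client.waitForReady(Date.now() + timeoutMs, callback);
}

// Stub clients standing in for an old and a new grpc version.
const oldClient = {};
const newClient = { waitForReady: (deadline, cb) => cb(null) };

waitUntilReady(oldClient, 5000, (err) => console.log('old:', err && err.message));
waitUntilReady(newClient, 5000, (err) => console.log('new:', err));
```

Bumping the `grpc` range (and refreshing `yarn.lock`) remains the actual fix; a guard like this only makes the failure easier to diagnose.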