google-cloud-ruby: Pubsub stops pulling messages
Environment details
- OS: Heroku app
- Ruby version: 2.6.6
- Gem name and version: google-cloud-pubsub (1.10.0)
Problem
We are using Google Pub/Sub as the backend for our background job processing. For several months, the system has intermittently and suddenly stopped processing jobs. We suspect that this happens when there’s an elevated error rate in our system.
The last time it happened, I added some logging to see if I could find something. For each subscriber’s stream pool, we log the following:
- inventory
- stream_status: https://github.com/googleapis/google-cloud-ruby/blob/master/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/stream.rb#L380
- background_thread_status: https://github.com/googleapis/google-cloud-ruby/blob/master/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/stream.rb#L386
- callback_thread_pool_running: whether the callback thread pool is running: https://github.com/googleapis/google-cloud-ruby/blob/master/google-cloud-pubsub/lib/google/cloud/pubsub/subscriber/stream.rb#L35
I will share the information for one topic, but most of the system was behaving the same way. For this topic we have the following configuration:
{ streams: 3, inventory: 1000, callback_threads: 6, push_threads: 3 }
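For readers less familiar with the gem, here is a minimal sketch of how a flat configuration like this could map onto the keyword arguments of Subscription#listen. The helper name listen_options is hypothetical; the mapping of callback_threads and push_threads onto the threads key follows what is described later in this thread.

```ruby
# Hypothetical helper: translate the flat per-topic configuration into the
# keyword arguments accepted by Google::Cloud::PubSub::Subscription#listen.
def listen_options(config)
  {
    streams: config.fetch(:streams),
    inventory: config.fetch(:inventory),
    threads: {
      callback: config.fetch(:callback_threads),
      push: config.fetch(:push_threads)
    }
  }
end

config = { streams: 3, inventory: 1000, callback_threads: 6, push_threads: 3 }
options = listen_options(config)

# With a real subscription object this would be used roughly as:
#   subscriber = subscription.listen(**options) { |message| ... }
#   subscriber.start
```

Using fetch rather than [] makes a missing key fail loudly instead of silently passing nil and falling back to the library defaults.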
Here are the logs that I previously mentioned:
[{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>0, "stream_status"=>"running"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>4, "stream_status"=>"running"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>0, "stream_status"=>"running"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>0, "stream_status"=>"running"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>0, "stream_status"=>"running"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>0, "stream_status"=>"running"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>293, "stream_status"=>"paused"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>331, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>329, "stream_status"=>"running"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>267, "stream_status"=>"running"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>277, "stream_status"=>"running"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>286, "stream_status"=>"running"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>299, "stream_status"=>"paused"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>300, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>256, "stream_status"=>"running"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>298, "stream_status"=>"paused"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>289, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>327, "stream_status"=>"paused"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>329, "stream_status"=>"running"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>278, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>304, "stream_status"=>"paused"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
"stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
"stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}}]
As you can see, the last entries are stuck with the same inventory values: 282, 317, and 318. They stayed that way until we restarted the process.
We’ve had this issue for many months and we can’t figure out what the problem is. The only thing we know is that it usually happens when there’s an elevated error rate in our jobs.
I saw some other related issues, but none of them helped.
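The stuck state in the log above (every stream paused, inventory non-zero and unchanged across consecutive entries) can be detected mechanically. The following is only a sketch: the method name stalled? and the window of 5 snapshots are assumptions, and it operates on plain hashes shaped like the log entries rather than on the gem's internals. Paused streams alone are not treated as a problem, since the log shows streams going from paused back to running during normal operation.

```ruby
# Each snapshot is a hash shaped like one log entry, for example:
#   { "stream_0" => { "inventory" => 282, "stream_status" => "paused" }, ... }

# Returns true when the last `window` snapshots are identical, every stream
# is paused, and every stream still holds messages in its inventory.
def stalled?(snapshots, window: 5)
  recent = snapshots.last(window)
  return false if recent.size < window

  frozen = recent.uniq.size == 1
  all_paused_with_backlog = recent.last.values.all? do |stream|
    stream["stream_status"] == "paused" && stream["inventory"].positive?
  end

  frozen && all_paused_with_backlog
end

stuck = {
  "stream_0" => { "inventory" => 282, "stream_status" => "paused" },
  "stream_1" => { "inventory" => 317, "stream_status" => "paused" },
  "stream_2" => { "inventory" => 318, "stream_status" => "paused" }
}
moving = {
  "stream_0" => { "inventory" => 329, "stream_status" => "running" },
  "stream_1" => { "inventory" => 278, "stream_status" => "paused" },
  "stream_2" => { "inventory" => 304, "stream_status" => "paused" }
}

history = [moving] + Array.new(5) { stuck }
puts "stalled" if stalled?(history)
```

A check like this could trigger an alert or a controlled restart instead of waiting for someone to notice that jobs have stopped.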
About this issue
- State: closed
- Created 4 years ago
- Comments: 49 (48 by maintainers)
As described in At-Least-Once delivery:
Thus, to increase your chances of redelivery, I think you want to send just a few messages, shorten the ack deadline for those messages, and not ack them.
Each line represents a subscription, so only the purple one takes some time. But as mentioned, that subscription is not the one that gets stuck. As for the failures, I don’t think they take much more time. I’ll try to get more information about them.
More code that might be relevant. As I mentioned before, we have some logic around checking whether a job is already being processed or not (detect duplicated messages). This is how we handle it:
So when we detect a duplicated message, we modify the ack deadline and let the other process ack the message. We don’t want to ack the message ourselves because the other process might fail while processing it, and we don’t want to lose messages. By doing this, we leave the responsibility of acking the message to the other process. Is this logic correct? I couldn’t figure out how to reproduce the duplication locally, but looking at our logs I found this:
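To make the duplicate-handling approach described in this comment concrete, here is an illustrative sketch. It is neither the original handler nor the log excerpt referred to above. The handle method, the registry object with its in_progress? method, and the EXTENDED_DEADLINE value are all hypothetical; the message only needs to respond to message_id, modify_ack_deadline!, and acknowledge!, as Google::Cloud::PubSub::ReceivedMessage does.

```ruby
# Assumed value in seconds; 600 is the maximum ack deadline Pub/Sub allows.
EXTENDED_DEADLINE = 600

# Hypothetical handler. `registry` is any object responding to
# in_progress?(id); `message` is duck-typed after ReceivedMessage.
def handle(message, registry)
  if registry.in_progress?(message.message_id)
    # Duplicate delivery: postpone redelivery and leave the ack to the
    # process that is already working on this job.
    message.modify_ack_deadline!(EXTENDED_DEADLINE)
    return :deferred
  end

  # First delivery: run the job, then ack only after it succeeded.
  yield message
  message.acknowledge!
  :acked
end
```

If the job raises, acknowledge! is never reached, so the message stays unacked and is redelivered once its deadline expires.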
The second log acked the message and, as you can see, after 10 minutes there’s another entry. When there’s an outage, there’s an increase in the JobAlreadyInProgress errors.
callback_threads and push_threads are the values we use for the threads key.
As for the inventory size or max_outstanding_bytes, we haven’t tried other values. There’s another subscription that has an inventory of 200, but nothing less than that.
No, the process uses about half of the available memory, more or less constantly. I checked that day and couldn’t find anything unusual in the memory usage.