google-cloud-ruby: Pubsub stops pulling messages

Thanks for stopping by to let us know something could be better!

Environment details

  • OS: Heroku app
  • Ruby version: 2.6.6
  • Gem name and version: google-cloud-pubsub (1.10.0)

Problem

We are using Google Pubsub as a backend for our background jobs processing. For some months we’ve been having some issues with the system in which it suddenly stops processing jobs. We suspect that this happens when there’s an elevated error rate in our system.

The last time it happened I added some logs to see if I could find something. We are logging for each subscriber’s stream pool these things:

I will share the information for one topic, but mostly the whole system was behaving the same way. For one topic we have this configuration:

{ streams: 3, inventory: 1000, callback_threads: 6, push_threads: 3 }

Here are the logs that I previously mentioned:

[{"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>0, "stream_status"=>"running"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>4, "stream_status"=>"running"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>0, "stream_status"=>"running"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>0, "stream_status"=>"running"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>0, "stream_status"=>"running"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>0, "stream_status"=>"running"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>293, "stream_status"=>"paused"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>331, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>329, "stream_status"=>"running"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>267, "stream_status"=>"running"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>277, "stream_status"=>"running"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>286, "stream_status"=>"running"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>299, "stream_status"=>"paused"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>300, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>256, "stream_status"=>"running"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>298, "stream_status"=>"paused"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>289, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>327, "stream_status"=>"paused"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>329, "stream_status"=>"running"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>278, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>304, "stream_status"=>"paused"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}},
 {"stream_0"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>282, "stream_status"=>"paused"},
  "stream_1"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>317, "stream_status"=>"paused"},
  "stream_2"=>{"background_thread_status"=>"sleep", "callback_thread_pool_running"=>true, "inventory"=>318, "stream_status"=>"paused"}}]

As you can see the last logs got stuck with these values as inventory: 282, 317, 318. And they were like that until we restarted the process.

We’ve been with this issue for lots of months and we can’t figure out what’s the problem. The only thing we know is that it usually happens when there’s an elevated error rate in our jobs.

I saw some other issues related but none of them helped.

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 49 (48 by maintainers)

Most upvoted comments

is there a way to reproduce the scenario in which I receive the same message twice locally?

As described in At-Least-Once delivery:

Pub/Sub will repeatedly attempt to deliver any message that has not been acknowledged. While a message is outstanding to a subscriber, however, Pub/Sub tries not to deliver it to any other subscriber on the same subscription. The subscriber has a configurable, limited amount of time – known as the ackDeadline – to acknowledge the outstanding message. Once the deadline passes, the message is no longer considered outstanding, and Pub/Sub will attempt to redeliver the message.

Thus, to increase your chances of redelivery, I think you want to send just a few messages, shorten the ack deadline for those messages, and do not ack them.

@alanhala The bottom line (green) is a graph of successful transactions only then? So these jobs succeed very quickly? What do you know about the failures?

Each line represents a subscription. So only the purple is the one that takes some time. But as mentioned, that subscription is not the one that gets stuck. As for the failures, I don’t think they take a lot more time. I’ll try to get more information about them.

More code that might be relevant. As I mentioned before, we have some logic around checking whether a job is already being processed or not (detect duplicated messages). This is how we handle it:

begin
  job.perform_now
  @gcp_message.ack!
rescue JobAlreadyInProgress
  @gcp_message.modify_ack_deadline!(10.minutes.to_i)
end

So when we detect a duplicated message, we modify the ack deadline and let the other process to ack the message. We don’t want to ack the message because the other process might fail processing the message and we don’t want to lose messages. By doing this, we leave the responsibility of acking the message to the other process. Is this logic correct? I couldn’t figure out how to replicate the duplication locally, but looking at our logs I found this:

[
  {
    "insertId": "6eba10f6-f152-424b-8403-b6c4bb841581",
    "jsonPayload": {
      "job_id": "c1fa2eac-f188-4f44-b42a-36840ba6b438",
      "exception": null,
    },
    "timestamp": "2020-11-05T18:40:30.732168Z",
  },
  {
    "insertId": "0ce3ccd3-8dda-430c-9363-7594fd601e3f",
    "jsonPayload": {
      "exception": null,
      "job_id": "c1fa2eac-f188-4f44-b42a-36840ba6b438",
    },
    "timestamp": "2020-11-05T18:30:34.397501Z",
  },
  {
    "insertId": "2b293280-ba51-4d34-8fda-13fc8f826667",
    "jsonPayload": {
      "exception": "ApplicationJob::Exclusion::JobAlreadyInProgress",
      "job_id": "c1fa2eac-f188-4f44-b42a-36840ba6b438"
    },
    "timestamp": "2020-11-05T18:30:29.412439Z",
  }
]

The second log acked the message and as you can see, after 10 minutes, there’s another entry. When there’s an outage, there’s an increase in the JobAlreadyInProgress errors.

I have been looking but so far haven’t had any insights. You wrote that your configuration is:

{ streams: 3, inventory: 1000, callback_threads: 6, push_threads: 3 }

But callback_threads and push_threads are not parameters for Subscription#listen. Can you share your actual source code call(s) to this method and parameter values? Have you ever tried setting the inventory below 300? I was also wondering if you have experimented with other inventory configuration settings, such as max_outstanding_bytes. I know you wrote that memory is not a problem, but what is the typical size of message data?

callback_threads and push_threads are the values we use for the threads key.

subscriber_options = {
  streams: streams,
  inventory: inventory,
  threads: { callback: callback_threads, push: push_threads }
}
subscriber =
  subscription.listen(subscriber_options) do |message|
    process(message)
  end
subscriber.on_error do |error|
  send_error(error)
end

As for the inventory size or max_outstanding_bytes, we haven’t tried with other values. There’s another subscription that has an inventory of 200, but nothing less than that.

No, the process uses the half of the memory more or less constantly. I checked that day and I couldn’t find any weird things on the memory usage.