google-cloud-python: PubSub: Subscriber Client stops acking messages after several seconds
OS type and version: python:3-alpine Docker image running in GKE

Python version and virtual environment information (`python --version`):

google-cloud-python version (`pip show google-cloud`, `pip show google-<service>` or `pip freeze`): google-cloud-pubsub==0.28.4, grpcio==1.6.3

Stacktrace if available:

Steps to reproduce: Run the example code and watch the Stackdriver metrics for acks vs. modify_ack_deadline.
Code example
```python
import argparse
import logging
import sys
import time

import grequests
import grpc
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.policy.thread import Policy

from item_store import ItemStore  # local helper module, not shown in this report

logger = logging.getLogger(__name__)
requests = ItemStore()

# Hypothetical placeholders: the real values are not shown in this report.
live_forever = True
sleep_interval = 1  # seconds
# `address` and `token` (used in send_to_data_insertion) are likewise
# defined elsewhere in the original script.


class OurPolicy(Policy):
    """
    We occasionally see errors that Google code doesn't
    recover from; set a flag that lets the outer thread respond
    by restarting the client.

    grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated
    with (StatusCode.UNAVAILABLE, OS Error)>
    """
    _exception_caught = None

    def on_exception(self, exc):
        # If this is DEADLINE_EXCEEDED, then we want to retry by returning
        # None instead of raising.
        deadline_exceeded = grpc.StatusCode.DEADLINE_EXCEEDED
        code_value = getattr(exc, 'code', lambda: None)()
        if code_value == deadline_exceeded:
            return
        OurPolicy._exception_caught = exc
        # The base implementation just re-raises exc.
        return super(OurPolicy, self).on_exception(exc)


class InvalidSchemaException(Exception):
    pass


def log_unhandled_exception(exc_type, exc_value, exc_tb):
    # Pass the exception info tuple so logging prints the full traceback.
    logger.error('Unhandled exception', exc_info=(exc_type, exc_value, exc_tb))


sys.excepthook = log_unhandled_exception


def send_to_data_insertion(message):
    # Queue an async POST; queued requests are sent by grequests.map() below.
    requests.add(grequests.post(
        'http://%s:8080/url_here=%s' % (address, token),
        data=message.data.decode('utf-8')))


############### Subscriber logic here ####################
def receive_messages(project, subscription_name):
    subscriber = pubsub_v1.SubscriberClient(policy_class=OurPolicy)
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        # Ack immediately, before forwarding the message.
        message.ack()
        try:
            send_to_data_insertion(message)
        except InvalidSchemaException:
            return
        except Exception:
            return

    while live_forever:
        if subscriber is None:
            logger.warning('Starting pubsub subscriber client')
            subscriber = pubsub_v1.SubscriberClient(policy_class=OurPolicy)
        subscriber.subscribe(subscription_path).open(callback=callback)
        try:
            while True:
                grequests.map(requests.getAll())
                time.sleep(sleep_interval)
                # Re-raise any error recorded by the policy's background thread.
                if OurPolicy._exception_caught:
                    exc = OurPolicy._exception_caught
                    OurPolicy._exception_caught = None
                    raise exc
        except KeyboardInterrupt:
            break
        except Exception:
            # Drop the client so it is recreated on the next iteration.
            subscriber = None
            # Otherwise, sleep for one interval and restart the client.
            time.sleep(sleep_interval)
#####################################################


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('project', help='Google Cloud project ID')
    parser.add_argument('subscription', help='Google Cloud subscription name')
    args = parser.parse_args()
    receive_messages(args.project, args.subscription)
```
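The restart logic in `receive_messages` hands an exception from the policy's background thread to the main loop through the class attribute `OurPolicy._exception_caught`, which the main loop polls and re-raises to trigger a client restart. Below is a minimal, library-free sketch of the same supervise-and-restart idea. It swaps in a `queue.Queue` for the hand-off so it is explicitly thread-safe instead of relying on a shared class attribute. `ErrorMailbox`, `supervise`, `start_worker`, `stop`, and `max_restarts` are all hypothetical names; `start_worker` stands in for creating a `SubscriberClient` and opening the subscription.

```python
import queue
import threading


class ErrorMailbox:
    """Thread-safe hand-off of background exceptions to the main thread."""

    def __init__(self):
        self._errors = queue.Queue()

    def report(self, exc):
        # Called from the background (policy/worker) thread.
        self._errors.put(exc)

    def pop(self):
        # Called from the main thread; returns None when nothing was reported.
        try:
            return self._errors.get_nowait()
        except queue.Empty:
            return None


def supervise(start_worker, mailbox, stop, poll_interval=1.0, max_restarts=3):
    """Start a worker, poll for reported errors, and restart it on failure.

    `start_worker(mailbox)` must launch the background work and report any
    failure via `mailbox.report(exc)`. The loop ends when `stop` (a
    threading.Event) is set; after `max_restarts` restarts, the next
    reported error is re-raised. Returns the number of restarts performed.
    """
    restarts = 0
    start_worker(mailbox)
    while not stop.is_set():
        exc = mailbox.pop()
        if exc is not None:
            if restarts >= max_restarts:
                raise exc
            restarts += 1
            start_worker(mailbox)
        stop.wait(poll_interval)
    return restarts


if __name__ == "__main__":
    # Usage: a worker that fails twice, then signals completion on its third start.
    mailbox = ErrorMailbox()
    stop = threading.Event()
    starts = []

    def flaky_worker(mb):
        starts.append(1)
        n = len(starts)

        def run():
            if n <= 2:
                mb.report(RuntimeError("simulated UNAVAILABLE #%d" % n))
            else:
                stop.set()

        threading.Thread(target=run, daemon=True).start()

    print(supervise(flaky_worker, mailbox, stop, poll_interval=0.01))
```

Capping restarts is a design choice in this sketch: the original loop restarts indefinitely while `live_forever` is true, which keeps the consumer alive but can hide a persistent failure.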

About this issue
- State: closed
- Created 7 years ago
- Comments: 25 (11 by maintainers)
Any updates on this?
@lukesneeringer I see a couple of the other issues are being resolved; do they affect this issue at all? Are there any other updates on this? It's been over 3 weeks since the last update.
I might be seeing this issue as well. I have a single subscriber listening to infrequent changes in a GCS bucket using a PubSub topic. My subscriber callback looks like this:
Old messages keep getting delivered over and over, and I think the backlog of un-ack()'d messages eventually blocks new messages from being processed.
I’m using pubsub 0.29.3, cloud 0.31.0, core 0.28.0, grpcio 1.7.3 and Python 3.6.3.
Edit: I rewrote my script using google-cloud-pubsub==0.27.0 as mentioned by @anorth2, and it works correctly. I’ll also try the newly released 0.29.4 to see how it works.
Edit 2: 0.29.4 seemed to handle message pulling a bit better but not perfectly (once it hung on receiving messages, but they seemed to be acknowledged when I restarted the app). However, it exhibits the separate behavior of eventually consuming 100% CPU that I reported in #4563, so that forced me to use 0.27.0 anyway.
Any updates on this? Our team is trying to decide if the production version of our project can rely on the Python client or if we need to switch to Go/Java.
(Status update: Work still in progress trying to fix this.)
@anorth2 I’ve downgraded and the subscriber is working nicely. Thanks.
Don’t have any solutions for you, but I’m also trying out the new API and seeing similar issues.
I’m also seeing `StatusCode.UNAVAILABLE` killing the consumer thread. My approach was just to let the policy itself retry by returning `None` in `on_exception`, same as for `DEADLINE_EXCEEDED`. I don’t understand why the default policy doesn’t do this or something similar: occasional `UNAVAILABLE` responses seem expected and inevitable, and the default policy lets the thread die without giving the main thread any way to detect or recover from it.

In any case, that modification of `on_exception` definitely helps. I am still seeing a very large number of repeated messages, though I haven’t tested thoroughly enough to pin it down to the library or to my own code.

EDIT: I see there are several similar issues already open. Between this, #4186, and #3886, upgrading Pub/Sub has been a real pain, but the old API holds back every other client 😕
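The `on_exception` change described in the comment above can be sketched as a standalone predicate. This is a minimal sketch, assuming (as `OurPolicy` in the original report does) that returning `None` from `on_exception` lets the policy retry, while raising lets the error propagate. `RETRYABLE_CODE_NAMES` and `is_retryable` are hypothetical names. Status codes are compared by enum member name (`grpc.StatusCode.UNAVAILABLE.name == 'UNAVAILABLE'`), so the snippet runs without grpc installed. Inside a real `Policy` subclass you would call `super().on_exception(exc)` instead of `raise exc`.

```python
import enum

# gRPC status code names this sketch treats as transient. The thread only
# mentions DEADLINE_EXCEEDED and UNAVAILABLE; extend the set with care.
RETRYABLE_CODE_NAMES = frozenset({"DEADLINE_EXCEEDED", "UNAVAILABLE"})


def is_retryable(exc):
    """Return True if `exc` carries a retryable gRPC status code.

    gRPC call errors expose their status through a `code()` method. Anything
    without a callable `code` (including exceptions whose `code` is a plain
    int attribute) is treated as non-retryable.
    """
    code_fn = getattr(exc, "code", None)
    if not callable(code_fn):
        return False
    return getattr(code_fn(), "name", None) in RETRYABLE_CODE_NAMES


def on_exception(exc):
    """Policy-hook sketch: return None to retry, otherwise re-raise."""
    if is_retryable(exc):
        return None
    raise exc


# Stand-ins for grpc.StatusCode and a gRPC call error, for demonstration only.
class FakeStatusCode(enum.Enum):
    DEADLINE_EXCEEDED = 4
    PERMISSION_DENIED = 7
    UNAVAILABLE = 14


class FakeRpcError(Exception):
    def __init__(self, code):
        super().__init__(code.name)
        self._code = code

    def code(self):
        return self._code


if __name__ == "__main__":
    print(is_retryable(FakeRpcError(FakeStatusCode.UNAVAILABLE)))        # True
    print(is_retryable(FakeRpcError(FakeStatusCode.PERMISSION_DENIED)))  # False
```

Treating `UNAVAILABLE` as retryable keeps the consumer thread alive through transient outages. Non-transient errors such as `PERMISSION_DENIED` still propagate, so an outer restart loop like the one in the original report can still see them.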