google-cloud-python: PubSub: Subscriber Client stops acking messages after several seconds

  1. Specify the API at the beginning of the title (for example, “BigQuery: …”). General, Core, and Other are also allowed as types.

  2. OS type and version: python:3-alpine Docker image running in GKE

  3. Python version and virtual environment information (python --version)

  4. google-cloud-python version (pip show google-cloud, pip show google-<service> or pip freeze): google-cloud-pubsub==0.28.4, grpcio==1.6.3

  5. Stacktrace if available

  6. Steps to reproduce: Run the example code and watch the Stackdriver metrics for acks vs. modify_ack_deadline.

  7. Code example:

import os
import sys
import logging
import traceback
import json
import time
import argparse
import grequests
import grpc

from item_store import ItemStore

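from google.cloud.pubsub_v1.subscriber.policy.thread import Policy
from google.cloud import pubsub_v1

logger = logging.getLogger(__name__)

# ItemStore is the reporter's own module (not shown); it buffers pending
# grequests until they are sent with grequests.map().
# live_forever and sleep_interval are also defined elsewhere in the
# original script and are not shown here.
requests = ItemStore()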

class OurPolicy(Policy):
    """
    We occasionally see errors that the Google client code doesn't
    recover from, so set a flag that lets the outer thread respond
    by restarting the client. For example:
    grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated
    with (StatusCode.UNAVAILABLE, OS Error)>
    """
    _exception_caught = None

    def on_exception(self, exc):
        # If this is DEADLINE_EXCEEDED, we want to retry, which happens
        # when on_exception returns None instead of raising.
        deadline_exceeded = grpc.StatusCode.DEADLINE_EXCEEDED
        code_value = getattr(exc, 'code', lambda: None)()
        if code_value == deadline_exceeded:
            return
        # Record the error so the main loop in receive_messages can restart.
        OurPolicy._exception_caught = exc
        # The base class just re-raises exc.
        return super(OurPolicy, self).on_exception(exc)

class InvalidSchemaException(Exception):
    pass

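def log_unhandled_exception(exc_type, exc_value, exc_traceback):
    # logger.error() expects a message; pass the exception details via exc_info.
    logger.error('Unhandled exception',
                 exc_info=(exc_type, exc_value, exc_traceback))

sys.excepthook = log_unhandled_exception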

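def send_to_data_insertion(message):
    # Queue the POST; it is sent later by grequests.map() in receive_messages.
    # address and token come from configuration not shown in the report.
    requests.add(grequests.post('http://%s:8080/url_here=%s' % (
        address, token),
        data=message.data.decode('utf-8')))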


###############  Subscriber logic here ####################
def receive_messages(project, subscription_name):
    subscriber = pubsub_v1.SubscriberClient(policy_class=OurPolicy)
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

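    def callback(message):
        # The message is acked immediately, before it is forwarded.
        message.ack()
        try:
            send_to_data_insertion(message)
        except InvalidSchemaException:
            return
        except Exception:
            logger.exception('Failed to queue message for data insertion')
            return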

    while live_forever:
        if subscriber is None:
            logger.warning('Starting pubsub subscriber client')
            subscriber = pubsub_v1.SubscriberClient(policy_class=OurPolicy)

        subscriber.subscribe(subscription_path).open(callback=callback)

        try:
            while True:
                grequests.map(requests.getAll())
                time.sleep(sleep_interval)

                if OurPolicy._exception_caught:
                    exc = OurPolicy._exception_caught
                    OurPolicy._exception_caught = None
                    raise exc
        except KeyboardInterrupt:
            break
        except Exception as e:
            subscriber = None

    # otherwise, sleep for one interval and exit
    time.sleep(sleep_interval)

#####################################################


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser(description='Pub/Sub subscriber')
    parser.add_argument('project', help='Google Cloud project ID')
    parser.add_argument('subscription', help='Google Cloud subscription name')
    args = parser.parse_args()
    receive_messages(args.project, args.subscription)
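OurPolicy and receive_messages implement a flag-and-restart workaround: the consumer thread records its exception, and the main loop, which polls every sleep_interval, rebuilds the client. The same pattern can be sketched without the Pub/Sub library, assuming a queue.Queue as the hand-off instead of the _exception_caught class attribute. run_worker, start_worker, supervise and make_flaky_work are hypothetical names, and max_restarts is an added safeguard that the original loop does not have:

```python
import queue
import threading


def run_worker(work, errors):
    """Run work() and hand any exception to the supervisor instead of dying silently."""
    try:
        work()
    except Exception as exc:
        errors.put(exc)


def start_worker(make_work, errors):
    """Start a fresh worker thread (the analogue of building a new client)."""
    worker = threading.Thread(target=run_worker, args=(make_work(), errors), daemon=True)
    worker.start()
    return worker


def supervise(make_work, max_restarts, poll_interval=0.01):
    """Poll for worker errors and restart the worker; return the restart count."""
    errors = queue.Queue()
    restarts = 0
    worker = start_worker(make_work, errors)
    while True:
        try:
            exc = errors.get(timeout=poll_interval)
        except queue.Empty:
            # A finished worker with no pending error completed normally.
            if not worker.is_alive() and errors.empty():
                return restarts
            continue
        if restarts >= max_restarts:
            raise exc
        restarts += 1
        worker = start_worker(make_work, errors)


# Hypothetical flaky work: fails twice (like a transient UNAVAILABLE), then succeeds.
attempts = []


def make_flaky_work():
    def work():
        attempts.append(1)
        if len(attempts) <= 2:
            raise RuntimeError("simulated StatusCode.UNAVAILABLE")
    return work


print(supervise(make_flaky_work, max_restarts=5))  # 2
```

A queue makes the cross-thread hand-off explicit and keeps a second error from silently overwriting the first between polls, which can happen with a single class attribute.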

[Screenshot, 2017-10-27 1:06:46 PM]


About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 25 (11 by maintainers)

Most upvoted comments

Any updates on this?

@lukesneeringer I see a couple of the other issues are being resolved; do they affect this issue at all? Are there any other updates on this? It’s been over 3 weeks since the last update.

I might be seeing this issue as well. I have a single subscriber that listens, via a Pub/Sub topic, for infrequent changes to a GCS bucket. My subscriber callback looks like this:

def add_to_queue(message):
  message_queue.put(message)
  message.ack()
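The callback above relies on a message_queue defined elsewhere. A self-contained sketch, assuming message_queue is a standard queue.Queue (thread-safe, which matters because the thread-based client invokes callbacks from its own worker threads) and using a hypothetical FakeMessage in place of a real Pub/Sub message so it runs without the library:

```python
import queue
import threading

# Assumption: message_queue is a standard, thread-safe queue.Queue, because the
# thread-based subscriber runs callbacks on its own worker threads.
message_queue = queue.Queue()


def add_to_queue(message):
    """Hand the message to the application, then acknowledge it."""
    message_queue.put(message)
    message.ack()


class FakeMessage:
    """Hypothetical stand-in with only the data/ack() surface used here."""

    def __init__(self, data):
        self.data = data  # Pub/Sub message payloads are bytes
        self.acked = False

    def ack(self):
        self.acked = True


def drain(q):
    """Consumer side: decode every queued payload until the queue is empty."""
    payloads = []
    while True:
        try:
            msg = q.get_nowait()
        except queue.Empty:
            return payloads
        payloads.append(msg.data.decode("utf-8"))
        q.task_done()


# Simulate the client invoking the callback from several threads at once.
messages = [FakeMessage(("m%d" % i).encode("utf-8")) for i in range(5)]
threads = [threading.Thread(target=add_to_queue, args=(m,)) for m in messages]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(drain(message_queue)))  # ['m0', 'm1', 'm2', 'm3', 'm4']
print(all(m.acked for m in messages))  # True
```

Because the callback acks as soon as the message is queued, a crash before the queue is drained loses that message instead of leaving it for redelivery; acking after processing trades that for possible duplicates.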

Old messages keep getting delivered over and over, and eventually, I think, the backlog of un-acked messages blocks new messages from being processed. I’m using google-cloud-pubsub 0.29.3, google-cloud 0.31.0, google-cloud-core 0.28.0, grpcio 1.7.3 and Python 3.6.3.

Edit: I rewrote my script using google-cloud-pubsub==0.27.0 as mentioned by @anorth2, and it works correctly. I’ll also try the newly released 0.29.4 to see how it works.

Edit 2: 0.29.4 seemed to handle message pulling a bit better, but not perfectly (once it hung while receiving messages, although they seemed to be acknowledged when I restarted the app). However, it eventually consumes 100% CPU, a separate behavior that I reported in #4563, so I had to use 0.27.0 anyway.

Any updates on this? Our team is trying to decide if the production version of our project can rely on the Python client or if we need to switch to Go/Java.

(Status update: Work still in progress trying to fix this.)

@anorth2 I’ve downgraded and the subscriber is working nicely. Thanks.

Don’t have any solutions for you, but I’m also trying out the new API and seeing similar issues.

I’m also seeing StatusCode.UNAVAILABLE killing the consumer thread. My workaround was simply to let the policy retry by returning None from on_exception, just as it does for DEADLINE_EXCEEDED. I don’t understand why the default policy doesn’t do this or something similar: occasional UNAVAILABLE responses seem expected and inevitable, yet the default policy lets the thread die with no way for the main thread to detect the failure or recover from it.
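The workaround described above amounts to widening the set of status codes that on_exception swallows. A minimal sketch of that decision logic, assuming UNAVAILABLE should be handled like DEADLINE_EXCEEDED. StatusCode and FakeRpcError are local stand-ins for grpc.StatusCode and a gRPC error so the snippet runs without grpc installed; in a real OurPolicy you would compare against grpc.StatusCode members instead:

```python
import enum


class StatusCode(enum.Enum):
    """Local stand-in for grpc.StatusCode; values are the gRPC numeric codes."""
    OK = 0
    DEADLINE_EXCEEDED = 4
    PERMISSION_DENIED = 7
    UNAVAILABLE = 14


# Codes treated as transient: the policy retries instead of raising.
RETRYABLE_CODES = frozenset({StatusCode.DEADLINE_EXCEEDED, StatusCode.UNAVAILABLE})


class FakeRpcError(Exception):
    """Hypothetical stand-in for a gRPC error that exposes code()."""

    def __init__(self, code):
        super().__init__(code.name)
        self._code = code

    def code(self):
        return self._code


def is_retryable(exc):
    """True if exc reports a transient status code.

    A slightly more defensive version of the probe in OurPolicy.on_exception:
    exceptions without a callable code() count as non-retryable.
    """
    code = getattr(exc, "code", None)
    value = code() if callable(code) else None
    return value in RETRYABLE_CODES


def on_exception(exc):
    """Sketch of the policy hook: return None to retry, re-raise otherwise."""
    if is_retryable(exc):
        return None
    raise exc


print(is_retryable(FakeRpcError(StatusCode.UNAVAILABLE)))        # True
print(is_retryable(FakeRpcError(StatusCode.PERMISSION_DENIED)))  # False
print(is_retryable(ValueError("no code() method")))              # False
```

Retrying UNAVAILABLE unconditionally will also spin on an endpoint that stays down, so a bounded retry count or backoff may be worth adding.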

In any case, that change to on_exception definitely helps. I am still seeing a very large number of repeat messages, though I haven’t tested thoroughly enough to pin it down to either the library or my own code.
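One way to tell whether repeat messages are redeliveries rather than an application bug is to track recently seen message IDs, since a Pub/Sub message keeps the same message_id when it is redelivered. A minimal sketch, assuming duplicates arrive reasonably close together; RecentIds and its max_size parameter are hypothetical:

```python
import threading
from collections import OrderedDict


class RecentIds:
    """Remember the most recently seen message IDs so redeliveries can be skipped.

    Bounded: once more than max_size IDs are stored, the oldest is evicted, so
    a duplicate arriving after max_size newer messages is not detected.
    """

    def __init__(self, max_size=10000):
        self.max_size = max_size
        self._ids = OrderedDict()
        self._lock = threading.Lock()  # callbacks may run on several threads

    def seen(self, message_id):
        """Return True if message_id was already recorded; otherwise record it."""
        with self._lock:
            if message_id in self._ids:
                self._ids.move_to_end(message_id)  # refresh its recency
                return True
            self._ids[message_id] = None
            if len(self._ids) > self.max_size:
                self._ids.popitem(last=False)  # evict the oldest ID
            return False


recent = RecentIds(max_size=3)
ids = ["a", "b", "a", "c", "b", "d"]
print([i for i in ids if not recent.seen(i)])  # ['a', 'b', 'c', 'd']
```

In a subscriber callback, one would check recent.seen(message.message_id) first and ack a duplicate without processing it again; counting those hits shows how many of the repeats are genuine redeliveries.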

EDIT: I see several similar issues are already open. Between this, #4186, and #3886, upgrading Pub/Sub has been a real pain, but staying on the old API holds back every other client 😕