google-cloud-cpp: C++ Pub/Sub stops receiving

Using the online documentation and the code in the sample repo, I’ve implemented PubSub for a single incoming and multiple outgoing. The pull PubSub is an ordering subscription (would much prefer an ordered and deliever once, but that doesn’t seem allowed). I have dead letter configured for each PubSub. Repeatably, the code will stop pulling messages. Sometimes in 5 minutes, sometimes after a couple of days. This morning, there were 55k messages waiting to be pulled. No messages in the dead letter. Based on the website, I expected something from the then() handler in the subscription, but there is no error messages. Here is my code.

/* These are based on https://github.com/googleapis/google-cloud-cpp/blob/HEAD/google/cloud/pubsub/samples/samples.cc */
/* https://cloud.google.com/pubsub/docs/handling-failures#dead-letter_topic */
/* https://cloud.google.com/blog/products/application-development/running-cplusplus-apps-with-the-help-of-pubsub-and-gke */
/* https://cloud.google.com/pubsub/docs/samples/pubsub-subscriber-error-listener#code-sample */
#include <google/cloud/pubsub/publisher.h>
#include <google/cloud/storage/client.h>
#include <google/cloud/pubsub/subscriber.h>
#include <google/cloud/pubsub/subscription_admin_client.h>
#include <google/cloud/pubsub/topic_admin_client.h>
#include <nlohmann/json.hpp>

    void createTopic(const std::string& topicId, const std::string& dlTopicId, google::cloud::pubsub::TopicAdminClient& client)
    {
        namespace pubsub = ::google::cloud::pubsub;

        [](pubsub::TopicAdminClient client, std::string project_id, std::string topic_id, std::string dl_topic_id)
            {
                auto topic = client.CreateTopic(pubsub::TopicBuilder(pubsub::Topic(project_id, topic_id)));
                // Note that kAlreadyExists is a possible error when the library retries.
                if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists)
                {
                    ;
                }
                else if (!topic)
                {
                    throw std::runtime_error(topic.status().message());
                }
                else
                {
                    LOGn << "The topic " << topic_id << " was successfully created: " << topic->DebugString();
                }

                auto dl_topic = client.CreateTopic(pubsub::TopicBuilder(pubsub::Topic(project_id, dl_topic_id)));
                // Note that kAlreadyExists is a possible error when the library retries.
                if (dl_topic.status().code() == google::cloud::StatusCode::kAlreadyExists)
                {
                    ;
                }
                else if (!dl_topic)
                {
                    throw std::runtime_error(dl_topic.status().message());
                }
                else
                {
                    LOGn << "The deadletter topic " << dl_topic_id << " was successfully created: " << dl_topic->DebugString();
                }

            }
        (client, pubSubProjectId, topicId, dlTopicId);
    }
    void createOrderingSubscription(const std::string& topicId, const std::string& subscriptionId, const std::string& dlTopicId, const std::string& dlSubscriptionId, google::cloud::pubsub::SubscriptionAdminClient& client)
    {
        namespace pubsub = ::google::cloud::pubsub;

        return [](pubsub::SubscriptionAdminClient client, std::string const& project_id, std::string const& topic_id, std::string const& subscription_id, std::string const& dl_topic_id, std::string const& dl_subscription_id)
            {
                static const constexpr int DEAD_LETTER_DELIVERY_ATTEMPTS = 5;

                auto sub = client.CreateSubscription(
                    pubsub::Topic(project_id, std::move(topic_id)),
                    pubsub::Subscription(project_id, subscription_id),
                    pubsub::SubscriptionBuilder{}
                        .enable_message_ordering(true)
                        .set_ack_deadline(std::chrono::seconds(60))
                        .set_dead_letter_policy(
                            pubsub::SubscriptionBuilder::MakeDeadLetterPolicy(
                            pubsub::Topic(project_id, dl_topic_id),
                            DEAD_LETTER_DELIVERY_ATTEMPTS))
                );
                if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists)
                {
                    ;
                }
                else if (!sub)
                {
                    throw std::runtime_error(sub.status().message());
                }
                else
                {
                    LOGd << "The subscription " << subscription_id << " was successfully created: " << sub->DebugString();
                }

                auto dl_sub = client.CreateSubscription(
                    pubsub::Topic(project_id, std::move(dl_topic_id)),
                    pubsub::Subscription(project_id, dl_subscription_id)
                );
                if (dl_sub.status().code() == google::cloud::StatusCode::kAlreadyExists)
                {
                    ;
                }
                else if (! dl_sub)
                {
                    throw std::runtime_error(sub.status().message());
                }
                else
                {
                    LOGd << "The deadletter subscription " << dl_subscription_id << " was successfully created: " << dl_sub->DebugString();
                }

                return;
            }
        (client, pubSubProjectId, topicId, subscriptionId, dlTopicId, dlSubscriptionId);
    }
void run()
{
    while (running)
    {
        try
        {
            namespace pubsub = ::google::cloud::pubsub;;
            int concurrency = 1;

            pubsub::TopicAdminClient topicClient(pubsub::MakeTopicAdminConnection());
            pubsub::SubscriptionAdminClient subscriptionClient(pubsub::MakeSubscriptionAdminConnection());

            createTopicSubscriptionId(eId, fId, "", fTopicId, fSubscriptionId, fDLTopicId, fDLSubscriptionId);
            createTopic(fTopicId, fDLTopicId, topicClient);
            createOrderingSubscription(fTopicId, fSubscriptionId, fDLTopicId, fDLSubscriptionId, subscriptionClient);
            fSubscriptionProblem = false;
            auto const subscription = pubsub::Subscription(pubSubProjectId, fSubscriptionId);
            auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
                subscription,
                pubsub::SubscriberOptions{}
                    .set_max_outstanding_messages(concurrency)
                    .set_max_outstanding_bytes(concurrency * 1024)
                    .set_max_concurrency(concurrency),
                pubsub::ConnectionOptions{}.set_background_thread_pool_size(concurrency))
            );
            auto session = subscriber.Subscribe([](pubsub::Message const& m, pubsub::AckHandler h) {
                ProcessMsg::CommTopic topic = static_cast<ProcessMsg::CommTopic>(std::stoi(m.attributes()["topic"]));
                bool g = true;
                std::string msg = m.data();

                LOGi << "Received message, id " << m.message_id() << ", attempt " << h.delivery_attempt() <<", ordering key '" << m.ordering_key() << "', topic " << ProcessMsg::commTopic(topic) << " (" << topic << ")";
                messageCallback(msg);
                std::move(h).ack();
            })
            .then([](::google::cloud::future<google::cloud::Status> f) {
                LOGe << "Subscription problem : " << f.get();
                fSubscriptionProblem = true;
            });
            if (! session.valid())
            {
                LOGe << "Subscribe session invalid";
                break;
            }

            while (running && ! fSubscriptionProblem)
            {
                if (outgoingQueue.empty())
                {
                    std::this_thread::sleep_for(std::chrono::milliseconds(500));
                    continue;
                }

                std::string gTopicId = "";
                std::string gSubscriptionId = "";
                std::string gDLTopicId = "";
                std::string gDLSubscriptionId = "";
                const OutgoingMsg omsg = outgoingQueue.front();

                createTopicSubscriptionId(0, 0, omsg.identifier, gTopicId, gSubscriptionId, gDLTopicId, gDLSubscriptionId);
                createTopic(gTopicId, gDLTopicId, topicClient);
                createOrderingSubscription(gTopicId, gSubscriptionId, gDLTopicId, gDLSubscriptionId, subscriptionClient);
                try
                {
                    auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(pubsub::Topic(pubSubProjectId, gTopicId)));
                    auto id = publisher.Publish(pubsub::MessageBuilder{}
                                                    .SetData(omsg.msg)
                                                    .SetAttribute("timestamp", std::to_string(time(NULL)))
                                                    .Build()).get();
                    if (! id)
                    {
                        LOGe << "Failed to publish message " << id.status().message();
                    }
                    else
                    {
                        LOGd << "Published message, id : " << *id << " (" << omsg.msg << ") to " << pubSubProjectId << "/" << gTopicId;
                        outgoingQueue.pop_front();
                    }
                }
                catch (std::exception const& e)
                {
                    LOGe << "Failed to publish message " << e.what();
                }
            }
            if (running)
            {
               LOGn << "GCP problem, cancelling session";
               session.cancel();
               LOGd << "Waiting 15 seconds";
               std::this_thread::sleep_for(std::chrono::seconds(15));
            }
        }
        catch (std::runtime_error &e)
        {
            LOGe << "RuntimeError exception : " << e.what();
        }
        catch(std::exception& e)
        {
            LOGe << "Exception : " << e.what();
        }
    }
}

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Comments: 25 (15 by maintainers)

Most upvoted comments

Let’s keep it for now. I am trying to repro the issue, if I am right and it is the client library, I will ping you and remove it. Otherwise it is good info as we pass on the bug to the Pub/Sub service folks.