google-cloud-cpp: C++ Pub/Sub stops receiving
Using the online documentation and the code in the sample repo, I’ve implemented PubSub for a single incoming and multiple outgoing. The pull PubSub is an ordering subscription (would much prefer an ordered and deliever once, but that doesn’t seem allowed). I have dead letter configured for each PubSub. Repeatably, the code will stop pulling messages. Sometimes in 5 minutes, sometimes after a couple of days. This morning, there were 55k messages waiting to be pulled. No messages in the dead letter. Based on the website, I expected something from the then() handler in the subscription, but there is no error messages. Here is my code.
/* These are based on https://github.com/googleapis/google-cloud-cpp/blob/HEAD/google/cloud/pubsub/samples/samples.cc */
/* https://cloud.google.com/pubsub/docs/handling-failures#dead-letter_topic */
/* https://cloud.google.com/blog/products/application-development/running-cplusplus-apps-with-the-help-of-pubsub-and-gke */
/* https://cloud.google.com/pubsub/docs/samples/pubsub-subscriber-error-listener#code-sample */
#include <google/cloud/pubsub/publisher.h>
#include <google/cloud/storage/client.h>
#include <google/cloud/pubsub/subscriber.h>
#include <google/cloud/pubsub/subscription_admin_client.h>
#include <google/cloud/pubsub/topic_admin_client.h>
#include <nlohmann/json.hpp>
void createTopic(const std::string& topicId, const std::string& dlTopicId, google::cloud::pubsub::TopicAdminClient& client)
{
namespace pubsub = ::google::cloud::pubsub;
[](pubsub::TopicAdminClient client, std::string project_id, std::string topic_id, std::string dl_topic_id)
{
auto topic = client.CreateTopic(pubsub::TopicBuilder(pubsub::Topic(project_id, topic_id)));
// Note that kAlreadyExists is a possible error when the library retries.
if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists)
{
;
}
else if (!topic)
{
throw std::runtime_error(topic.status().message());
}
else
{
LOGn << "The topic " << topic_id << " was successfully created: " << topic->DebugString();
}
auto dl_topic = client.CreateTopic(pubsub::TopicBuilder(pubsub::Topic(project_id, dl_topic_id)));
// Note that kAlreadyExists is a possible error when the library retries.
if (dl_topic.status().code() == google::cloud::StatusCode::kAlreadyExists)
{
;
}
else if (!dl_topic)
{
throw std::runtime_error(dl_topic.status().message());
}
else
{
LOGn << "The deadletter topic " << dl_topic_id << " was successfully created: " << dl_topic->DebugString();
}
}
(client, pubSubProjectId, topicId, dlTopicId);
}
void createOrderingSubscription(const std::string& topicId, const std::string& subscriptionId, const std::string& dlTopicId, const std::string& dlSubscriptionId, google::cloud::pubsub::SubscriptionAdminClient& client)
{
namespace pubsub = ::google::cloud::pubsub;
return [](pubsub::SubscriptionAdminClient client, std::string const& project_id, std::string const& topic_id, std::string const& subscription_id, std::string const& dl_topic_id, std::string const& dl_subscription_id)
{
static const constexpr int DEAD_LETTER_DELIVERY_ATTEMPTS = 5;
auto sub = client.CreateSubscription(
pubsub::Topic(project_id, std::move(topic_id)),
pubsub::Subscription(project_id, subscription_id),
pubsub::SubscriptionBuilder{}
.enable_message_ordering(true)
.set_ack_deadline(std::chrono::seconds(60))
.set_dead_letter_policy(
pubsub::SubscriptionBuilder::MakeDeadLetterPolicy(
pubsub::Topic(project_id, dl_topic_id),
DEAD_LETTER_DELIVERY_ATTEMPTS))
);
if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists)
{
;
}
else if (!sub)
{
throw std::runtime_error(sub.status().message());
}
else
{
LOGd << "The subscription " << subscription_id << " was successfully created: " << sub->DebugString();
}
auto dl_sub = client.CreateSubscription(
pubsub::Topic(project_id, std::move(dl_topic_id)),
pubsub::Subscription(project_id, dl_subscription_id)
);
if (dl_sub.status().code() == google::cloud::StatusCode::kAlreadyExists)
{
;
}
else if (! dl_sub)
{
throw std::runtime_error(sub.status().message());
}
else
{
LOGd << "The deadletter subscription " << dl_subscription_id << " was successfully created: " << dl_sub->DebugString();
}
return;
}
(client, pubSubProjectId, topicId, subscriptionId, dlTopicId, dlSubscriptionId);
}
void run()
{
while (running)
{
try
{
namespace pubsub = ::google::cloud::pubsub;;
int concurrency = 1;
pubsub::TopicAdminClient topicClient(pubsub::MakeTopicAdminConnection());
pubsub::SubscriptionAdminClient subscriptionClient(pubsub::MakeSubscriptionAdminConnection());
createTopicSubscriptionId(eId, fId, "", fTopicId, fSubscriptionId, fDLTopicId, fDLSubscriptionId);
createTopic(fTopicId, fDLTopicId, topicClient);
createOrderingSubscription(fTopicId, fSubscriptionId, fDLTopicId, fDLSubscriptionId, subscriptionClient);
fSubscriptionProblem = false;
auto const subscription = pubsub::Subscription(pubSubProjectId, fSubscriptionId);
auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
subscription,
pubsub::SubscriberOptions{}
.set_max_outstanding_messages(concurrency)
.set_max_outstanding_bytes(concurrency * 1024)
.set_max_concurrency(concurrency),
pubsub::ConnectionOptions{}.set_background_thread_pool_size(concurrency))
);
auto session = subscriber.Subscribe([](pubsub::Message const& m, pubsub::AckHandler h) {
ProcessMsg::CommTopic topic = static_cast<ProcessMsg::CommTopic>(std::stoi(m.attributes()["topic"]));
bool g = true;
std::string msg = m.data();
LOGi << "Received message, id " << m.message_id() << ", attempt " << h.delivery_attempt() <<", ordering key '" << m.ordering_key() << "', topic " << ProcessMsg::commTopic(topic) << " (" << topic << ")";
messageCallback(msg);
std::move(h).ack();
})
.then([](::google::cloud::future<google::cloud::Status> f) {
LOGe << "Subscription problem : " << f.get();
fSubscriptionProblem = true;
});
if (! session.valid())
{
LOGe << "Subscribe session invalid";
break;
}
while (running && ! fSubscriptionProblem)
{
if (outgoingQueue.empty())
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
continue;
}
std::string gTopicId = "";
std::string gSubscriptionId = "";
std::string gDLTopicId = "";
std::string gDLSubscriptionId = "";
const OutgoingMsg omsg = outgoingQueue.front();
createTopicSubscriptionId(0, 0, omsg.identifier, gTopicId, gSubscriptionId, gDLTopicId, gDLSubscriptionId);
createTopic(gTopicId, gDLTopicId, topicClient);
createOrderingSubscription(gTopicId, gSubscriptionId, gDLTopicId, gDLSubscriptionId, subscriptionClient);
try
{
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(pubsub::Topic(pubSubProjectId, gTopicId)));
auto id = publisher.Publish(pubsub::MessageBuilder{}
.SetData(omsg.msg)
.SetAttribute("timestamp", std::to_string(time(NULL)))
.Build()).get();
if (! id)
{
LOGe << "Failed to publish message " << id.status().message();
}
else
{
LOGd << "Published message, id : " << *id << " (" << omsg.msg << ") to " << pubSubProjectId << "/" << gTopicId;
outgoingQueue.pop_front();
}
}
catch (std::exception const& e)
{
LOGe << "Failed to publish message " << e.what();
}
}
if (running)
{
LOGn << "GCP problem, cancelling session";
session.cancel();
LOGd << "Waiting 15 seconds";
std::this_thread::sleep_for(std::chrono::seconds(15));
}
}
catch (std::runtime_error &e)
{
LOGe << "RuntimeError exception : " << e.what();
}
catch(std::exception& e)
{
LOGe << "Exception : " << e.what();
}
}
}
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Comments: 25 (15 by maintainers)
Let’s keep it for now. I am trying to repro the issue, if I am right and it is the client library, I will ping you and remove it. Otherwise it is good info as we pass on the bug to the Pub/Sub service folks.