google-cloud-dotnet: PubSub Subscription randomly stops pulling messages from a topic.

A Pub/Sub subscription randomly stops pulling messages from a topic. It seems to happen about once a day. Sometimes it recovers on its own after a while; other times it never resumes consuming messages. The messages published to the topic are always very small. The Pub/Sub logs show no errors, and no messages are reported as failing due to a timeout or any other error. (I have gone through Google’s Support and can confirm Pub/Sub is working as expected.) The subscriber neither returns nor throws, so the Polly retry policies have never been triggered.

We suspect the problem lies in the .NET client library rather than in Pub/Sub itself. Other subscriptions on the same topic, written in Go, receive messages as expected. For the .NET subscription, we can see messages that are not being pulled from the queue. The publisher is working fine, and we can confirm that all messages are sent to the topic.

I am currently running a single subscriber per subscription and a single instance of the application, with the following setup:

  • .net version: mcr.microsoft.com/dotnet/core/aspnet:3.1-buster-slim
  • running: docker - linux
  • nuget: \google.cloud.pubsub.v1\2.0.0

I have rewritten the code a few times and it still happens. There are multiple (3 to 6) subscriptions running at once for different topics. The messages are very sporadic: sometimes there are none for hours, and at other times there are many. The failures follow no pattern that we can determine. Unfortunately, I cannot reproduce the problem reliably because it is intermittent, but here is the code.

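using System;
using System.Text.Json; // assumed source of JsonSerializer
using System.Threading;
using System.Threading.Tasks;
using Google.Cloud.PubSub.V1;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace EventBus.GcpPubSub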
{
    public class Subscription<E, T>
        where E : ISubscribeEvent, new()
        where T : class, ISubscriptionHandler<E>
    {
        private readonly ILogger<Subscription<E, T>> logger;
        private readonly GcpPubSubSettings gcpPubSubSettings;
        private readonly IServiceProvider serviceProvider;

        private readonly string topicId;
        private readonly string subscriptionId;

        public Subscription(ILogger<Subscription<E, T>> logger, IOptions<GcpPubSubSettings> gcpPubSubSettingsAccessor, IServiceProvider serviceProvider)
        {
            this.logger = logger;
            gcpPubSubSettings = gcpPubSubSettingsAccessor.Value;
            this.serviceProvider = serviceProvider;

            var @event = new E();
            topicId = @event.TopicId;
            subscriptionId = @event.SubscriptionId;
        }

        public void Start()
        {
            AddResilientSubscriptionAndHandler()
                .ContinueWith(t => logger.LogError(t.Exception, "[pub/sub diagnostics] Unrecoverable error occurred with subscription {SubscriptionId} on topic {TopicId}", subscriptionId, topicId),
                    TaskContinuationOptions.OnlyOnFaulted);
        }

        private async Task AddResilientSubscriptionAndHandler()
        {
            try
            {
                var retries = 0;
                var retryOnException = Polly.Policy
                    .Handle<Exception>()
                    .WaitAndRetryForeverAsync(
                        attempt => TimeSpan.FromSeconds(1),
                        (exception, calculatedWaitDuration, timespan) =>
                        {
                            retries++;
                            logger.LogError(exception, "[pub/sub diagnostics] Subscription Exception detected on attempt {retry}", retries);
                        }
                    );

                var retryOnConnectionLost = Polly.Policy
                    .HandleResult<Task>(t => t.IsCompleted)
                    .WaitAndRetryForeverAsync(
                        attempt => TimeSpan.FromSeconds(1),
                        (result, calculatedWaitDuration, timespan) =>
                        {
                            retries++;
                            logger.LogError("[pub/sub diagnostics] Subscription disconnection detected on attempt {retry}", retries);
                        }
                    );

                await retryOnException.ExecuteAsync(async () =>
                {
                    await retryOnConnectionLost.ExecuteAsync(async () =>
                    {
                        await AddSubscriptionAndHandler().ConfigureAwait(false);

                        logger.LogError("[pub/sub diagnostics] Subscriber returned. restarting");

                        return Task.CompletedTask;
                    }).ConfigureAwait(false);

                    logger.LogError("[pub/sub diagnostics] Subscription failed for some unknown reason. restarting");

                }).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "[pub/sub diagnostics] Subscription failed with exception.");
            }
            logger.LogError("[pub/sub diagnostics] Subscription failed and cannot restart for some unknown reason.");
        }

        private async Task AddSubscriptionAndHandler()
        {
            var subscriptionName = SubscriptionName.FromProjectSubscription(gcpPubSubSettings.ProjectId, subscriptionId);
            var subscriberClient = await SubscriberClient.CreateAsync(
                subscriptionName,
                settings: new SubscriberClient.Settings
                {
                    AckExtensionWindow = (TimeSpan?)TimeSpan.FromSeconds(30),
                    AckDeadline = (TimeSpan?)TimeSpan.FromSeconds(60),
                    FlowControlSettings = new Google.Api.Gax.FlowControlSettings(
                        maxOutstandingElementCount: 10,
                        maxOutstandingByteCount: null
                    )
                }).ConfigureAwait(false);

            logger.LogTrace("[pub/sub diagnostics] Start Subscriber {@subscription}", subscriptionName);

            await subscriberClient.StartAsync(HandlerAsync).ConfigureAwait(false);
        }

        private async Task<SubscriberClient.Reply> HandlerAsync(PubsubMessage message, CancellationToken cancellationToken)
        {
            logger.LogInformation("[pub/sub diagnostics] Received message {MessageId} on topic {topicId}, published at {PublishTime}: {contents}", message?.MessageId, topicId, message?.PublishTime?.ToDateTime(), message?.Data?.ToStringUtf8());
            try
            {
                var data = message?.Data?.ToStringUtf8();
                var @event = data == null ? null : JsonSerializer.Deserialize<MessageDto>(data);

                var handler = serviceProvider.GetRequiredService<T>();
                await handler.Handle(@event, cancellationToken).ConfigureAwait(false);
                return SubscriberClient.Reply.Ack;
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "[pub/sub diagnostics] Message/Subscription failed {MessageId}: {message}", message?.MessageId, message?.Data?.ToStringUtf8());
            }
            return SubscriberClient.Reply.Ack;
        }
    }
}
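
Note that `Start()` above only logs when the task faults. To see whether the subscriber task ever ends in some other way, a small wrapper could report every completion path. The following is a minimal sketch, not part of the original code: `TaskSupervisor` and `ObserveAsync` are hypothetical names and belong to neither Google.Cloud.PubSub.V1 nor Polly. It uses only the base class library.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (an assumption for illustration, not a library API).
public static class TaskSupervisor
{
    // Awaits a long-running operation and reports how it ended, so that
    // no completion path (normal return, cancellation, fault) goes unlogged.
    public static async Task<string> ObserveAsync(Func<Task> run, Action<string, Exception> log)
    {
        try
        {
            await run().ConfigureAwait(false);
            log("completed", null);   // the task returned without an exception
            return "completed";
        }
        catch (OperationCanceledException ex)
        {
            log("canceled", ex);      // also covers TaskCanceledException
            return "canceled";
        }
        catch (Exception ex)
        {
            log("faulted", ex);       // any other exception
            return "faulted";
        }
    }
}
```

Inside `Start()`, this could be used as `_ = TaskSupervisor.ObserveAsync(AddResilientSubscriptionAndHandler, (state, ex) => logger.LogError(ex, "[pub/sub diagnostics] Subscription loop ended: {State}", state));`. It would not fix a stalled pull, but it would show whether the loop exits silently or is still running when messages stop arriving.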

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 24 (5 by maintainers)

Most upvoted comments

@gothraven: Not a problem. Might as well leave the comments, in case they’re useful for anyone else who happens to see something similar. If you reference this issue when you create a Node one, the link will be there for future folks…