azure-sdk-for-net: [BUG] Service Bus SubscriptionClient: Messages are no longer received when connection changes

Describe the bug We use Service Bus in a Xamarin.Android application to receive messages when updates occur. However, we have found that we no longer receive any messages after the underlying network connection has changed without dropping.

Exception or Stack Trace N/A

To Reproduce Because this issue is very difficult to reproduce in the field, we worked out a scenario that reproduces it 100% of the time.

Reproducible scenario:

  1. Use a mobile device as a hotspot
  2. Connect another device to that hotspot over Wi-Fi
  3. Force the mobile device to switch between network modes (4G -> 3G, 3G -> 2G, 2G -> 4G,…)
  4. The other device will no longer receive messages

The same issue seems to occur when switching between cell towers.

Code Snippet Our implementation:

            _subscriptionClient = new SubscriptionClient(
                _settingsService.PrimaryOutboundEndpoint,
                _settingsService.PrimaryOutboundTopic,
                _settingsService.PrimaryOutboundSubscription,
                TokenProvider.CreateSharedAccessSignatureTokenProvider(_settingsService.PrimaryOutboundSasToken),
                receiveMode: ReceiveMode.PeekLock,
                retryPolicy: _retryPolicy);

            _subscriptionClient.RegisterMessageHandler(
                async (message, cancellationToken1) =>
                {
                    if (_subscriptionClient != null && !_subscriptionClient.IsClosedOrClosing)
                    {
                        await MessageHandler(message, cancellationToken1, _subscriptionClient);
                    }
                },
                new MessageHandlerOptions(LogMessageHandlerException) { AutoComplete = true, MaxConcurrentCalls = MAX_CONCURRENT_CALLS });

Expected behavior The subscription client reconnects when there is an issue with the connection.

Setup (please complete the following information):

  • OS platform and version: Android 8.0.0
  • .NET Version: Xamarin
  • NuGet package version or commit ID: 3.4.0

Additional context We currently use a workaround that is far from optimal, because it resets the connection more often than necessary: the connection is reset every time no messages are received within the ReceiveTimeOut. The adjustments are listed below, as they may provide further insight. Code changes are marked between //Adjusted Part and //End Adjusted part.

Microsoft.Azure.ServiceBus.Core.MessageReceiver

        protected virtual async Task<IList<Message>> OnReceiveAsync(int maxMessageCount, TimeSpan serverWaitTime)
        {
            ReceivingAmqpLink receiveLink = null;

            if (this.isSessionReceiver)
            {
                this.ThrowIfSessionLockLost();
            }

            try
            {
                var timeoutHelper = new TimeoutHelper(serverWaitTime, true);
                if(!this.ReceiveLinkManager.TryGetOpenedObject(out receiveLink))
                {
                    MessagingEventSource.Log.CreatingNewLink(this.ClientId, this.isSessionReceiver, this.SessionIdInternal, false, this.LinkException);
                    receiveLink = await this.ReceiveLinkManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
                }

                IList<Message> brokeredMessages = null;
                this.ThrowIfClosed();

                IEnumerable<AmqpMessage> amqpMessages = null;
                var hasMessages = await Task.Factory.FromAsync(
                    (c, s) => receiveLink.BeginReceiveRemoteMessages(maxMessageCount, DefaultBatchFlushInterval, timeoutHelper.RemainingTime(), c, s),
                    a => receiveLink.EndReceiveMessages(a, out amqpMessages),
                    this).ConfigureAwait(false);
                Exception exception;
                if ((exception = receiveLink.GetInnerException()) != null)
                {
                    throw exception;
                }

                if (hasMessages && amqpMessages != null)
                {
                    foreach (var amqpMessage in amqpMessages)
                    {
                        if (this.ReceiveMode == ReceiveMode.ReceiveAndDelete)
                        {
                            receiveLink.DisposeDelivery(amqpMessage, true, AmqpConstants.AcceptedOutcome);
                        }

                        var message = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage);
                        if (brokeredMessages == null)
                        {
                            brokeredMessages = new List<Message>();
                        }

                        brokeredMessages.Add(message);
                    }
                }
                //Adjusted Part
                //Session is closed to ensure that a new AmqpLink is created
                else
                {
                    await receiveLink.Session.CloseAsync(timeoutHelper.RemainingTime());
                }
                //End Adjusted part

                return brokeredMessages;
            }
            catch (Exception exception)
            {
                throw AmqpExceptionHelper.GetClientException(exception, receiveLink?.GetTrackingId(), null, receiveLink?.Session.IsClosing() ?? false);
            }
        }

Microsoft.Azure.ServiceBus.ServiceBusConnection

        //Adjusted Part
        public void RecreateConnectionManager()
        {
            this.ConnectionManager = new FaultTolerantAmqpObject<AmqpConnection>(this.CreateConnectionAsync, CloseConnection);
        }
        //End Adjusted part

Microsoft.Azure.ServiceBus.Amqp.AmqpLinkCreator

      public async Task<Tuple<AmqpObject, DateTime>> CreateAndOpenAmqpLinkAsync()
      {
          var timeoutHelper = new TimeoutHelper(this.serviceBusConnection.OperationTimeout);

          MessagingEventSource.Log.AmqpGetOrCreateConnectionStart();
          //Adjusted Part
          serviceBusConnection.RecreateConnectionManager();
          //End Adjusted part
          var amqpConnection = await this.serviceBusConnection.ConnectionManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
          MessagingEventSource.Log.AmqpGetOrCreateConnectionStop(this.entityPath, amqpConnection.ToString(), amqpConnection.State.ToString());

          // Authenticate over CBS
          var cbsLink = amqpConnection.Extensions.Find<AmqpCbsLink>();
          DateTime cbsTokenExpiresAtUtc = DateTime.MaxValue;

          foreach (var resource in this.audience)
          {
              MessagingEventSource.Log.AmqpSendAuthenticationTokenStart(this.endpointAddress, resource, resource, this.requiredClaims);
              cbsTokenExpiresAtUtc = TimeoutHelper.Min(
                  cbsTokenExpiresAtUtc, 
                  await cbsLink.SendTokenAsync(this.cbsTokenProvider, this.endpointAddress, resource, resource, this.requiredClaims, timeoutHelper.RemainingTime()).ConfigureAwait(false));
              MessagingEventSource.Log.AmqpSendAuthenticationTokenStop(); 
          }

          AmqpSession session = null;
          try
          {
              // Create Session
              var amqpSessionSettings = new AmqpSessionSettings { Properties = new Fields() };
              session = amqpConnection.CreateSession(amqpSessionSettings);
              await session.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
          }
          catch (Exception exception)
          {
              MessagingEventSource.Log.AmqpSessionCreationException(this.entityPath, amqpConnection, exception);
              session?.Abort();
              throw AmqpExceptionHelper.GetClientException(exception, null, session.GetInnerException());
          }

          AmqpObject link = null;
          try
          {
              // Create Link
              link = this.OnCreateAmqpLink(amqpConnection, this.amqpLinkSettings, session);
              await link.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
              return new Tuple<AmqpObject, DateTime>(link, cbsTokenExpiresAtUtc);
          }
          catch (Exception exception)
          {
              MessagingEventSource.Log.AmqpLinkCreationException(
                  this.entityPath,
                  session,
                  amqpConnection,
                  exception);

              session.SafeClose(exception);
              throw AmqpExceptionHelper.GetClientException(exception, null, link?.GetInnerException(), session.IsClosing());
          }
      }
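
The idle-timeout idea behind the workaround above can also be sketched at the application level, without patching the SDK. This is a minimal sketch only: IdleWatchdog, NotifyActivity, CheckAsync and RunAsync are hypothetical names that do not exist in Microsoft.Azure.ServiceBus, and the reset callback is assumed to close and recreate the SubscriptionClient. It shares the workaround's drawback, because it also resets when the subscription is simply quiet.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, NOT part of Microsoft.Azure.ServiceBus.
// Invokes a reset callback when no activity has been reported for idleTimeout.
public sealed class IdleWatchdog
{
    private readonly TimeSpan _idleTimeout;
    private readonly Func<Task> _resetAsync;
    private long _lastActivityTicks;

    public IdleWatchdog(TimeSpan idleTimeout, Func<Task> resetAsync)
    {
        _idleTimeout = idleTimeout;
        _resetAsync = resetAsync ?? throw new ArgumentNullException(nameof(resetAsync));
        _lastActivityTicks = DateTime.UtcNow.Ticks;
    }

    // Call this from the message handler each time a message arrives.
    public void NotifyActivity() =>
        Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks);

    // Performs a single check against the supplied time.
    // Returns true if the idle timeout had elapsed and the reset callback ran.
    public async Task<bool> CheckAsync(DateTime utcNow)
    {
        var last = new DateTime(Interlocked.Read(ref _lastActivityTicks), DateTimeKind.Utc);
        if (utcNow - last < _idleTimeout)
        {
            return false;
        }

        await _resetAsync().ConfigureAwait(false);

        // Restart the idle window so the reset is not repeated immediately.
        Interlocked.Exchange(ref _lastActivityTicks, utcNow.Ticks);
        return true;
    }

    // Polls until the token is cancelled; Task.Delay then throws
    // OperationCanceledException, which the caller is expected to handle.
    public async Task RunAsync(TimeSpan pollInterval, CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(pollInterval, cancellationToken).ConfigureAwait(false);
            await CheckAsync(DateTime.UtcNow).ConfigureAwait(false);
        }
    }
}
```

In the snippet from this issue, NotifyActivity would be called at the start of the handler passed to RegisterMessageHandler, and the reset callback would call CloseAsync on the existing _subscriptionClient before constructing a new one and registering the handler again.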

Information Checklist Kindly make sure that you have added all of the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.

  • [ x ] Bug Description Added
  • [ x ] Repro Steps Added
  • [ x ] Setup information Added

About this issue

  • State: closed
  • Created 5 years ago
  • Comments: 23 (7 by maintainers)

Most upvoted comments

@nemakam I am starting work on a demo application that will make it easy to reproduce the scenario and investigate the problem. Once it is available, I will attach it to the issue.