azure-sdk-for-net: Service Bus: System.ObjectDisposedException when opening a receiver link
Hi,
I am getting thousands of ObjectDisplosedException a day throw out of Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope.OpenReceiverLinkAsync() at line 312.
By looking into the servicebus source code, the problem is the ActiveConnection was disposed. The version of Azure.Messaging.ServiceBus is 7.1.0.0. Is this an issue of servicebus code? Any suggestion? My sample code is provided at the end.
Thanks
Exception Detail:
{“HResult”:-2146232798,“Message”:“Cannot access a disposed object.\r\nObject name: 'FaultTolerantAmqpObject1'.","Source":"Microsoft.Azure.Amqp","ObjectName":"FaultTolerantAmqpObject
1”,“Type”:“System.ObjectDisposedException”}
Call Stack:
System.ObjectDisposedException:
at Microsoft.Azure.Amqp.Singleton`1+<GetOrCreateAsync>d__13.MoveNext (Microsoft.Azure.Amqp, Version=2.4.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1+ConfiguredTaskAwaiter.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Azure.Messaging.ServiceBus.Amqp.AmqpConnectionScope+<OpenReceiverLinkAsync>d__54.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1+ConfiguredTaskAwaiter.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+<OpenReceiverLinkAsync>d__30.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1+ConfiguredTaskAwaiter.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Microsoft.Azure.Amqp.FaultTolerantAmqpObject`1+<OnCreateAsync>d__6.MoveNext (Microsoft.Azure.Amqp, Version=2.4.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Microsoft.Azure.Amqp.Singleton`1+<GetOrCreateAsync>d__13.MoveNext (Microsoft.Azure.Amqp, Version=2.4.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Microsoft.Azure.Amqp.Singleton`1+<GetOrCreateAsync>d__13.MoveNext (Microsoft.Azure.Amqp, Version=2.4.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+<ReceiveMessagesAsyncInternal>d__34.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+<ReceiveMessagesAsyncInternal>d__34.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1+ConfiguredTaskAwaiter.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+<>c__DisplayClass33_0+<<ReceiveMessagesAsync>b__0>d.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy+<RunOperation>d__20.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy+<RunOperation>d__20.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver+<ReceiveMessagesAsync>d__33.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1+ConfiguredTaskAwaiter.GetResult (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Azure.Messaging.ServiceBus.ServiceBusReceiver+<ReceiveMessagesAsync>d__37.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Azure.Messaging.ServiceBus.ServiceBusReceiver+<ReceiveMessageAsync>d__39.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Private.CoreLib, Version=5.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e)
at Azure.Messaging.ServiceBus.ReceiverManager+<ReceiveAndProcessMessagesAsync>d__19.MoveNext (Azure.Messaging.ServiceBus, Version=7.1.0.0, Culture=neutral, PublicKeyToken=92742159e12e44c8)
My sample code:
public class QueueWorker<TMessage> : BackgroundService where TMessage : class
{
private readonly IProcessQueueMessages<TMessage> _messageProcessor;
private readonly ServiceBusClient _serviceBusClient;
protected ILogger<QueueWorker<TMessage>> Logger { get; }
public QueueWorker(ILogger<QueueWorker<TMessage>> logger, IProcessQueueMessages<TMessage> messageProcessor, ServiceBusClient serviceBusClient)
{
Logger = logger;
_messageProcessor = messageProcessor;
_serviceBusClient = serviceBusClient;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var queueName = _messageProcessor.QueueName;
if(string.IsNullOrWhiteSpace(queueName))
{
Logger.LogWarning(_messageProcessor.QueueNameMissingWarningMessage);
return;
}
var messageProcessor = CreateServiceBusProcessor(queueName);
messageProcessor.ProcessMessageAsync += HandleMessageAsync;
messageProcessor.ProcessErrorAsync += HandleReceivedExceptionAsync;
Logger.LogInformation($"Starting message pump on queue {queueName} in namespace {messageProcessor.FullyQualifiedNamespace}");
await messageProcessor.StartProcessingAsync(stoppingToken);
Logger.LogInformation("Message pump started");
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(1));
}
Logger.LogInformation("Closing message pump");
messageProcessor.ProcessMessageAsync -= HandleMessageAsync;
messageProcessor.ProcessErrorAsync -= HandleReceivedExceptionAsync;
await messageProcessor.CloseAsync(cancellationToken: stoppingToken);
Logger.LogInformation("Message pump closed : {Time}", DateTimeOffset.UtcNow);
}
private ServiceBusProcessor CreateServiceBusProcessor(string queueName)
{
return _serviceBusClient.CreateProcessor(queueName);
}
private async Task HandleMessageAsync(ProcessMessageEventArgs processMessageEventArgs)
{
TMessage deserializedMessage = null;
try
{
var rawMessageBody = Encoding.UTF8.GetString(processMessageEventArgs.Message.Body.ToArray());
Logger.LogInformation("Received message {MessageId} with body {MessageBody}",
processMessageEventArgs.Message.MessageId, rawMessageBody);
deserializedMessage = JsonConvert.DeserializeObject<TMessage>(rawMessageBody);
if (deserializedMessage != null)
{
await _messageProcessor.ProcessMessage(deserializedMessage, processMessageEventArgs.Message.MessageId,
processMessageEventArgs.Message.ApplicationProperties,
processMessageEventArgs.CancellationToken);
}
else
{
Logger.LogError(
"Unable to deserialize to message contract {ContractName} for message {MessageBody}",
typeof(TMessage), rawMessageBody);
}
Logger.LogInformation("Message {MessageId} processed", processMessageEventArgs.Message.MessageId);
await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message);
}
catch (Exception ex)
{
Logger.LogError(ex, "Unable to handle message: {ProcessMessageEventArgs}", processMessageEventArgs);
if(!_messageProcessor.CanRetry(ex, processMessageEventArgs.Message?.Body, deserializedMessage))
{
await processMessageEventArgs.DeadLetterMessageAsync(processMessageEventArgs.Message);
}
}
}
private Task HandleReceivedExceptionAsync(ProcessErrorEventArgs exceptionEvent)
{
Logger.LogError(exceptionEvent.Exception, "Unable to process message");
return Task.CompletedTask;
}
}
About this issue
- Original URL
- State: closed
- Created 3 years ago
- Comments: 34 (20 by maintainers)
Ah, cancellatinoToken really means to cancel stopProcessing. I misunderstood it. I will remove the cancellationToken parameter to test our web app tomorrow and will keep you updated. Thanks for the help.