azure-sdk-for-python: Async AutoLockRenewer doesn't renew locks and throws azure.servicebus.exceptions.MessageLockLostError
- Package Name: azure-servicebus
- Package Version: 7.11.1
- Operating System: Linux
- Python Version: 3.10
Describe the bug
I have an async application that listens to messages on a topic and runs heavy CPU operations on each message.
In my application I'm using azure.servicebus.aio.ServiceBusClient to create the Service Bus client, azure.servicebus.aio.AutoLockRenewer to automatically renew the lock on the messages, and azure.servicebus.aio.ServiceBusReceiver to subscribe to the topic and receive the messages.
In some cases, even though I set a high value for the AutoLockRenewer's max_lock_renewal_duration param and message handling completes before that duration passes, I get the error azure.servicebus.exceptions.MessageLockLostError: The lock on the message lock has expired. when calling ServiceBusReceiver's complete_message().
Handling each message consumes a lot of CPU: it includes processing a big file from storage and uploading data asynchronously back to the storage.
My assumption was that the default 10 seconds for the renewer's _renew_period was too small and that the lock renewal could not happen before the lock expired. As a workaround I overrode this value with a higher one, which reduced the number of times the exception is thrown, but unfortunately it still happens sometimes.
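One way to reason about this symptom is to check whether the message handler ever yields control back to the event loop. The sketch below is a minimal simulation using only asyncio, not the SDK: `fake_renewer` is a hypothetical stand-in for a periodic lock renewal task, and `time.sleep` stands in for CPU-bound work. It assumes, without confirmation from the SDK source, that the async renewer runs as a task on the same event loop as the handler.

```python
import asyncio
import time


async def fake_renewer(ticks: list, period: float) -> None:
    """Hypothetical stand-in for a lock renewer: records a tick every `period` seconds."""
    while True:
        await asyncio.sleep(period)
        ticks.append(time.monotonic())


async def blocking_handler(duration: float) -> None:
    # time.sleep() stands in for CPU-bound work: it never yields to the event loop.
    time.sleep(duration)


async def cooperative_handler(duration: float) -> None:
    # asyncio.sleep() yields to the event loop, so other tasks keep running.
    await asyncio.sleep(duration)


async def count_ticks(handler, duration: float = 0.5, period: float = 0.05) -> int:
    """Run `handler` next to the fake renewer and count ticks recorded while it ran."""
    ticks: list = []
    renewer = asyncio.create_task(fake_renewer(ticks, period))
    await asyncio.sleep(0)  # give the renewer task a chance to start
    await handler(duration)
    ticks_during_handler = len(ticks)
    renewer.cancel()
    try:
        await renewer
    except asyncio.CancelledError:
        pass
    return ticks_during_handler


if __name__ == "__main__":
    print("blocking handler ticks:", asyncio.run(count_ticks(blocking_handler)))
    print("cooperative handler ticks:", asyncio.run(count_ticks(cooperative_handler)))
```

In this simulation the blocking handler records no ticks at all while it runs, whereas the cooperative one records several. If the real handler behaves like the blocking one, raising the renewal period would not be expected to help much, because the renewal task cannot run until the handler returns.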
To Reproduce
The following code snippet shows my usage of the async client and the message handling:
import asyncio
from typing import Callable, Awaitable

from azure.identity.aio import DefaultAzureCredential as AsyncDefaultAzureCredential
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient, \
    AutoLockRenewer as AsyncAutoLockRenewer


async def async_listen_to_queue(callback: Callable[[ServiceBusMessage], Awaitable[None]], topic_name: str,
                                subscription_name: str,
                                message_batch_size: int, handle_timeout_min: int,
                                listen_timeout_min: int, session_id: str = None) -> None:
    async with AsyncServiceBusClient(
            fully_qualified_namespace=f"my_sb.servicebus.windows.net",
            credential=AsyncDefaultAzureCredential()) as async_client:
        async with AsyncAutoLockRenewer(max_lock_renewal_duration=handle_timeout_min * 60) as auto_lock_renewer:
            async with async_client.get_subscription_receiver(topic_name=topic_name,
                                                              subscription_name=subscription_name,
                                                              auto_lock_renewer=auto_lock_renewer,
                                                              max_wait_time=listen_timeout_min * 60,
                                                              session_id=session_id) as receiver:
                should_keep_listening = True
                while should_keep_listening:
                    received_msgs = await receiver.receive_messages(max_message_count=message_batch_size)
                    for msg in received_msgs:
                        msg_handling_succeeded = False
                        try:
                            await callback(msg)
                            msg_handling_succeeded = True
                        except Exception as e:
                            print(f"Failed to handle ServiceBus message. msg={msg}. error={e}")
                        finally:
                            if msg_handling_succeeded:
                                await receiver.complete_message(msg)
                            else:
                                await receiver.abandon_message(msg)


async def callback_example(msg: ServiceBusMessage):
    print(msg)
    await asyncio.sleep(10)


async def run_async_receiver():
    await async_listen_to_queue(
        callback=callback_example,
        topic_name="my_topic",
        subscription_name="my_subscription",
        message_batch_size=3,
        handle_timeout_min=15,
        listen_timeout_min=2
    )


if __name__ == '__main__':
    asyncio.run(run_async_receiver())
Expected behavior
Messages are consumed, handled and completed as long as they are handled within the defined max_lock_renewal_duration of the AutoLockRenewer. The azure.servicebus.exceptions.MessageLockLostError error is thrown if message handling takes longer than that.
About this issue
- State: closed
- Created a year ago
- Comments: 19 (7 by maintainers)
The logs are attached. You can see my log "Got message, sleeping for 6 minutes" when handling of the message starts, and the second log "Done handling" before the message is completed. It does seem that the autolock_renewer works in parallel with the message handling.
I was just able to resolve this. My issue was that the core code doing the work was a synchronous method, presumably blocking the main thread from doing any other processing. I would only ever see the log line
DEBUG:azure.servicebus.aio._async_auto_lock_renewer:Running async lock auto-renew for XXX seconds
(from _async_auto_lock_renewer.py) in the logs after processing completed. My solution was to run my code on the ThreadPoolExecutor.
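The ThreadPoolExecutor approach can be sketched roughly as follows. This is a minimal illustration using only the standard library, not the commenter's actual code: `heavy_sync_work` is a hypothetical stand-in for the synchronous processing, and `heartbeat` is a hypothetical stand-in for any background task (such as a lock renewer) that needs the event loop to stay responsive.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def heavy_sync_work(duration: float) -> str:
    """Hypothetical stand-in for blocking, synchronous message processing."""
    time.sleep(duration)
    return "done"


async def heartbeat(ticks: list, period: float) -> None:
    """Hypothetical background task that needs regular turns on the event loop."""
    while True:
        await asyncio.sleep(period)
        ticks.append(time.monotonic())


async def handle_in_executor(duration: float = 0.5, period: float = 0.05):
    """Offload the blocking work to a worker thread and count heartbeat ticks meanwhile."""
    loop = asyncio.get_running_loop()
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks, period))
    with ThreadPoolExecutor(max_workers=1) as pool:
        # run_in_executor returns an awaitable; the event loop stays free while the thread works.
        result = await loop.run_in_executor(pool, heavy_sync_work, duration)
    ticks_seen = len(ticks)
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass
    return result, ticks_seen


if __name__ == "__main__":
    print(asyncio.run(handle_in_executor()))
```

In a receiver loop like the one in the issue, the callback would await `loop.run_in_executor(...)` (or `asyncio.to_thread(...)` on Python 3.9+) instead of calling the synchronous function directly. Note that threads still share the GIL, so for pure-Python CPU-bound work a process pool may be worth considering.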