azure-sdk-for-python: Async AutoLockRenewer doesn't renew locks and throws azure.servicebus.exceptions.MessageLockLostError

  • Package Name: azure-servicebus
  • Package Version: 7.11.1
  • Operating System: Linux
  • Python Version: 3.10

Describe the bug I have an async application that listens to messages in a topic and runs some heavy CPU operations on each message. In my application I’m using azure.servicebus.aio.ServiceBusClient to create the servicebus client, azure.servicebus.aio.AutoLockRenewer to automatically renew the lock on the messages and azure.servicebus.aio.ServiceBusReceiver to register to the topic and receive the messages.

In some cases, even though I set a high value for the AutoLockRenewer’s max_lock_renewal_duration param and finish handling the message before that duration elapses, I get the error azure.servicebus.exceptions.MessageLockLostError: The lock on the message lock has expired. when calling the ServiceBusReceiver’s complete_message().

Handling each message consumes a lot of CPU: it includes processing a big file from storage and uploading data asynchronously back to storage.
My assumption was that the default 10 seconds for the renewer’s _renew_period was too small and that the lock renewal could not happen before the lock expired. As a workaround I overwrote this value with a higher one, which reduced the number of times the exception is thrown, but unfortunately it still happens sometimes.

To Reproduce The following code snippet shows how I use the async client and handle messages:

import asyncio
from typing import Callable, Awaitable

from azure.identity.aio import DefaultAzureCredential as AsyncDefaultAzureCredential
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient as AsyncServiceBusClient, \
    AutoLockRenewer as AsyncAutoLockRenewer


async def async_listen_to_queue(callback: Callable[[ServiceBusMessage], Awaitable[None]], topic_name: str,
                                subscription_name: str,
                                message_batch_size: int, handle_timeout_min: int,
                                listen_timeout_min: int, session_id: str = None) -> None:
    async with AsyncServiceBusClient(
            fully_qualified_namespace="my_sb.servicebus.windows.net",
            credential=AsyncDefaultAzureCredential()) as async_client:
        async with AsyncAutoLockRenewer(max_lock_renewal_duration=handle_timeout_min * 60) as auto_lock_renewer:
            async with async_client.get_subscription_receiver(topic_name=topic_name,
                                                                           subscription_name=subscription_name,
                                                                           auto_lock_renewer=auto_lock_renewer,
                                                                           max_wait_time=listen_timeout_min * 60,
                                                                           session_id=session_id) as receiver:
                should_keep_listening = True
                while should_keep_listening:
                    received_msgs = await receiver.receive_messages(max_message_count=message_batch_size)
                    for msg in received_msgs:
                        msg_handling_succeeded = False
                        try:
                            await callback(msg)
                            msg_handling_succeeded = True
                        except Exception as e:
                            print(f"Failed to handle ServiceBus message. msg={msg}. error={e}")
                        finally:
                            if msg_handling_succeeded:
                                await receiver.complete_message(msg)
                            else:
                                await receiver.abandon_message(msg)


async def callback_example(msg: ServiceBusMessage):
    print(msg)
    await asyncio.sleep(10)


async def run_async_receiver():
    await async_listen_to_queue(
        callback=callback_example,
        topic_name="my_topic",
        subscription_name="my_subscription",
        message_batch_size=3,
        handle_timeout_min=15,
        listen_timeout_min=2
    )


if __name__ == '__main__':
    asyncio.run(run_async_receiver())

Expected behavior Messages are consumed, handled and completed as long as they are handled within the AutoLockRenewer’s max_lock_renewal_duration. The azure.servicebus.exceptions.MessageLockLostError error is thrown if message handling takes longer than that.

About this issue

  • State: closed
  • Created a year ago
  • Comments: 19 (7 by maintainers)

Most upvoted comments

INFO:azure.identity.aio._credentials.environment:No environment configuration found.
INFO:azure.identity.aio._credentials.managed_identity:ManagedIdentityCredential will use IMDS
INFO:azure.servicebus._pyamqp.aio._connection_async:Connection state changed: None -> <ConnectionState.START: 0>
INFO:azure.servicebus._pyamqp.aio._connection_async:Connection state changed: <ConnectionState.START: 0> -> <ConnectionState.HDR_SENT: 2>
INFO:azure.servicebus._pyamqp.aio._connection_async:Connection state changed: <ConnectionState.HDR_SENT: 2> -> <ConnectionState.HDR_SENT: 2>
INFO:azure.servicebus._pyamqp.aio._connection_async:Connection state changed: <ConnectionState.HDR_SENT: 2> -> <ConnectionState.OPEN_PIPE: 4>
INFO:azure.servicebus._pyamqp.aio._session_async:Session state changed: <SessionState.UNMAPPED: 0> -> <SessionState.BEGIN_SENT: 1>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link receiver state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link sender state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
INFO:azure.servicebus._pyamqp.aio._connection_async:Connection state changed: <ConnectionState.OPEN_PIPE: 4> -> <ConnectionState.OPEN_SENT: 7>
INFO:azure.servicebus._pyamqp.aio._connection_async:Connection state changed: <ConnectionState.OPEN_SENT: 7> -> <ConnectionState.OPENED: 9>
INFO:azure.servicebus._pyamqp.aio._session_async:Session state changed: <SessionState.BEGIN_SENT: 1> -> <SessionState.MAPPED: 3>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link receiver state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link sender state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
INFO:azure.servicebus._pyamqp.aio._cbs_async:CBS completed opening with status: <ManagementOpenResult.OK: 1>
INFO:azure.core.pipeline.policies.http_logging_policy:Request URL: 'http://169.254.169.254/metadata/identity/oauth2/token?api-version=REDACTED&resource=REDACTED'
Request method: 'GET'
Request headers:
    'User-Agent': 'azsdk-python-identity/1.13.0 Python/3.10.9 (Linux-5.10.16.3-microsoft-standard-WSL2-x86_64-with-glibc2.31)'
No body was attached to the request
INFO:azure.identity.aio._credentials.chained:DefaultAzureCredential acquired a token from AzureCliCredential
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
Got message, sleeping for 6 minutes
DEBUG:azure.servicebus.aio._async_auto_lock_renewer:Running async lock auto-renew for 900 seconds
DEBUG:azure.servicebus.aio._async_auto_lock_renewer:10 seconds or less until lock expires - auto renewing.
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link receiver state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link sender state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link receiver state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link sender state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
DEBUG:azure.servicebus.aio._async_auto_lock_renewer:10 seconds or less until lock expires - auto renewing.
DEBUG:azure.servicebus.aio._async_auto_lock_renewer:10 seconds or less until lock expires - auto renewing.
DEBUG:azure.servicebus.aio._async_auto_lock_renewer:10 seconds or less until lock expires - auto renewing.
DEBUG:azure.servicebus.aio._async_auto_lock_renewer:10 seconds or less until lock expires - auto renewing.
DEBUG:azure.servicebus.aio._async_auto_lock_renewer:10 seconds or less until lock expires - auto renewing.
DEBUG:azure.servicebus.aio._async_auto_lock_renewer:10 seconds or less until lock expires - auto renewing.
Done handling
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link receiver state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link sender state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link sender state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link receiver state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
INFO:azure.servicebus._pyamqp.aio._session_async:Session state changed: <SessionState.MAPPED: 3> -> <SessionState.END_SENT: 4>
INFO:azure.servicebus._pyamqp.aio._connection_async:Connection state changed: <ConnectionState.OPENED: 9> -> <ConnectionState.CLOSE_SENT: 11>
INFO:azure.servicebus._pyamqp.aio._connection_async:Connection state changed: <ConnectionState.CLOSE_SENT: 11> -> <ConnectionState.END: 13>
INFO:azure.servicebus._pyamqp.aio._session_async:Session state changed: <SessionState.END_SENT: 4> -> <SessionState.DISCARDING: 6>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link sender state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link receiver state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management link sender state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
INFO:azure.servicebus._pyamqp.aio._link_async:Link state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
INFO:azure.servicebus._pyamqp.aio._management_link_async:Management l

The logs are attached. You can see my log line Got message, sleeping for 6 minutes when I start handling the message, and the second log line Done handling just before completing the message.

It does seem that the auto lock renewer works in parallel with the message handling.

I was just able to resolve this. My issue was that the core code doing the work was a synchronous method, which was presumably blocking the main thread from doing any other processing. I would only ever see the DEBUG:azure.servicebus.aio._async_auto_lock_renewer:Running async lock auto-renew for XXX seconds line (from _async_auto_lock_renewer.py) in the logs after processing completed.
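The effect described above can be reproduced without Service Bus at all. The following is a minimal sketch, assuming a hypothetical heartbeat task as a stand-in for the renewer's periodic work; none of these names come from azure-servicebus. It counts how many times the background task gets to run while a handler is busy, first with a synchronous call inside the coroutine and then with a call that yields to the event loop.

```python
import asyncio
import time


async def heartbeat(ticks: list, period: float = 0.05) -> None:
    """Hypothetical stand-in for a periodic renewal task."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(period)


async def blocking_handler() -> None:
    # Synchronous call inside a coroutine: the event loop is stuck here
    # and cannot schedule any other task until it returns.
    time.sleep(0.5)


async def cooperative_handler() -> None:
    # Awaiting hands control back to the event loop while waiting.
    await asyncio.sleep(0.5)


async def count_ticks(handler) -> int:
    """Run handler next to the heartbeat; return ticks recorded while it ran."""
    ticks: list = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    already_recorded = len(ticks)
    await handler()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(ticks) - already_recorded


async def main() -> tuple:
    blocked = await count_ticks(blocking_handler)
    cooperative = await count_ticks(cooperative_handler)
    return blocked, cooperative


if __name__ == "__main__":
    blocked, cooperative = asyncio.run(main())
    print(f"ticks during blocking handler: {blocked}")
    print(f"ticks during cooperative handler: {cooperative}")
```

With the blocking handler the heartbeat never runs while the handler is busy, which would match a renewer that only logs after processing has completed.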

My solution was to run my code on the ThreadPoolExecutor.

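import asyncio
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor()
...
# Inside the async handler: run the blocking function on a worker thread.
loop = asyncio.get_running_loop()
ingest_outputs = await loop.run_in_executor(_executor, doWork, doWorkParam1)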
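As a self-contained illustration of this workaround, here is a minimal sketch under stated assumptions: do_work is a hypothetical synchronous function that uses time.sleep to imitate blocking work, and heartbeat is a hypothetical stand-in for the renewer's periodic task. Neither is part of azure-servicebus. The blocking call is handed to a ThreadPoolExecutor through run_in_executor, so the event loop stays free to run the background task.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor(max_workers=2)


def do_work(seconds: float) -> str:
    """Hypothetical blocking function (imitates synchronous processing)."""
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: list, period: float = 0.05) -> None:
    """Hypothetical stand-in for a periodic renewal task."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(period)


async def handle_message(seconds: float) -> str:
    # Offload the blocking call to a worker thread and await its result;
    # the event loop keeps running other tasks in the meantime.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, do_work, seconds)


async def main() -> tuple:
    ticks: list = []
    task = asyncio.create_task(heartbeat(ticks))
    result = await handle_message(0.5)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return result, len(ticks)


if __name__ == "__main__":
    result, tick_count = asyncio.run(main())
    print(f"result={result}, heartbeat ticks while working={tick_count}")
```

One caveat on the design choice: time.sleep releases the GIL, so this sketch shows the best case. Pure-Python CPU-bound work in a thread still competes with the event loop for the GIL, so for heavy computation a ProcessPoolExecutor passed to the same run_in_executor call may be worth trying, provided the function and its arguments are picklable.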