azure-sdk-for-python: Service Bus Python SDK output log shows a 3s interval before the exception is thrown.
- Package Name: azure_servicebus
- Package Version: 7.11.2
- Operating System: Azure Function App
- Python Version: TBD
Describe the bug
- The latency issue occurs intermittently, without any pattern, when sending messages to a Service Bus standard tier namespace.
- When the code runs without latency, the application log looks like this:
- When the code runs with latency, I can see 2 exceptions: "AMQPConnectionError('Error condition: ErrorCondition.SocketError" and "AMQPLinkError("Error condition: amqp:link:detach-forced". The traffic is public to public, and the client side shows no high CPU/memory usage.
When the socket error occurs, the SDK output is as follows; there's a 3s delay between the link sender state change and the exception being thrown:
10/27/2023 5:58:02 AM | Executing 'Functions.enqueueData1' (Reason='This function was programmatically called via the host APIs.', Id=2f0b2079-75b7-4345-907e-90256c4fde97)
10/27/2023 5:58:02 AM | Python HTTP trigger function processed a request.
10/27/2023 5:58:02 AM | Message validation begins
10/27/2023 5:58:02 AM | Message validation completed
10/27/2023 5:58:02 AM | Sending message
10/27/2023 5:58:02 AM | Session state changed: <SessionState.UNMAPPED: 0> -> <SessionState.END_SENT: 4>
10/27/2023 5:58:02 AM | An error occurred when closing the connection: AMQPConnectionError('Error condition: ErrorCondition.SocketError\n Error Description: Can not send frame out due to exception: [Errno 104] Connection reset by peer')
10/27/2023 5:58:02 AM | Connection state changed: <ConnectionState.OPENED: 9> -> <ConnectionState.END: 13>
10/27/2023 5:58:02 AM | Connection state changed: None -> <ConnectionState.START: 0>
10/27/2023 5:58:02 AM | Connection state changed: <ConnectionState.START: 0> -> <ConnectionState.HDR_SENT: 2>
10/27/2023 5:58:02 AM | Connection state changed: <ConnectionState.HDR_SENT: 2> -> <ConnectionState.HDR_SENT: 2>
10/27/2023 5:58:02 AM | Connection state changed: <ConnectionState.HDR_SENT: 2> -> <ConnectionState.OPEN_PIPE: 4>
10/27/2023 5:58:02 AM | Session state changed: <SessionState.UNMAPPED: 0> -> <SessionState.BEGIN_SENT: 1>
10/27/2023 5:58:02 AM | Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
10/27/2023 5:58:02 AM | Management link receiver state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
10/27/2023 5:58:02 AM | Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
10/27/2023 5:58:02 AM | Management link sender state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
10/27/2023 5:58:02 AM | Connection state changed: <ConnectionState.OPEN_PIPE: 4> -> <ConnectionState.OPEN_SENT: 7>
10/27/2023 5:58:02 AM | Connection state changed: <ConnectionState.OPEN_SENT: 7> -> <ConnectionState.OPENED: 9>
10/27/2023 5:58:02 AM | Session state changed: <SessionState.BEGIN_SENT: 1> -> <SessionState.MAPPED: 3>
10/27/2023 5:58:02 AM | Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
10/27/2023 5:58:02 AM | Management link receiver state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
10/27/2023 5:58:02 AM | Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
10/27/2023 5:58:02 AM | Management link sender state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
10/27/2023 5:58:05 AM | AMQP error occurred: (AMQPLinkError("Error condition: amqp:link:detach-forced\n Error Description: The link 'G20:104524099:6f059c89-6e30-4486-9be2-37f33cd4d84a' is force detached. Code: publisher(link65173734). Details: AmqpMessagePublisher.IdleTimerExpired: Idle timeout: 00:10:00.")), condition: (b'amqp:link:detach-forced'), description: (b"The link 'G20:104524099:6f059c89-6e30-4486-9be2-37f33cd4d84a' is force detached. Code: publisher(link65173734). Details: AmqpMessagePublisher.IdleTimerExpired: Idle timeout: 00:10:00.").
10/27/2023 5:58:05 AM | Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
10/27/2023 5:58:05 AM | Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
10/27/2023 5:58:05 AM | Management link sender state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
10/27/2023 5:58:05 AM | Session state changed: <SessionState.MAPPED: 3> -> <SessionState.END_SENT: 4>
10/27/2023 5:58:05 AM | Connection state changed: <ConnectionState.OPENED: 9> -> <ConnectionState.CLOSE_SENT: 11>
10/27/2023 5:58:05 AM | Management link receiver state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
10/27/2023 5:58:05 AM | Connection state changed: <ConnectionState.CLOSE_SENT: 11> -> <ConnectionState.END: 13>
10/27/2023 5:58:05 AM | Session state changed: <SessionState.END_SENT: 4> -> <SessionState.DISCARDING: 6>
10/27/2023 5:58:05 AM | Link state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
10/27/2023 5:58:05 AM | Management link sender state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
10/27/2023 5:58:05 AM | Link state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
10/27/2023 5:58:05 AM | Management link receiver state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
10/27/2023 5:58:06 AM | 'servicebus.pysdk-ad8e5bf4' has an exception (ServiceBusConnectionError("The link 'G20:104524099:6f059c89-6e30-4486-9be2-37f33cd4d84a' is force detached. Code: publisher(link65173734). Details: AmqpMessagePublisher.IdleTimerExpired: Idle timeout: 00:10:00. Error condition: amqp:link:detach-forced.")). Retrying...
10/27/2023 5:58:06 AM | Connection state changed: None -> <ConnectionState.START: 0>
10/27/2023 5:58:06 AM | Connection state changed: <ConnectionState.START: 0> -> <ConnectionState.HDR_SENT: 2>
10/27/2023 5:58:06 AM | Connection state changed: <ConnectionState.HDR_SENT: 2> -> <ConnectionState.OPEN_PIPE: 4>
10/27/2023 5:58:06 AM | Session state changed: <SessionState.UNMAPPED: 0> -> <SessionState.BEGIN_SENT: 1>
10/27/2023 5:58:06 AM | Connection state changed: <ConnectionState.HDR_SENT: 2> -> <ConnectionState.HDR_SENT: 2>
10/27/2023 5:58:06 AM | Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
10/27/2023 5:58:06 AM | Management link receiver state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
10/27/2023 5:58:06 AM | Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
10/27/2023 5:58:06 AM | Management link sender state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
10/27/2023 5:58:06 AM | Connection state changed: <ConnectionState.OPEN_PIPE: 4> -> <ConnectionState.OPEN_SENT: 7>
10/27/2023 5:58:06 AM | Connection state changed: <ConnectionState.OPEN_SENT: 7> -> <ConnectionState.OPENED: 9>
10/27/2023 5:58:06 AM | Session state changed: <SessionState.BEGIN_SENT: 1> -> <SessionState.MAPPED: 3>
10/27/2023 5:58:06 AM | Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
10/27/2023 5:58:06 AM | Management link receiver state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
10/27/2023 5:58:06 AM | Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
10/27/2023 5:58:06 AM | Management link sender state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
10/27/2023 5:58:07 AM | Link state changed: <LinkState.DETACHED: 0> -> <LinkState.ATTACH_SENT: 1>
10/27/2023 5:58:07 AM | Link state changed: <LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3>
10/27/2023 5:58:07 AM | Message sent successfully
10/27/2023 5:58:07 AM | Executed 'Functions.enqueueData1' (Succeeded, Id=2f0b2079-75b7-4345-907e-90256c4fde97, Duration=5085ms)
- Found a similar GitHub issue: Async Eventhub Consumer with Pyamqp: Link Detached after long idle period Results in High "User Errors" volume · Issue #28996 · Azure/azure-sdk-for-python · GitHub
- Support ticket opened: 2310260030004667
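The 3s gap in the log above can also be located programmatically, which helps when comparing many invocations. The following is a minimal sketch, assuming log entries use the `MM/DD/YYYY H:MM:SS AM|PM | message` layout seen in the excerpt; `find_gaps`, `TIMESTAMP` and `SAMPLE` are illustrative names, not part of the SDK or the Functions host.

```python
import re
from datetime import datetime

# Timestamp prefix as it appears in the excerpt, e.g. "10/27/2023 5:58:02 AM | ".
TIMESTAMP = re.compile(r"(\d{1,2}/\d{1,2}/\d{4} \d{1,2}:\d{2}:\d{2} [AP]M) \| ")


def find_gaps(log_text, min_gap_seconds=2.0):
    """Return (previous_message, next_message, gap_seconds) for every pair of
    consecutive log entries that are at least min_gap_seconds apart."""
    # With one capturing group, split() yields [prefix, ts1, msg1, ts2, msg2, ...].
    parts = TIMESTAMP.split(log_text)
    entries = [
        (datetime.strptime(ts, "%m/%d/%Y %I:%M:%S %p"), msg.strip())
        for ts, msg in zip(parts[1::2], parts[2::2])
    ]
    gaps = []
    for (t0, m0), (t1, m1) in zip(entries, entries[1:]):
        delta = (t1 - t0).total_seconds()
        if delta >= min_gap_seconds:
            gaps.append((m0, m1, delta))
    return gaps


# Three entries shortened from the excerpt; works on one-line or multi-line logs.
SAMPLE = (
    "10/27/2023 5:58:02 AM | Sending message "
    "10/27/2023 5:58:02 AM | Management link sender state changed: "
    "<LinkState.ATTACH_SENT: 1> -> <LinkState.ATTACHED: 3> "
    "10/27/2023 5:58:05 AM | AMQP error occurred "
)

for before, after, seconds in find_gaps(SAMPLE):
    print(f"{seconds:.0f}s gap between {before!r} and {after!r}")
```

Because the log only has one-second resolution, the reported gap is accurate to about a second either way.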
To Reproduce Steps to reproduce the behavior:
- code we are using `---------------------------------------------------------------------------------- shared code
def get_service_bus_queue_sender(self, conn, queue):
    client = ServiceBusClient.from_connection_string(conn, retry_mode="exponential", retry_backoff_factor=0.5)
    logging.info("Service bus sender initializing")
    sender = client.get_queue_sender(queue)
    logging.info("Service bus sender initialization completed")
    return sender
Application code
import os
import sys
import json
import azure.functions as func
import logging
from Common.shared_code.error_response_helper import ErrorResponse
from azure.servicebus import ServiceBusMessage
from datetime import timezone
import datetime as dt
from Common.shared_code.shared import Shared, Trace

msg_expiry_time = os.environ['MSG_EXPIRY_TIME_IN_HOURS']

from pathlib import Path
script_location = Path(__file__).absolute().parents[1]

sender = None
sender_001 = None
sender_002 = None
sender_003 = None
sender_004 = None
sender_005 = None
sender_006 = None
sender_007 = None
sender_008 = None

class Enqueue:
    """ Enqueue class to get json output"""
    def __init__(self):
        self.error = None
        global sender
        global sender_001
        global sender_002
        global sender_003
        global sender_004
        global sender_005
        global sender_006
        global sender_007
        global sender_008
        self.__queue_001 = "ellipsefoundation001"
        self.__queue_002 = "ellipsefoundation002"
        self.__queue_003 = "ellipsefoundation003"
        self.__queue_004 = "ellipsefoundation004"
        self.__queue_005 = "ellipsefoundation005"
        self.__queue_006 = "ellipsefoundation006"
        self.__queue_007 = "ellipsefoundation007"
        self.__queue_008 = "ellipsefoundation008"
        self.__api_name = "enqueueData1"
        self.__is_valid = True
        if sender_001 is None:
            sender_001 = Shared().get_service_bus_queue_sender(os.environ["SERVICE_BUS_CONNECTION_STR_ellipsefoundation001"], self.__queue_001)
        if sender_002 is None:
            sender_002 = Shared().get_service_bus_queue_sender(os.environ["SERVICE_BUS_CONNECTION_STR_ellipsefoundation002"], self.__queue_002)
        if sender_003 is None:
            sender_003 = Shared().get_service_bus_queue_sender(os.environ["SERVICE_BUS_CONNECTION_STR_ellipsefoundation003"], self.__queue_003)
        if sender_004 is None:
            sender_004 = Shared().get_service_bus_queue_sender(os.environ["SERVICE_BUS_CONNECTION_STR_ellipsefoundation004"], self.__queue_004)
        if sender_005 is None:
            sender_005 = Shared().get_service_bus_queue_sender(os.environ["SERVICE_BUS_CONNECTION_STR_ellipsefoundation005"], self.__queue_005)
        if sender_006 is None:
            sender_006 = Shared().get_service_bus_queue_sender(os.environ["SERVICE_BUS_CONNECTION_STR_ellipsefoundation006"], self.__queue_006)
        if sender_007 is None:
            sender_007 = Shared().get_service_bus_queue_sender(os.environ["SERVICE_BUS_CONNECTION_STR_ellipsefoundation007"], self.__queue_007)
        if sender_008 is None:
            sender_008 = Shared().get_service_bus_queue_sender(os.environ["SERVICE_BUS_CONNECTION_STR_ellipsefoundation008"], self.__queue_008)

    def push_data(self, input_json):
        """ function which send output response """
        try:
            req_data = json.loads(input_json)
            # Validating the request payload
            schema = script_location /'enqueueData'/'request_payload_schema.json'
            with open(schema) as f:
                schema = json.load(f)
            logging.info("Message validation begins")
            self.__is_valid = Shared().validate_json(req_data, schema)
            logging.info("Message validation completed")
            if self.__is_valid[0]:
                configured_tables = list(dict(os.environ).keys())
                table = req_data["control"]["table"]
                if table in configured_tables:
                    table_config = os.environ[table]
                else:
                    table_config = os.environ["ELLIPSE_TABLES"]
                queue_name = table_config
                session_id = table
                correlation_id = req_data["messageContext"]["correlationId"]
                message_id = req_data["messageContext"]["messageId"]
                if queue_name == self.__queue_001:
                    sender = sender_001
                elif queue_name == self.__queue_002:
                    sender = sender_002
                elif queue_name == self.__queue_003:
                    sender = sender_003
                elif queue_name == self.__queue_004:
                    sender = sender_004
                elif queue_name == self.__queue_005:
                    sender = sender_005
                elif queue_name == self.__queue_006:
                    sender = sender_006
                elif queue_name == self.__queue_007:
                    sender = sender_007
                elif queue_name == self.__queue_008:
                    sender = sender_008
                current_time = dt.datetime.utcnow()
                current_time = current_time.replace(tzinfo=timezone(dt.timedelta(hours= 0), name="GMT"))
                insertion_time = current_time.strftime(f"%A, %d %b %Y %H:%M:%S %Z")
                expiry_time = current_time + dt.timedelta(hours= int(msg_expiry_time))
                expiry_time = expiry_time.strftime(f"%A, %d %b %Y %H:%M:%S %Z")
                logging.info("Sending message")
                sender.send_messages(ServiceBusMessage(input_json, session_id=session_id, correlation_id=correlation_id, message_id= message_id, content_type='application/json', time_to_live= dt.timedelta(hours= int(msg_expiry_time))))
                logging.info("Message sent successfully")
                response_file = script_location /'enqueueData'/'sample_response.json'
                with open(response_file) as f:
                    response = json.load(f)
                response["queue"]["queueName"] = queue_name
                response["queue"]["queueMessagesList"][0]["queueMessage"]["messageId"] = message_id
                response["queue"]["queueMessagesList"][0]["queueMessage"]["insertionTime"] = insertion_time
                response["queue"]["queueMessagesList"][0]["queueMessage"]["expirationTime"] = expiry_time
                response = json.dumps(response)
                Trace().trace_attributes(self.__api_name, message_id, table, queue_name)
                return func.HttpResponse(response, status_code=201, mimetype = 'application/json; charset=utf8')
            else:
                self.error = self.__is_valid[1]
                error_json = {
                    "message": self.error.message,
                    "path": self.error.json_path
                }
                error_json = json.dumps(error_json)
                return func.HttpResponse(error_json, status_code=400)
        except Exception:
            retmsg = 'Error - Error Info :' + str(sys.exc_info())
            logging.error(f"{retmsg}")
            return ErrorResponse().error_log('Internal Server Error',500) `
Expected behavior: when a link detach error occurs, the retry should succeed promptly, without the 3s interval.
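Until the delay itself is explained, one possible client-side mitigation is to avoid sending on a link the service has probably already detached. The log reports `Idle timeout: 00:10:00`, so the sketch below recreates the sender once it has been unused for longer than a threshold below ten minutes. This is a hypothetical workaround, not a fix confirmed by the maintainers: `IdleAwareSender`, `sender_factory` and the 540-second threshold are assumptions made for illustration, and the only sender methods it relies on are `send_messages` and `close`.

```python
import time


class IdleAwareSender:
    """Wraps a sender and recreates it after a long idle period.

    sender_factory is a hypothetical zero-argument callable that returns a new
    sender object exposing send_messages(message) and close().
    """

    def __init__(self, sender_factory, max_idle_seconds=540.0, clock=time.monotonic):
        self._factory = sender_factory
        self._max_idle = max_idle_seconds  # assumed threshold, below the 10 min idle timeout
        self._clock = clock
        self._sender = None
        self._last_used = None

    def send(self, message):
        now = self._clock()
        idle_too_long = self._last_used is not None and now - self._last_used > self._max_idle
        if self._sender is None or idle_too_long:
            self._close_quietly()
            self._sender = self._factory()
        self._sender.send_messages(message)
        # Only successful sends reset the idle timer.
        self._last_used = self._clock()

    def _close_quietly(self):
        if self._sender is None:
            return
        try:
            self._sender.close()
        except Exception:
            # The service may already have detached the link; nothing more to do.
            pass
        self._sender = None
```

With the code from this report, the factory could be something like `lambda: Shared().get_service_bus_queue_sender(conn, queue)`. Recreating the sender still pays the connection setup cost, so this trades the wait for the detach error for an up-front reconnect and does not remove latency entirely.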
About this issue
- Original URL
- State: open
- Created 8 months ago
- Comments: 24 (8 by maintainers)
Hi @swathipil, thanks for the reply. The log contains all SDK activity and is similar to the debug log we provided previously. It seems that when the idle connection exception occurs, some logic in the SDK spends between 3s and 2 minutes before performing the retry. 3s is acceptable, but 2 minutes is too long to wait for an exception before retrying. Could you suggest any possible cause for this, or is there any logic/configuration we can try to reduce this time?
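For reasoning about how much of that wait could come from retry backoff alone, the sketch below computes a generic exponential backoff schedule. It assumes the common `factor * 2**attempt` formula capped at a maximum; whether azure-servicebus 7.11.2 computes its delays exactly this way is an assumption that should be checked against the SDK source. `backoff_delays` is an illustrative helper, and its parameters mirror the `retry_total`, `retry_backoff_factor`, `retry_backoff_max` and `retry_mode` keyword arguments of `ServiceBusClient`. The values 3 and 120 seconds used as defaults here are what the SDK documentation lists, to my knowledge, and should be verified for the installed version.

```python
def backoff_delays(retry_total=3, backoff_factor=0.5, backoff_max=120.0, mode="exponential"):
    """Return the delay in seconds before each retry attempt.

    Generic sketch: fixed mode waits backoff_factor every time, exponential
    mode doubles the wait on each attempt, and both are capped at backoff_max.
    """
    delays = []
    for attempt in range(retry_total):
        if mode == "fixed":
            delay = backoff_factor
        else:
            delay = backoff_factor * (2 ** attempt)
        delays.append(min(backoff_max, delay))
    return delays


# retry_backoff_factor=0.5 is the value used in the shared code of this report.
schedule = backoff_delays(retry_total=3, backoff_factor=0.5)
print(schedule, "total:", sum(schedule))
```

Under these assumptions the backoff for three retries adds up to only a few seconds, so a wait approaching 2 minutes would have to come from somewhere else, such as a capped delay after many retries or a timeout on the socket itself. That is a hypothesis for the maintainers to confirm, not a diagnosis.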
Hi @kashifkhan, I'm able to reproduce it with the following code snippet:
Here are my debug logs:
As you can see from the logs, once the link is detached it takes close to 7 seconds to re-establish the link and send the message.