azure-sdk-for-java: [BUG] ServiceBusReceiverClient stops consuming messages after some time though messages are present in subscription.
Describe the bug We are using ServiceBusReceiverClient as the receiver, but we have observed that even though the services were running and messages were present in the subscription, it stopped consuming messages.
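For context, a simplified sketch of the kind of synchronous receive loop we are running (placeholder connection string and entity names; not our exact production code):

```java
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverClient;

import java.time.Duration;

public class SubscriptionReceiver {
    public static void main(String[] args) {
        // Placeholder connection/entity values; the real configuration is not shown here.
        ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
                .connectionString(System.getenv("SERVICEBUS_CONNECTION_STRING"))
                .receiver()
                .topicName("my-topic")
                .subscriptionName("my-subscription")
                .buildClient();

        while (true) {
            // Poll for up to 10 messages, waiting at most 30 seconds per batch.
            for (ServiceBusReceivedMessage message
                    : receiver.receiveMessages(10, Duration.ofSeconds(30))) {
                System.out.println("Received: " + message.getBody());
                receiver.complete(message); // settle the message after processing
            }
        }
    }
}
```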
Exception or Stack Trace
2022-01-06 15:14:25,169 ERROR [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Error in SendLinkHandler. Disposing unconfirmed sends. The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]
com.azure.core.amqp.exception.AmqpException: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]
    at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:85)
    at com.azure.core.amqp.implementation.handler.LinkHandler.handleRemoteLinkClosed(LinkHandler.java:110)
    at com.azure.core.amqp.implementation.handler.LinkHandler.onLinkRemoteClose(LinkHandler.java:61)
    at com.azure.core.amqp.implementation.handler.SendLinkHandler.onLinkRemoteClose(SendLinkHandler.java:29)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
    at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:92)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2022-01-06 15:14:25,196 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Closing request/response channel.
2022-01-06 15:14:25,220 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel:$cbs - namespace[MF_7b61df_1641477262829] entityPath[$cbs]: Retry #1. Transient error occurred. Retrying after 4964 ms. com.azure.core.amqp.exception.AmqpException: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] terminating 0 unconfirmed sends (reason: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]).
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] completed the termination of 0 unconfirmed sends (reason: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25, errorContext[NAMESPACE: ################.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 96]).
2022-01-06 15:14:25,221 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] Channel already closed.
2022-01-06 15:14:25,222 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] SendLinkHandler disposed. Remaining: 1
2022-01-06 15:14:25,224 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - onLinkRemoteClose connectionId[MF_7b61df_1641477262829] linkName[cbs:receiver], errorCondition[amqp:connection:forced] errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:7381d0b956d84c6d8cd4016b7ecefec0_G1, SystemTracker:gateway5, Timestamp:2022-01-06T15:14:25]
2022-01-06 15:14:25,224 INFO [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.handler.ReceiveLinkHandler - connectionId[MF_7b61df_1641477262829] linkName[cbs:receiver] state[ACTIVE] Local link state is not closed.
2022-01-06 15:14:25,224 DEBUG [reactor-executor-4(109288)] {} com.azure.core.amqp.implementation.RequestResponseChannel - connectionId[MF_7b61df_1641477262829] linkName[cbs] ReceiveLinkHandler disposed. Remaining: 0
To Reproduce The issue occurs randomly; we do not have deterministic repro steps.
Expected behavior ServiceBusReceiverClient should continue consuming the messages available in the subscription.
Setup (please complete the following information):
- OS: Linux
- IDE: IntelliJ
- Library/Libraries: com.azure:azure-messaging-servicebus-7.5.1.jar
- Java version: Java 8
Additional context Similar to issue https://github.com/Azure/azure-sdk-for-java/issues/26138
Information Checklist Kindly make sure that you have added all of the following information above and check off the required fields; otherwise we will treat the issue as an incomplete report
- Bug Description Added
- Repro Steps Added
- Setup information Added
About this issue
- Original URL
- State: open
- Created 2 years ago
- Comments: 24 (9 by maintainers)
@anuchandy thank you for such a detailed response. I can try to get a Camel example in the next couple of days. I also tried the ServiceBusProcessorClient in a simple test service (not using Camel), ran the same "disable the Service Bus queue" test I mentioned previously, and with that it had no problems reconnecting and picking up messages again. That seems to align with what @nomisRev was describing.
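For reference, that standalone processor-client test was roughly of the following shape (a simplified sketch rather than the exact test code; the queue name and handlers are placeholders):

```java
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;

public class ProcessorReconnectTest {
    public static void main(String[] args) throws InterruptedException {
        ServiceBusProcessorClient processor = new ServiceBusClientBuilder()
                .connectionString(System.getenv("SERVICEBUS_CONNECTION_STRING"))
                .processor()
                .queueName("test-queue")
                .processMessage(context ->
                        System.out.println("Received: " + context.getMessage().getBody()))
                .processError(context ->
                        System.err.println("Error: " + context.getException()))
                .buildProcessorClient();

        processor.start();
        // Disable and re-enable the queue in the portal while this runs; with the
        // processor client, receiving resumed once the queue was re-enabled.
        Thread.sleep(Long.MAX_VALUE);
    }
}
```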
I did want to add some log messages that I got after enabling WARN-level logging in prod; the issue occurred again yesterday. Fortunately I have a monitor/alert in place watching queue depths, which alerts me when this happens. I looked through the logs that Application Insights was able to give me and pulled out any relevant entries I could find for when this happened again yesterday. I will post them below in case they give any additional insight for you or anyone else. Apologies for the format, but I didn't find a good way to export just the specific logs I wanted to share here in a nice-looking log-stream format out of Application Insights:
Hi @nomisRev, to give you some background, the messages arrive via an `amqp-link` that has the life cycle "open-event -> message-delivery-events -> close-event". The `LowLevelClient` creates a new `amqp-link` when the current `amqp-link` emits a close-event. You asked me in the past where this link recovery logic is, which I missed following up on; the logic is in the class "ServiceBusReceiveLinkProcessor" here.
An `amqp-session` owns the `amqp-link`, and multiple `amqp-session`s are owned by an `amqp-connection`. Both of these ancestors have the same life cycle and have recovery logic. The `amqp-connection` recovery logic is in the generic class "AmqpChannelProcessor" here. As you can see, the underlying types "react" to close (successful or error) events by creating a new `amqp-link`/`amqp-connection` (collectively called channels).
The expectation is that this "reactive" logic should take care of recovery, but there were edge cases (which are very hard to repro) where these "reactive" handlers never got to kick off recovery because a signal was missed. If you go through the SDK changelog, you can find multiple signal-loss issues root-caused via DEBUG logs. A signal-loss bug fix for the "reactive" recovery is coming in the mid-July release (details here).
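To make the "reactive" recovery idea above concrete, here is an illustrative Reactor-style sketch (it is not the SDK's actual ServiceBusReceiveLinkProcessor/AmqpChannelProcessor code; `Channel`, `openChannel`, and `receiveWithRecovery` are made-up names):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.function.Supplier;

public final class ReactiveRecoverySketch {

    // Made-up stand-in for an amqp-link / amqp-connection ("channel").
    public interface Channel {
        Flux<String> messages();   // message-delivery-events
        Mono<Void> closeSignal();  // completes or errors on the close-event
    }

    // "Reactive" recovery: drain the current channel until its close-event, then
    // ask the supplier for a fresh channel and repeat. If the close-event is never
    // signalled (signal loss), nothing triggers the resubscription -- the edge case
    // described above.
    public static Flux<String> receiveWithRecovery(Supplier<Mono<Channel>> openChannel) {
        return Mono.defer(openChannel)
                .flatMapMany(channel -> channel.messages()
                        .takeUntilOther(channel.closeSignal()))
                .repeat()                                                          // close completed normally -> reopen
                .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(5))); // close errored -> reopen with backoff
    }
}
```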
The historical reason for the `HighLevelClient` using a timer for a "proactive" channel check (in addition to the `LowLevelClient`'s "reactive" channel check) is something I need to look into.
Interestingly, you don't (so far) seem to have recovery problems with the queue entity, only with the topic entity. I'll kick off a long-running topic test with DEBUG enabled to see if we can get some helpful logs.
Hi @mseiler90, thanks for the Camel project; I'll use it to debug and understand how Camel wires up the SDK.
But your observation "It seems that this has only been happening on a specific queue where we are picking up the message and then sending a REST API call to another service" gives a hint. I think what could be happening here is -
As you investigate the REST API call connection (read) timeout, see if the app was running into the above flow.
I'll find some time to go over the Camel sample to understand Camel's bridging with the SDK and see if there is any issue in the bridging.
Hey @anuchandy,
I can confirm that we're not throwing an exception from our own code; all our code is wrapped in `try/catch` where it interleaves with the Azure SDK. So we have `try/catch` around our processing of the messages, as well as around the settlement of the messages. We have added logs everywhere in our Azure interaction and cannot see errors coming from Azure nor from our own code. We're seeing this on Topics, and we don't seem to be having issues with Queues, by the way.
As mentioned in my comments above, the `HighLevelClient` doesn't seem to have this issue because it implements a patch around the `LowLevelClient`. I am curious why this patch is placed inside the `HighLevelClient` rather than the `LowLevelClient`; shouldn't the `LowLevelClient` also act on changes in `ServiceBusConnectionProcessor#isChannelClosed`?
Currently I can also see that the `HighLevelClient` is polling the `public boolean isChannelClosed()` state, but it seems this could also be implemented in a reactive manner to improve performance and reaction time.
I'm happy to discuss further, or share more details if you need more information. Sadly I cannot share our production code, but I can rewrite it, as I've done above, to share a patch. We seem to have success with this patch, but more time and battle testing in production will confirm that.
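Since I can't share the actual patch, the following is only a rough illustration of the same idea using the public API: treat a long idle period on the low-level async receiver as a possibly dead link and resubscribe. The idle limit, entity names, and the `process()` handler are assumptions, and this is a workaround sketch rather than a fix for the underlying recovery issue:

```java
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

import java.time.Duration;

public class ResubscribingReceiver {
    public static void main(String[] args) throws InterruptedException {
        ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
                .connectionString(System.getenv("SERVICEBUS_CONNECTION_STRING"))
                .receiver()
                .topicName("my-topic")               // assumption: topic/subscription entity, as described above
                .subscriptionName("my-subscription")
                .disableAutoComplete()               // settle explicitly below
                .buildAsyncClient();

        Duration idleLimit = Duration.ofMinutes(10); // assumption: tune to expected traffic

        receiver.receiveMessages()
                .timeout(idleLimit)                  // treat prolonged silence as a possibly dead link
                .flatMap(message -> process(message)
                        .then(receiver.complete(message)))                        // settle after successful processing
                .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(5))) // resubscribe on timeout/AMQP errors
                .subscribe();

        Thread.sleep(Long.MAX_VALUE);
    }

    private static Mono<Void> process(ServiceBusReceivedMessage message) {
        // Application-specific handling (assumed).
        return Mono.fromRunnable(() -> System.out.println("Received: " + message.getBody()));
    }
}
```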
Hey @anuchandy,
I made a quick demo Camel repo, which can be found here: https://github.com/mseiler90/camel-demo
You asked for a number of configurations, which I did not include because Camel takes care of everything for us by default. We have the ability to set certain configurations, as seen in the Camel documentation here https://camel.apache.org/components/3.17.x/azure-servicebus-component.html, but I wanted to create this using the "out of the box" default settings we are currently using in production. For example, we are not disabling auto-complete, we aren't setting our own retry options, and we don't have any custom logic built around the Service Bus clients. We have tried tests with things like setting retry options, but nothing has helped. As you can see from the example repo, Camel, with the help of the Azure SDK, takes care of all of our configuration for us, as it should.
For testing, just update the application.yml file with a Service Bus connection string and a queue name; you should then be able to start it up in your IDE, and it will automatically put a message on the queue every 10 seconds. There is a Camel route listening on that same queue that just logs out the body of the message. Once that is up and running, disable the queue and, after several seconds or so, reactivate it. You will see that messages still get sent to the queue but will no longer get picked up until the application is restarted.
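If you don't want to pull the repo, the two routes are roughly equivalent to the sketch below (not the repo's exact code; the queue name and property key are placeholders):

```java
import org.apache.camel.builder.RouteBuilder;

public class ServiceBusDemoRoutes extends RouteBuilder {
    @Override
    public void configure() {
        // Produce a message to the queue every 10 seconds.
        from("timer:producer?period=10000")
                .setBody(constant("hello from camel"))
                .to("azure-servicebus:demo-queue"
                        + "?connectionString=RAW({{servicebus.connection-string}})"
                        + "&producerOperation=sendMessages");

        // Consume from the same queue with the component's default ("out of the box")
        // settings and log the message body.
        from("azure-servicebus:demo-queue"
                + "?connectionString=RAW({{servicebus.connection-string}})")
                .log("Received: ${body}");
    }
}
```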
As we have already discussed, the High Level Client successfully reconnects the receiver, but the Low Level Client doesn't. We would just like to see the same "isChannelClosed" logic in the Low Level Client; with that, it seems everything would work as we'd hope.
I do agree that we have an issue causing the connection to get lost in the first place, but regardless of why the connection is lost, we should be able to automatically reconnect the receiver without writing custom logic to do so. I did find some additional information on our production issue. It seems that this has only been happening on a specific queue where we are picking up the message and then sending a REST API call to another service. I have found logs in Application Insights showing that there was a `java.net.SocketException: Connection timed out (Read failed)` for the API call at around the same time the Service Bus issue occurs. Looking back at all occurrences (about 8 times now in the past couple of weeks), we see this connection timeout error at the same time. So it seems that may somehow be causing the lost connection on the queue, and we'll need to investigate why that connection timeout is occurring.
We are seeing similar issues in our production environment, where we have an Azure App Service connected to Service Bus with multiple queues, either sending to, receiving from, or both, depending on the queue and business flow. We are using the ServiceBusSenderAsyncClient and ServiceBusReceiverAsyncClient in this App Service. We have had 4 randomly occurring occasions where a queue just starts filling up with messages and the receiver is no longer picking up messages. We get alerted because we have monitoring alerts set up, but at this time the quick "fix" is to restart the App Service, after which it starts picking up messages again. We haven't been able to figure out exactly what is causing the connection to be randomly lost, but regardless, we would expect that even if there were a network issue or Service Bus server error, the ServiceBusReceiverAsyncClient would be able to reestablish a connection.
I have found a way to replicate this in our nonprod environment by doing the following:
At this point, sending the message to the queue works as expected without issues; the connection on the send side is reestablished. However, the message just sits in the queue and never gets picked up by the receiver because it has not reconnected.
This may not be exactly what is happening in production to cause the lost connection in the first place, but we would expect that after making the queue "active" again in this test, our ServiceBusReceiverAsyncClient would notice the lost connection and make a new one, or at least keep attempting to make a new connection until it succeeds. Even setting AmqpRetryOptions on the ServiceBusClientBuilder doesn't help.
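For reference, the retry configuration we tried was along these lines (the exact values below are illustrative, not our production settings):

```java
import com.azure.core.amqp.AmqpRetryMode;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;

import java.time.Duration;

public class RetryOptionsExample {
    public static void main(String[] args) {
        AmqpRetryOptions retryOptions = new AmqpRetryOptions()
                .setMode(AmqpRetryMode.EXPONENTIAL)
                .setMaxRetries(10)                      // illustrative values
                .setDelay(Duration.ofSeconds(2))
                .setMaxDelay(Duration.ofMinutes(1))
                .setTryTimeout(Duration.ofMinutes(1));

        ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
                .connectionString(System.getenv("SERVICEBUS_CONNECTION_STRING"))
                .retryOptions(retryOptions)             // applies to AMQP operations of clients built from this builder
                .receiver()
                .queueName("my-queue")                  // assumption: queue entity, as in this report
                .buildAsyncClient();

        // receiver.receiveMessages() subscription as usual; the stuck-receiver symptom
        // was still observed even with retry options like these in place.
    }
}
```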
It seems like there are a number of both open and closed issues that are similar to this one (though not identical; some similar ones were for Event Hubs and Blob Storage).