azure-sdk-for-java: [BUG] ASB consumer stops with "ReactorDispatcher instance is closed", only recovers after restart
Describe the bug
The behaviour looks quite similar to this issue, although we are using recent library versions: https://github.com/Azure/azure-service-bus-java/issues/335
Multiple independent applications, reading from different topics/queues, stopped consuming messages from Service Bus at roughly the same time (see the increase in active messages).

We recently upgraded to ‘azure-messaging-servicebus’ version ‘7.4.1’. We did not see this behaviour before, but shortly after the upgrade it happened in 2 different Service Bus namespaces.
Exception or Stack Trace
The logs of “orderdataservice” showed the following errors:
ReactorDispatcher instance is closed.
...
Cannot add credits to closed link: dgl-s1-order-topic/subscriptions/dgl-s1-orderdataservice_e0f4d2_1635333545378
...
Operator called default onErrorDropped
thrown_cause_extendedStackTrace
java.lang.IllegalStateException: Cannot add credits to closed link: dgl-s1-order-topic/subscriptions/dgl-s1-orderdataservice_e0f4d2_1635333545378
at com.azure.core.amqp.implementation.ReactorReceiver.addCredits(ReactorReceiver.java:176) ~[azure-core-amqp-2.3.2.jar:2.3.2]
at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.checkAndAddCredits(ServiceBusReceiveLinkProcessor.java:537) ~[azure-messaging-servicebus-7.4.1.jar:7.4.1]
at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.onNext(ServiceBusReceiveLinkProcessor.java:242) ~[azure-messaging-servicebus-7.4.1.jar:7.4.1]
at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.onNext(ServiceBusReceiveLinkProcessor.java:43) ~[azure-messaging-servicebus-7.4.1.jar:7.4.1]
at reactor.core.publisher.FluxRepeatPredicate$RepeatPredicateSubscriber.onNext(FluxRepeatPredicate.java:86) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.Operators$MonoSubscriber.request(Operators.java:1906) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.SerializedSubscriber.request(SerializedSubscriber.java:151) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.9.jar:3.4.9]
at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.requestUpstream(ServiceBusReceiveLinkProcessor.java:413) ~[azure-messaging-servicebus-7.4.1.jar:7.4.1]
at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.lambda$onNext$5(ServiceBusReceiveLinkProcessor.java:237) ~[azure-messaging-servicebus-7.4.1.jar:7.4.1]
at reactor.core.publisher.LambdaSubscriber.onComplete(LambdaSubscriber.java:132) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.onComplete(FluxSubscribeOn.java:166) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxDistinct$DistinctFuseableSubscriber.onComplete(FluxDistinct.java:501) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:805) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:898) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxReplay$ReplaySubscriber.onComplete(FluxReplay.java:1273) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxDistinctUntilChanged$DistinctUntilChangedSubscriber.onComplete(FluxDistinctUntilChanged.java:173) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:805) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:898) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.ReplayProcessor.tryEmitComplete(ReplayProcessor.java:465) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.SinkManySerialized.tryEmitComplete(SinkManySerialized.java:64) ~[reactor-core-3.4.9.jar:3.4.9]
at reactor.core.publisher.InternalManySink.emitComplete(InternalManySink.java:68) ~[reactor-core-3.4.9.jar:3.4.9]
at com.azure.core.amqp.implementation.handler.Handler.close(Handler.java:132) ~[azure-core-amqp-2.3.2.jar:2.3.2]
at com.azure.core.amqp.implementation.handler.LinkHandler.handleRemoteLinkClosed(LinkHandler.java:115) ~[azure-core-amqp-2.3.2.jar:2.3.2]
at com.azure.core.amqp.implementation.handler.LinkHandler.onLinkRemoteClose(LinkHandler.java:61) ~[azure-core-amqp-2.3.2.jar:2.3.2]
at com.azure.core.amqp.implementation.handler.ReceiveLinkHandler.onLinkRemoteClose(ReceiveLinkHandler.java:193) ~[azure-core-amqp-2.3.2.jar:2.3.2]
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176) ~[proton-j-0.33.8.jar:?]
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) ~[proton-j-0.33.8.jar:?]
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) ~[proton-j-0.33.8.jar:?]
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) ~[proton-j-0.33.8.jar:?]
at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:86) ~[azure-core-amqp-2.3.2.jar:2.3.2]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) [reactor-core-3.4.9.jar:3.4.9]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) [reactor-core-3.4.9.jar:3.4.9]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
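Reading the trace bottom-up: the broker closes the receive link (`LinkHandler.onLinkRemoteClose`), the completion signal propagates into `ServiceBusReceiveLinkProcessor`, which requests a new link upstream (`requestUpstream`), and `checkAndAddCredits` then calls `ReactorReceiver.addCredits` on a link that, as far as the frames show, is already closed. The exception is then dropped ("Operator called default onErrorDropped"). The toy model below only reproduces that failure mode plus a guard that checks link state before adding credits. All names are hypothetical; this is not the SDK's code and not a diagnosis of the root cause.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model only: ToyReceiveLink is hypothetical, NOT azure-core-amqp's ReactorReceiver.
public class ClosedLinkCreditDemo {

    static final class ToyReceiveLink {
        private final String name;
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private int credits;

        ToyReceiveLink(String name) {
            this.name = name;
        }

        // Simulates the broker closing the link (cf. onLinkRemoteClose in the trace).
        void remoteClose() {
            closed.set(true);
        }

        boolean isClosed() {
            return closed.get();
        }

        // Mirrors the failure mode in the trace: adding credits after close throws.
        void addCredits(int n) {
            if (closed.get()) {
                throw new IllegalStateException("Cannot add credits to closed link: " + name);
            }
            credits += n;
        }

        int credits() {
            return credits;
        }
    }

    // Defensive variant: refuse to add credits to a link that is already closed, so the
    // caller can request a fresh link instead. Not race-free: a close that happens between
    // the check and addCredits still throws.
    static boolean tryAddCredits(ToyReceiveLink link, int n) {
        if (link.isClosed()) {
            return false;
        }
        link.addCredits(n);
        return true;
    }

    public static void main(String[] args) {
        ToyReceiveLink link = new ToyReceiveLink("example-queue_link");
        link.addCredits(1);
        link.remoteClose();
        try {
            link.addCredits(1);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("guarded call accepted: " + tryAddCredits(link, 1));
    }
}
```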
The logs of “userdataservice” look similar:
ReactorDispatcher instance is closed.
...
Cannot add credits to closed link: dgl-s1-cdc-webhook-queue_a9432f_1634858677706
...
lockToken[eeac4396-6bb3-4e3e-b376-bcff2ecd15fc]. state[Accepted{}]. Cannot update disposition with no link.
...
Operator called default onErrorDropped
...
ReactorSender connectionId[MF_28cd08_1635368968533] linkName[dgl-s1-cdc-webhook-queuedgl-s1-cdc-webhook-queue]: Waiting for send and receive handler to be ACTIVE
Retries exhausted: 3/3
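The last line, "Retries exhausted: 3/3", indicates that a bounded retry loop gave up after three retries; the preceding `ReactorSender` line suggests it was waiting for the send and receive handlers to become ACTIVE. The sketch below shows a generic exponential backoff with a fixed retry budget to make that message concrete. It is not the azure-core-amqp retry policy; the 800 ms base delay and 1 minute cap are illustrative assumptions (check `AmqpRetryOptions` for the SDK's actual defaults), and the sleeper is injected so the delays can be inspected.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Generic exponential-backoff sketch for illustration; NOT the azure-core-amqp retry policy.
public final class BackoffSketch {

    // Delay before retry number `attempt` (1-based): base * 2^(attempt - 1), capped at max.
    static Duration delayFor(int attempt, Duration base, Duration max) {
        double millis = base.toMillis() * Math.pow(2, attempt - 1);
        return Duration.ofMillis((long) Math.min(max.toMillis(), millis));
    }

    // Runs op once, then up to maxRetries more times, waiting via `sleeper` between attempts.
    // `sleeper` is injected so callers can record delays instead of actually sleeping.
    static <T> T callWithRetries(Supplier<T> op, int maxRetries, Duration base, Duration max,
                                 Consumer<Duration> sleeper) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) {
                sleeper.accept(delayFor(attempt, base, max));
            }
            try {
                return op.get();
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw new IllegalStateException(
                "Retries exhausted: " + maxRetries + "/" + maxRetries, last);
    }

    public static void main(String[] args) {
        List<Duration> delays = new ArrayList<>();
        try {
            callWithRetries(() -> { throw new RuntimeException("link not ACTIVE"); },
                    3, Duration.ofMillis(800), Duration.ofMinutes(1), delays::add);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " after waiting " + delays);
        }
    }
}
```

Once the budget is spent, the error surfaces to the caller; whether the client then re-establishes the link on its own is exactly what this issue is about.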
To Reproduce
- run the consumer application
- wait for a (possibly temporary) network issue
- the consumers do not recover automatically
- restart the consumers -> everything works again.
Code Snippet
The consumer is created as a Spring bean as follows and is not closed anywhere:
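import java.util.function.Consumer;

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ServiceBusConsumerConfig { // enclosing class name is illustrative

    @Bean
    public static ServiceBusProcessorClient serviceBusAfsQueueConsumer(
            @Value("${servicebus.afsQueue.connectionString}") String connectionString,
            @Value("${servicebus.afsQueue.name}") String afsQueueName,
            @Value("${servicebus.afsQueue.maxConcurrentThreads:#{1}}") int maxConcurrentThreads,
            Consumer<ServiceBusReceivedMessageContext> afsQueueAsbConsumer,
            Consumer<ServiceBusErrorContext> asbErrorLoggingConsumer) {
        ServiceBusProcessorClient client = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .processor()
                .disableAutoComplete() // the message handler must complete/abandon messages itself
                .queueName(afsQueueName)
                .maxConcurrentCalls(maxConcurrentThreads)
                .processMessage(afsQueueAsbConsumer)
                .processError(asbErrorLoggingConsumer)
                .buildProcessorClient();
        client.start(); // starts receiving immediately; the client is never closed explicitly
        return client;
    }
}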
Expected behavior
Consumers should recover on their own after network issues and should not get closed. (Note that our code never explicitly closes the processor clients.)
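Until the client recovers on its own, one application-side mitigation consistent with the observation that a restart fixes the problem is to recreate the client when it has been idle too long while messages are still waiting. A minimal sketch, assuming the client type implements `AutoCloseable` (`ServiceBusProcessorClient` exposes `close()`) and that you obtain a backlog signal, such as an active message count, yourself. `IdleWatchdog` and its methods are hypothetical names, not SDK API; the sketch keys on observed message activity rather than on client state.

```java
import java.time.Duration;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical application-side watchdog; not part of the Azure SDK.
public final class IdleWatchdog<C extends AutoCloseable> {

    private final Supplier<C> factory;      // builds AND starts a fresh client
    private final LongSupplier clockMillis;  // injectable clock, e.g. System::currentTimeMillis
    private final long maxIdleMillis;
    private C current;
    private volatile long lastActivity;
    private int restarts;

    public IdleWatchdog(Supplier<C> factory, LongSupplier clockMillis, Duration maxIdle) {
        this.factory = factory;
        this.clockMillis = clockMillis;
        this.maxIdleMillis = maxIdle.toMillis();
        this.current = factory.get();
        this.lastActivity = clockMillis.getAsLong();
    }

    // Call from the message handler each time a message is processed.
    public void recordActivity() {
        lastActivity = clockMillis.getAsLong();
    }

    // Call periodically. Restarts only if messages are waiting AND nothing was processed
    // for at least maxIdle. Returns true if the client was replaced.
    public synchronized boolean checkAndRestart(boolean backlogPresent) {
        long idle = clockMillis.getAsLong() - lastActivity;
        if (!backlogPresent || idle < maxIdleMillis) {
            return false;
        }
        try {
            current.close();
        } catch (Exception e) {
            // Best effort: replace the client even if closing the old one fails.
            System.err.println("closing stale client failed: " + e);
        }
        current = factory.get();
        lastActivity = clockMillis.getAsLong();
        restarts++;
        return true;
    }

    public synchronized int restarts() {
        return restarts;
    }

    public static void main(String[] args) {
        long[] now = {0};
        IdleWatchdog<AutoCloseable> watchdog = new IdleWatchdog<AutoCloseable>(
                () -> () -> System.out.println("closing client"), () -> now[0], Duration.ofMinutes(5));
        now[0] = Duration.ofMinutes(6).toMillis();
        System.out.println("restarted: " + watchdog.checkAndRestart(true));
    }
}
```

In a real setup the factory would build and `start()` a processor client as in the bean above, and the periodic check would run on a scheduler; both the backlog source and the idle threshold are deployment-specific choices.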
Setup
- Java 11
- ‘com.azure’, name: ‘azure-core’, version: ‘1.20.0’
- ‘com.azure’, name: ‘azure-messaging-servicebus’, version: ‘7.4.1’
- running in Docker in a Kubernetes cluster on Azure (AKS)
Additional context
- affected Service Bus namespaces: epp-staging, epp-prod
- date of incidents: Oct 27/28 2021
Please let me know if I can provide more information.
About this issue
- State: closed
- Created 3 years ago
- Reactions: 2
- Comments: 15 (8 by maintainers)
@p4p4 Unfortunately, there is not enough information in the INFO-level logs to confirm why this happened. The only thing I could find is a graceful closure of the endpoint (without an error). The only known (and already fixed) issue we could map this to is this one: https://github.com/Azure/azure-sdk-for-java/blob/azure-core-amqp_2.3.3/sdk/core/azure-core-amqp/CHANGELOG.md#bugs-fixed, which is addressed in Service Bus 7.4.2, but you’re on 7.4.1. Again, I’m unable to confirm this, since the error pattern is only visible at DEBUG level.
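The reasoning above reduces to a version check: the fix is attributed to azure-messaging-servicebus 7.4.2 (the linked entry is in the azure-core-amqp 2.3.3 changelog), while the reporter runs 7.4.1. If several services pin different versions, a numeric comparison like the following can flag those still below 7.4.2. `VersionCheck` is a hypothetical helper, not part of any SDK. It compares components numerically, so 7.10.0 sorts after 7.4.2, and it deliberately does not handle pre-release suffixes.

```java
import java.util.Arrays;

// Hypothetical helper: compares plain numeric dotted versions such as "7.4.1" and "7.4.2".
// Pre-release suffixes (e.g. "-beta.1") are NOT supported and cause NumberFormatException.
public final class VersionCheck {

    static int compare(String a, String b) {
        int[] x = parse(a);
        int[] y = parse(b);
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? x[i] : 0; // missing components count as 0 ("7.4" == "7.4.0")
            int yi = i < y.length ? y[i] : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    static int[] parse(String version) {
        return Arrays.stream(version.split("\\.")).mapToInt(Integer::parseInt).toArray();
    }

    // True if the running version is at or above the version that contains the fix.
    static boolean includesFix(String running, String fixedIn) {
        return compare(running, fixedIn) >= 0;
    }

    public static void main(String[] args) {
        System.out.println("7.4.1 includes fix from 7.4.2: " + includesFix("7.4.1", "7.4.2"));
    }
}
```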
@jiyongseong the “ReactorDispatcher instance is closed” message is misleading (I think we suppressed it in a recent version because it isn’t actually the cause of the errors; I need to check). If you have any DEBUG logs, we can see what happened before this exception.