azure-sdk-for-java: ServiceBus deadlocks when receiving the messages
Describe the bug
There are multiple situations in which receiving messages from ServiceBus can end in a deadlock. This is critical because it makes it almost impossible to use ServiceBus reliably when we want to control message acknowledgment.
To Reproduce
There are multiple cases; in all of them, we want to acknowledge the message manually.
- If the message processing time is longer than the message lock duration set for the subscription and maxAutoLockRenewDuration is Duration.ZERO, completing the message fails, which is expected. We also expect the message to be redelivered, but the service bus just hangs and no new messages are received.
- If the message processing time is longer than the message lock duration set for the subscription and maxAutoLockRenewDuration is set to a large value, the message is completed, which is expected, but the service bus just hangs and no new messages are received.
- If the message is not acknowledged at all, we expect the message to be redelivered, but the service bus just hangs forever and no new messages are processed.
Code Snippet
Here is an example for case number 2; the rest are easy to reproduce in the same way.
The message lock duration should be set to 10 seconds; in the sample, processing a message takes 12 seconds. We expect the message to be redelivered over and over, but the consumer just hangs after the first try.
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
    .connectionString(connectionString)
    .receiver()
    .disableAutoComplete()
    .prefetchCount(1)
    .maxAutoLockRenewDuration(Duration.ZERO)
    .topicName("test")
    .subscriptionName("test")
    .buildAsyncClient();
Disposable subscription = receiver.receiveMessages()
    .flatMap(message -> {
        System.out.printf("Sequence #: %s. Contents: %s%n", message.getSequenceNumber(), message.getBody());
        try {
            Thread.sleep(12_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return receiver
            .complete(message)
            .onErrorResume(e -> {
                e.printStackTrace();
                return Mono.empty();
            });
    })
    .doOnError(e -> System.out.println("ERROR: " + e.getMessage()))
    .doOnComplete(() -> System.out.println("DONE"))
    .subscribe();
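The timing behind the cases above can be sketched in plain Java, without the SDK. This is only an illustration: LockTimingSketch and its methods are hypothetical names, not part of azure-sdk-for-java, and the model assumes (as a simplification) that auto-renewal keeps the lock alive for at most maxAutoLockRenewDuration in total, and for at least the subscription's lock duration. It shows only whether the lock has expired by the time complete is called; it does not reproduce the hang itself.

```java
import java.time.Duration;

/** Hypothetical helper, not part of the Azure SDK: models only the lock timing. */
final class LockTimingSketch {
    private LockTimingSketch() {
    }

    /**
     * Assumed model: the lock is held for the longer of the subscription's lock
     * duration and maxAutoLockRenewDuration (Duration.ZERO means no renewal).
     */
    static Duration effectiveLockWindow(Duration lockDuration, Duration maxAutoLockRenew) {
        return maxAutoLockRenew.compareTo(lockDuration) > 0 ? maxAutoLockRenew : lockDuration;
    }

    /** Returns true when processing outlasts the lock, so complete is expected to fail. */
    static boolean lockExpiresBeforeCompletion(Duration lockDuration,
                                               Duration maxAutoLockRenew,
                                               Duration processingTime) {
        return processingTime.compareTo(effectiveLockWindow(lockDuration, maxAutoLockRenew)) > 0;
    }
}
```

With a 10-second lock and 12 seconds of processing, lockExpiresBeforeCompletion returns true when maxAutoLockRenewDuration is Duration.ZERO (completion fails) and false when it is, say, 5 minutes (completion succeeds). In both situations the report says the receiver stops delivering messages afterwards.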
Expected behavior
In all cases, ServiceBus should not hang; it should continue consuming new messages or redeliver late messages.
Setup (please complete the following information):
- OS: macOS
- IDE: IntelliJ
- Version of the library used: 7.0.1
About this issue
- State: closed
- Created 3 years ago
- Comments: 16 (6 by maintainers)
I am seeing this exact same issue, except I am using a queue and have disabled auto complete. My instantiation looks like this:
ServiceBusProcessorClient processorClient = createServiceBusClientBuilder()
    .connectionString(connectionString)
    .processor()
    .queueName(queueName)
    .disableAutoComplete()
    .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
    .processMessage(messageProcessor)
    .processError(errorProcessor)
    .buildProcessorClient();
Do you have an ETA of when it will be fixed?
@pashashiz I am planning to release a fix for the three original scenarios reported in this issue in March. It should come out within the next two weeks.
@pashashiz I have done some testing for the case 1 and case 2 scenarios and can confirm that this issue exists. I am working on a solution. We want to make sure it is tested thoroughly. I am going to test more with the case 3 scenario you explained.
Thank you, @pashashiz. I am looking into this and will update you.