azure-sdk-for-java: [BUG] ServiceBusReceiverClient client never returning after receiveMessages method call (maxWaitTime is set to 1 sec)
Describe the bug I am using the latest beta version of SB, 7.15.0-beta.4, with a ServiceBusReceiverClient created like this:
new ServiceBusClientBuilder()
    .connectionString(sbTopicConnUri)
    .receiver()
    .topicName(azureConfig.getSbTopicName())
    .subscriptionName(azureConfig.getSbTopicSubscription())
    .disableAutoComplete()
    .prefetchCount(500)
    .buildClient();
Using this client, I pull messages from the subscription periodically. To do so, I have a thread pool of 5 threads that all use the same receiver client object to receive messages from the SB subscription.
While doing so, I noticed that as long as messages are available in the subscription, the client keeps returning them. Once no messages are available, these threads get stuck in the call to receiveMessages(batch, Duration.ofSeconds(1)). Although the wait time is 1 second, the threads never return from the call.
Exception or Stack Trace I don't see any stack trace or exception during this.
To Reproduce Create multiple threads sharing the same receiver client. All threads should actively try to read messages using the same client at the same time. Once the messages on the subscription are exhausted, the threads are expected to return without any messages, but they never do.
Code Snippet Creating the receiver client (as above), then:
AzureSBMsgProcessor[] jobs = {
    new AzureSBMsgProcessor(barrier, this.messageReceiver),
    new AzureSBMsgProcessor(barrier, this.messageReceiver),
    new AzureSBMsgProcessor(barrier, this.messageReceiver),
    new AzureSBMsgProcessor(barrier, this.messageReceiver),
    new AzureSBMsgProcessor(barrier, this.messageReceiver)
};
AzureSBMsgProcessor is a class which pulls messages from SB using the receiver client.
while (recordChangeList.size() < batchSize && (maxPullIter-- != 0) && isMsgPresentForDp) {
    List<FutureTask<List<RecordChange>>> futureTaskList = new ArrayList<>();
    for (AzureSBMsgProcessor job : jobs) {
        futureTaskList.add((FutureTask<List<RecordChange>>) service.submit(job));
    }
    List<RecordChange> tempRecordChangeList = new ArrayList<>();
    try {
        barrier.await();
        futureTaskList.forEach(msgList -> {
            try {
                tempRecordChangeList.addAll(msgList.get());
            } catch (InterruptedException | ExecutionException ex) {
                log.error("Error occurred while fetching values from future tasks", ex);
                if (ex instanceof InterruptedException)
                    Thread.currentThread().interrupt();
            }
        });
    } catch (InterruptedException | BrokenBarrierException e) {
        log.error("Error occurred while waiting & processing future tasks", e);
        if (e instanceof InterruptedException)
            Thread.currentThread().interrupt();
    }
    if (tempRecordChangeList.isEmpty())
        isMsgPresentForDp = false;
    else
        recordChangeList.addAll(tempRecordChangeList);
    barrier.reset();
}
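Independent of any SDK fix, one defensive option on the caller's side is to bound the wait on the submitted tasks, so that a receive call that never returns cannot stall the polling loop. The following is only a sketch using java.util.concurrent; BoundedCollector and collect are hypothetical names, and each Callable stands in for a job that calls receiveMessages. Cancellation relies on thread interruption, so whether a blocked SDK call actually unblocks is an assumption to verify.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: runs all jobs and gives up on any that exceed the deadline.
final class BoundedCollector {
    static <T> List<T> collect(List<Callable<List<T>>> jobs, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, jobs.size()));
        List<T> results = new ArrayList<>();
        try {
            // invokeAll waits at most timeoutMillis in total, then cancels unfinished jobs.
            List<Future<List<T>>> futures =
                    pool.invokeAll(jobs, timeoutMillis, TimeUnit.MILLISECONDS);
            for (Future<List<T>> future : futures) {
                if (future.isCancelled()) {
                    continue; // this job did not finish in time; skip its result
                }
                try {
                    results.addAll(future.get());
                } catch (ExecutionException e) {
                    // the job itself failed; a real caller would log e.getCause()
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // interrupts any worker still blocked
        }
        return results;
    }
}
```

With this shape, the loop above would treat a timed-out job the same way as a job that returned no messages.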
Expected behavior When no messages are available in the SB subscription, all the threads should return as soon as the maxWaitTime elapses, which is 1 second in my case.
Setup (please complete the following information):
- OS: Ubuntu
- IDE: IntelliJ
- Library/Libraries:
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
    <version>7.15.0-beta.4</version>
</dependency>
- Java version: 17
- Frameworks: Spring Boot 2.7
Information Checklist Kindly make sure that you have added all the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report
- Bug Description Added
- Repro Steps Added
- Setup information Added
About this issue
- Original URL
- State: open
- Created 8 months ago
- Comments: 16 (10 by maintainers)
Hello @anuchandy,
To answer the above question:
When will the final version of the above beta be released? Are there any tentative timelines? Also, will it address the thread-blocking issue (concurrent threads using the same receiver client)?
Some updates: With further testing, I am seeing better results with this implementation approach, and for now we seem to be good with it. I just need a confirmation from your side once the final version of this beta is out along with the fix for the issue I originally reported here. I am able to process 100,000 messages within 22 minutes (with the thread pool configuration I am using), which is fine for us for now.
Also, during my testing I observed the messages below (from the SDK) being logged intermittently. I just wanted to know whether these are only informational or whether they point to anything concerning in my implementation:
Please take a look at them and confirm. Otherwise I am good with this issue once we get the fixed version of this library.
One more query as a side note: is there any way I can mock the this.messageReceiver.receiveMessages(batch, Duration.ofMillis(500)); method to return an IterableStream<ServiceBusReceivedMessage>? I need it for writing JUnit tests for my receiver class (for this sync receiver client). When mocking receiveMessages, I want to return an IterableStream of ServiceBusReceivedMessage that contains some dummy messages. Unfortunately, I am unable to create an object of the ServiceBusReceivedMessage class, as it is final and its constructors have default access (hence I cannot instantiate it). Could you provide a way, or reference code, to mock this receiver method in a JUnit test so that it returns an iterable containing some dummy messages?
Thanks.
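One way to unit-test such a receiver class without constructing ServiceBusReceivedMessage at all is to hide the SDK call behind a small application-owned interface and stub that interface in tests. This is only a sketch of that design, not an SDK feature: MessagePuller, PulledMessage, and BatchReader are hypothetical names, and the production adapter that would wrap receiveMessages and copy the fields out of each received message is omitted.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Application-owned view of a received message (hypothetical; not an SDK type).
final class PulledMessage {
    final String id;
    final String body;

    PulledMessage(String id, String body) {
        this.id = id;
        this.body = body;
    }
}

// Seam around the receive call. In production, an adapter would delegate to the
// receiver client; in tests, a lambda or in-memory fake is enough.
interface MessagePuller {
    List<PulledMessage> pull(int maxMessages, Duration maxWait);
}

// The class under test depends only on the seam, never on SDK types.
final class BatchReader {
    private final MessagePuller puller;

    BatchReader(MessagePuller puller) {
        this.puller = puller;
    }

    // Pulls until batchSize bodies are collected or a pull comes back empty.
    List<String> readBodies(int batchSize, Duration maxWait) {
        List<String> bodies = new ArrayList<>();
        while (bodies.size() < batchSize) {
            List<PulledMessage> batch = puller.pull(batchSize - bodies.size(), maxWait);
            if (batch.isEmpty()) {
                break; // nothing left within maxWait
            }
            for (PulledMessage message : batch) {
                bodies.add(message.body);
            }
        }
        return bodies;
    }
}
```

A JUnit test can then pass a lambda backed by an in-memory queue as the MessagePuller and assert on the returned bodies.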
Hello, @dakshme, good to hear that the initial testing is positive.
The lower (or zero) occurrence of errors on the complete call is likely due to disabling “Explicit Prefetch” while building the client, which reduces buffering and therefore the chance that a message lock expires while the message sits in the buffer. Also, beware of the “Implicit Prefetch” we discussed earlier.
We both seem to observe the same measurement in the “One Receiver Client per One Thread” model. 😃
Two questions –
The V2 stack is the upcoming underlying engine of the messaging libraries. We addressed many known issues as part of the V2 stack. Some of the main improvements are: removed locks in critical paths, made the AMQP credit calculation more stable, optimized internal buffering that was causing complete() to time out, addressed out-of-order message delivery in processor clients, and optimized the use of threads (less thread hopping and context switching). There are more, improving overall stability.
We call the underlying engine of the currently GA-ed versions the V1 stack. The V2 stack exists today in the beta releases (e.g. 7.15.0-beta.4), side by side with the V1 stack. The library allows opting in to or out of either stack.
For example, in the latest beta, the ProcessorClient (the session-unaware one) uses the V2 stack by default and can be opted out to use the V1 stack. Similarly, the pull-based synchronous client is on the V1 stack by default and can be opted in to use the V2 stack. The configuration you saw in the test program opts the synchronous client in to the V2 stack. The plan is to eventually phase out the V1 stack, so all clients run on V2. Hope this clarifies.
Hello @dakshme, I was trying to find an official documentation about Standard tier and came across this.
My understanding from the above documentation about Standard tier is –
My understanding here is: let’s say the application received 1000 messages in the first 50ms of the current second i, consuming all credits. Then any receive attempt made in the remaining 950ms of second i will be throttled. Once second i elapses, the next second i+1 starts with 1000 credits.
I don’t see the credit cost of complete or abandon in the doc; it may be part of the receive cost. Any other operation performed (e.g. Send) consumes its own credits, and different operations have different credit costs.
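The per-second credit behavior described above can be modeled as a fixed-window counter. This is a sketch of that reading of the documentation only: the 1000-credit budget is taken from the example above, while CreditWindow, tryReceive, and the cost passed per call are assumptions for illustration and not the service's actual accounting.

```java
// Hypothetical model of a per-second credit budget (fixed one-second windows).
final class CreditWindow {
    private final int creditsPerSecond;
    private long currentWindow = Long.MIN_VALUE; // index of the second last seen
    private int remaining;

    CreditWindow(int creditsPerSecond) {
        this.creditsPerSecond = creditsPerSecond;
    }

    // Returns true if an operation costing `cost` credits at time nowMs is
    // admitted, false if it would be throttled in the current second.
    synchronized boolean tryReceive(long nowMs, int cost) {
        long window = Math.floorDiv(nowMs, 1000L);
        if (window != currentWindow) {
            // A new second has started: the budget is restored in full.
            currentWindow = window;
            remaining = creditsPerSecond;
        }
        if (cost > remaining) {
            return false; // throttled until the next second begins
        }
        remaining -= cost;
        return true;
    }
}
```

In this model, spending 1000 credits at 50ms makes every later attempt in the same second fail, and the first attempt at 1000ms succeeds again.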
Hello @dakshme,
I’ve got to prepare and run more tests. Once they pass and show that the change can be shipped, I’ll share the plan for the ship date and SDK version.
Thanks for sharing the additional context around the reason to try the split approach.
The rate at which messages can be pulled and completed also depends on the tier, the network conditions, and how close the consumer and broker are. After completing the previously mentioned tests, I’ll try to set up an environment to get a general idea.
Expect this to take 1-2 weeks.
I agree with your thoughts about using a separate Receiver Client instance per thread. With this approach you should be able to pull concurrently, given that message arrival will no longer be serialized via a single channel (each Receiver Client uses a separate channel), and assuming the service does not throttle egress.
I wish we could use the ProcessorClient, which has out-of-the-box support for threading, but given the application constraint that rules out a push model, we cannot use it. The synchronous Receiver is the only client type for the pull model.
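The one-receiver-per-thread model discussed above can be sketched as follows. Receiver here is a hypothetical stand-in for the synchronous client, and the factory is assumed to build a fresh client on each call (in a real application, by running the builder chain again); PerThreadPuller and pullRound are illustrative names, not SDK API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical stand-in for a synchronous pull receiver.
interface Receiver {
    List<String> receive(int maxMessages);
}

final class PerThreadPuller {
    private final List<Receiver> receivers = new ArrayList<>();
    private final ExecutorService pool;

    // Builds one receiver per worker up front, so no instance is ever shared.
    PerThreadPuller(Supplier<Receiver> factory, int workers) {
        for (int i = 0; i < workers; i++) {
            receivers.add(factory.get());
        }
        this.pool = Executors.newFixedThreadPool(workers);
    }

    // Runs one pull per receiver; each receiver is used by at most one task at a time.
    List<String> pullRound(int maxPerWorker) {
        List<Future<List<String>>> futures = new ArrayList<>();
        for (Receiver receiver : receivers) {
            Callable<List<String>> task = () -> receiver.receive(maxPerWorker);
            futures.add(pool.submit(task));
        }
        List<String> all = new ArrayList<>();
        try {
            for (Future<List<String>> future : futures) {
                try {
                    all.addAll(future.get());
                } catch (ExecutionException e) {
                    // this worker's pull failed; a real caller would log e.getCause()
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return all;
    }

    void close() {
        pool.shutdown();
    }
}
```

Creating the receivers once and reusing them across rounds avoids re-establishing a channel on every pull, which is the reason the sketch does not call the factory inside pullRound.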