azure-sdk-for-java: [BUG] Messages aren't picked from the Service Bus Subscription after 11 days of deployment
Describe the bug
- Observed a strange scenario in which the messages aren’t picked from the subscription after 11 days of deployment
- All the pods are running fine but the messages aren’t picked and processed by the webjob
- The Deployment was done on April 11th and a total of 3873 messages were processed till April 23rd
- Post that none of the messages are picked by the Azure Service bus client which sounds very strange
- Due to the inconsistent behaviour, we are doubtful whether the
azure-messaging-servicebus7.1.0 is production ready?😟
Exception or Stack Trace No exceptions were found in AppInsights logging. Hence we are attaching the Kubernetes pod log of the webjobs
INFO [2021-04-27 14:23:06,076] org.mongodb.driver.cluster: Discovered replica set primary cdb-ms-prod-eastus1-fd11.documents.azure.com:10255
INFO [2021-04-27 14:23:06,076] org.mongodb.driver.cluster: Discovered replica set primary cdb-ms-prod-eastus1-fd11.documents.azure.com:10255
INFO [2021-04-27 14:23:06,078] org.mongodb.driver.cluster: Monitor thread successfully connected to server with description ServerDescription{address=cdb-ms-prod-eastus1-fd11.documents.azure.co
m:10255, type=REPLICA_SET_PRIMARY, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 0]}, minWireVersion=0, maxWireVersion=2, maxDocumentSize=16777216, logicalSessionTimeoutMinutes
=null, roundTripTimeNanos=29987235, setName='globaldb', canonicalAddress=cdb-ms-prod-eastus1-fd11.documents.azure.com:10255, hosts=[cdb-ms-prod-eastus1-fd11.documents.azure.com:10255], passiv
es=[], arbiters=[], primary='cdb-ms-prod-eastus1-fd11.documents.azure.com:10255', tagSet=TagSet{[]}, electionId=null, setVersion=1, lastWriteDate=null, lastUpdateTimeNanos=7839726471962}
INFO [2021-04-27 14:23:06,079] org.mongodb.driver.cluster: Setting max set version to 1 from replica set primary cdb-ms-prod-eastus1-fd11.documents.azure.com:10255
INFO [2021-04-27 14:23:06,079] org.mongodb.driver.cluster: Discovered replica set primary cdb-ms-prod-eastus1-fd11.documents.azure.com:10255
INFO [2021-04-27 14:23:06,086] org.mongodb.driver.connection: Opened connection [connectionId{localValue:12, serverValue:-1026845694}] to cdb-ms-prod-eastus1-fd11.documents.azure.com:10255
INFO [2021-04-27 14:23:06,101] org.mongodb.driver.cluster: Monitor thread successfully connected to server with description ServerDescription{address=cdb-ms-prod-eastus1-fd11.documents.azure.co
m:10255, type=REPLICA_SET_PRIMARY, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 0]}, minWireVersion=0, maxWireVersion=2, maxDocumentSize=16777216, logicalSessionTimeoutMinutes
=null, roundTripTimeNanos=14151911, setName='globaldb', canonicalAddress=cdb-ms-prod-eastus1-fd11.documents.azure.com:10255, hosts=[cdb-ms-prod-eastus1-fd11.documents.azure.com:10255], passiv
es=[], arbiters=[], primary='cdb-ms-prod-eastus1-fd11.documents.azure.com:10255', tagSet=TagSet{[]}, electionId=null, setVersion=1, lastWriteDate=null, lastUpdateTimeNanos=7839748883738}
INFO [2021-04-27 14:23:06,101] org.mongodb.driver.cluster: Setting max set version to 1 from replica set primary cdb-ms-prod-eastus1-fd11.documents.azure.com:10255
INFO [2021-04-27 14:23:06,101] org.mongodb.driver.cluster: Discovered replica set primary cdb-ms-prod-eastus1-fd11.documents.azure.com:10255
INFO [2021-04-27 14:23:06,133] org.mongodb.driver.cluster: Cluster created with settings {hosts=[nsnonprod.documents.azure.com:10255], mode=MULTIPLE, requiredClusterType=REPLICA_SET, serverSelecti
onTimeout='60000 ms', maxWaitQueueSize=3500, requiredReplicaSetName='globaldb'}
INFO [2021-04-27 14:23:06,133] org.mongodb.driver.cluster: Adding discovered server nsnonprod.documents.azure.com:10255 to client view of cluster
INFO [2021-04-27 14:23:06,161] org.mongodb.driver.connection: Opened connection [connectionId{localValue:13, serverValue:-937342664}] to nsnonprod.documents.azure.com:10255
INFO [2021-04-27 14:23:06,163] org.mongodb.driver.cluster: Monitor thread successfully connected to server with description ServerDescription{address=nsnonprod.documents.azure.com:10255, type=REPL
ICA_SET_PRIMARY, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 0]}, minWireVersion=0, maxWireVersion=2, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=null, roundTripTi
meNanos=1741414, setName='globaldb', canonicalAddress=cdb-ms-prod-eastus1-fd11.documents.azure.com:10255, hosts=[cdb-ms-prod-eastus1-fd11.documents.azure.com:10255], passives=[], arbiters=[],
primary='cdb-ms-prod-eastus1-fd11.documents.azure.com:10255', tagSet=TagSet{[]}, electionId=null, setVersion=1, lastWriteDate=null, lastUpdateTimeNanos=7839811648130}
INFO [2021-04-27 14:23:06,168] org.mongodb.driver.cluster: Adding discovered server cdb-ms-prod-eastus1-fd11.documents.azure.com:10255 to client view of cluster
INFO [2021-04-27 14:23:06,190] org.mongodb.driver.cluster: Server nsnonprod.documents.azure.com:10255 is no longer a member of the replica set. Removing from client view of cluster.
Could not get a resource from the pool
INFO [2021-04-27 14:23:06,240] org.mongodb.driver.connection: Opened connection [connectionId{localValue:14, serverValue:-1291490047}] to nsnonprod.documents.azure.com:10255
INFO [2021-04-27 14:23:06,249] org.mongodb.driver.cluster: Monitor thread successfully connected to server with description ServerDescription{address=nsnonprod.documents.azure.com:10255, type=REPL
ICA_SET_PRIMARY, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 0]}, minWireVersion=0, maxWireVersion=2, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=null, roundTripTi
meNanos=8826069, setName='globaldb', canonicalAddress=cdb-ms-prod-eastus1-fd11.documents.azure.com:10255, hosts=[cdb-ms-prod-eastus1-fd11.documents.azure.com:10255], passives=[], arbiters=[],
primary='cdb-ms-prod-eastus1-fd11.documents.azure.com:10255', tagSet=TagSet{[]}, electionId=null, setVersion=1, lastWriteDate=null, lastUpdateTimeNanos=7839897806105}
INFO [2021-04-27 14:23:06,250] org.mongodb.driver.cluster: Adding discovered server cdb-ms-prod-eastus1-fd11.documents.azure.com:10255 to client view of cluster
INFO [2021-04-27 14:23:06,252] org.mongodb.driver.cluster: Server nsnonprod.documents.azure.com:10255 is no longer a member of the replica set. Removing from client view of cluster.
mkdir: cannot create directory ‘tmp’: File exists
14:23:06,928 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
14:23:06,928 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
14:23:06,928 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [jar:file:/usr/local/extraction/extraction.jar!/logback.xml]
14:23:06,955 |-INFO in ch.qos.logback.core.joran.spi.ConfigurationWatchList@233c0b17 - URL [jar:file:/usr/local/extraction/extraction.jar!/logback.xml] is not of type file
14:23:07,239 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
14:23:07,250 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
14:23:07,261 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
14:23:07,312 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.rolling.RollingFileAppender]
14:23:07,317 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [SIZE_AND_TIME_BASED_FILE]
14:23:07,360 |-INFO in c.q.l.core.rolling.TimeBasedRollingPolicy@787867107 - No compression will be used
14:23:07,362 |-INFO in c.q.l.core.rolling.TimeBasedRollingPolicy@787867107 - Will use the pattern logs/ocr.%d{yyyy-MM-dd-HH}.%i.log for the active file
14:23:07,365 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@58651fd0 - The date pattern is 'yyyy-MM-dd-HH' from file name pattern 'logs/ocr.%d{yyyy-MM-dd-HH}.%i.log'.
14:23:07,365 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@58651fd0 - Roll-over at the top of every hour.
14:23:07,373 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@58651fd0 - Setting initial period to Tue Apr 27 12:14:27 GMT 2021
14:23:07,374 |-WARN in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@58651fd0 - SizeAndTimeBasedFNATP is deprecated. Use SizeAndTimeBasedRollingPolicy instead
14:23:07,374 |-WARN in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@58651fd0 - For more information see http://logback.qos.ch/manual/appenders.html#SizeAndTimeBasedRollingPolicy
14:23:07,380 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
14:23:07,385 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[SIZE_AND_TIME_BASED_FILE] - Active log file name: logs/ocr.log
14:23:07,385 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[SIZE_AND_TIME_BASED_FILE] - File property is set to [logs/ocr.log]
14:23:07,406 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO
14:23:07,407 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
14:23:07,407 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
14:23:07,408 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@5c7fa833 - Registering current configuration as safe fallback point
INFO [2021-04-27 14:23:09,047] org.eclipse.jetty.util.log: Logging initialized @2402ms to org.eclipse.jetty.util.log.Slf4jLog
INFO [2021-04-27 14:23:09,186] io.dropwizard.server.DefaultServerFactory: Registering jersey handler with root path prefix: /
INFO [2021-04-27 14:23:09,199] io.dropwizard.server.DefaultServerFactory: Registering admin handler with root path prefix: /admin
INFO [2021-04-27 14:23:09,203] io.dropwizard.assets.AssetsBundle: Registering AssetBundle with name: swagger-assets for path /swagger-static/*
INFO [2021-04-27 14:23:09,261] io.dropwizard.assets.AssetsBundle: Registering AssetBundle with name: swagger-oauth2-connect for path /o2c.html/*
To Reproduce Steps to reproduce the behavior:
- Try sending multiple messages to the Subscription in interval of 3 hours
- This needs to be tested after n days of deployment(say in intervals of 10 days)
- Azure Service Bus client will stop picking messages after n days of deployment from the subscription
Code Snippet
public class WebJobListener {
private ServiceBusReceiverAsyncClient serviceBusReceiverAsyncClient;
private Gson gson;
private ILogger log;
private static final Type requestModelType = new TypeToken<RequestModel>() {}.getType();
@Inject
public WebJobListener(ILogger log) {
this.serviceBusReceiverAsyncClient = new ServiceBusClientBuilder()
.connectionString(System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING"))
.retryOptions(new AmqpRetryOptions().setTryTimeout(Duration.ofSeconds(30)))
.receiver()
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.topicName("MY_TOPIC")
.subscriptionName("MY_SUBSCRIPTION")
.prefetchCount(0) // Turn of prefetch messages
.maxAutoLockRenewDuration(Duration.ofMinutes(10))
.disableAutoComplete()
.buildAsyncClient();
this.jobManager = jobManager;
this.captureJob = new CaptureJob();
this.gson = new Gson();
this.log = log;
}
public void run() {
log.trace("Webjob worker starting to Listen", LogLevels.Verbose, null);
serviceBusReceiverAsyncClient.receiveMessages().flatMap(message -> {
// Process message. If an exception is thrown from this consumer, the message is abandoned.
// Otherwise, it is completed.
if(message.getSubject().contentEquals("MY_SUBSCRIPTION")) {
RequestModel requestModel = gson.fromJson(message.getBody().toString(), requestModelType);
// My logic goes here
//my_test_function(requestModel);
// Complete the message after function completes
return serviceBusReceiverAsyncClient.complete(message);
}
},
error -> log.trace("Error occurred while receiving message from subscription: " + error , LogLevels.Error, null));
}
}
Expected behavior Azure Service Bus client should pick up messages after n days of deployment from the subscription
Screenshots NA
Setup (please complete the following information):
- OS: Ubuntu 18.04 LTS
- IDE : IntelliJ
- Version of the Library used: azure-messaging-servicebus library 7.1.0 / Java 1.8
Additional context
Already raised a similar issue which got resolved by upgrading the azure-messaging-servicebus from 7.0.1 to 7.0.2
https://github.com/Azure/azure-sdk-for-java/issues/19254
Information Checklist Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report
- Bug Description Added
- Repro Steps Added
- Setup information Added
About this issue
- Original URL
- State: closed
- Created 3 years ago
- Comments: 33 (17 by maintainers)
@sudharsan2020 Thanks for the update. We’re targetting another release update next week, which includes a couple of reliability fixes. I just want to let you know so that you can plan to pick it for future deployments.
@anuchandy We haven’t faced any connectivity exception with the Service Bus Client over the past 4 weeks and the connection looks stable. Thanks for your patience and support in resolving the issue
The new bits are released: https://search.maven.org/artifact/com.azure/azure-messaging-servicebus/7.4.0/jar .
@sudharsan2020 Awesome that you can upgrade to 7.2.2 (it’s even better if you can use the latest version 7.2.3 since we just released). Could you give me the new POM file again? After the version upgrade, I’ve some questions / suggestions for your code:
ServiceBusProcessorClientwhich is a higher level API than the receiver client (ServiceBusReceiverAsyncClientandServiceBusReceiverClient). It indefinitely retry to connect in case of errors. I’m not sure if it works for your case but for your consideration.retryOptions(new AmqpRetryOptions().setTryTimeout(Duration.ofSeconds(30))). Today I found a bug that may cause the complete message to be stuck if tryTimeOut is long and there is a network error. It will be fixed soon. But for now please use a shorter time. b. Also for the retryOptions. You better set a larger maxRetries just in case the network is down. By default it’s 3 times. So if the network to the SB service is down for a few minutes, the client will stop retry and receiving will stop. c.return serviceBusReceiverAsyncClient.complete(message)may throw exception when the network is down, or the lock on the message expires. You may use.onErrorResumeand return aMono.emtpy()so the stream doesn’t error out. The message will be returned to the queue again and be received again.One more thing is, could you configure your logging policy to set debug level for
com.azure.messagingandcom.azure.core.amqp. If the problem happens again, I’ll have logs to look into.I have tests running for 7.2.3. Haven’t seen receiving stop so far. Will wait for another few days. I’ll also look into the code to see if there is any potential problems.
Hi @sudharsan2020 I don’t recommend a beta version for production use. It’s usually to preview new features. Could you reach us via the support system and let’s have a meeting to see what we can do to fix the pom.xml file for SB 7.2.2 or 7.2.3.
Hi @sudharsan2020 I do find a bug with receiveMessages(), which doesn’t reconnect after an network issue. I’m working on a fix. I’ll update you once I fix it.