azure-sdk-for-java: [BUG] ServiceBus SDK constantly fails on complete() action with invalid lock, even for no-op subscribers

Describe the bug

We ported to the new SDK and are running long-running performance tests to see how it holds up. Our logs are flooded with exceptions like the one below whenever we call complete() on the message context:


com.azure.messaging.servicebus.ServiceBusException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. Reference:ae4628ac-df69-4ae3-a0e0-2ad31ced38c1, TrackingId:00955d4d0002000600000943607636a0_G5_B45, SystemTracker:mq4az-gocd-pipeline:Topic:mq4az_perftest_standard|perm_default, Timestamp:2021-04-14T00:32:10, errorContext[NAMESPACE: mq4az-gocd-pipeline.servicebus.windows.net, PATH: **************/subscriptions/perm_default, REFERENCE_ID: mq4az_perftest_standard/subscriptions/perm_default_cbff0a_1618359968251, LINK_CREDIT: 0]
	at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.lambda$updateDisposition$43(ServiceBusReceiverAsyncClient.java:1155)
	at reactor.core.publisher.Mono.lambda$onErrorMap$30(Mono.java:3384)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
	at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1862)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onError(MonoIgnoreThen.java:243)
	at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
	at com.azure.messaging.servicebus.implementation.ServiceBusReactorReceiver.completeWorkItem(ServiceBusReactorReceiver.java:367)
	at com.azure.messaging.servicebus.implementation.ServiceBusReactorReceiver.updateOutcome(ServiceBusReactorReceiver.java:303)
	at com.azure.messaging.servicebus.implementation.ServiceBusReactorReceiver.decodeDelivery(ServiceBusReactorReceiver.java:215)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
	at reactor.core.publisher.SinkManyBestEffort$DirectInner.directEmitNext(SinkManyBestEffort.java:356)
	at reactor.core.publisher.DirectProcessor.tryEmitNext(DirectProcessor.java:233)
	at reactor.core.publisher.DirectProcessor.emitNext(DirectProcessor.java:192)
	at reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:217)
	at reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:618)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:154)
	at com.azure.core.amqp.implementation.handler.ReceiveLinkHandler.onDelivery(ReceiveLinkHandler.java:125)
	at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:185)
	at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
	at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
	at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
	at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:82)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
		at reactor.core.publisher.Mono.block(Mono.java:1703)
		at com.azure.messaging.servicebus.ServiceBusReceivedMessageContext.complete(ServiceBusReceivedMessageContext.java:56)
		at com.pros.commons.mq4az.servicebus.SubscriptionMessageTypeHandler.lambda$accept$2(SubscriptionMessageTypeHandler.java:79)
		at com.pros.commons.mq4az.servicebus.SubscriptionMessageTypeHandler.execSafely(SubscriptionMessageTypeHandler.java:97)
		at com.pros.commons.mq4az.servicebus.SubscriptionMessageTypeHandler.accept(SubscriptionMessageTypeHandler.java:79)
		at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:219)
		at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:196)
		at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
		at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:136)
		at reactor.core.publisher.ParallelMergeSequential$MergeSequentialMain.drainLoop(ParallelMergeSequential.java:278)
		at reactor.core.publisher.ParallelMergeSequential$MergeSequentialMain.onNext(ParallelMergeSequential.java:211)
		at reactor.core.publisher.ParallelMergeSequential$MergeSequentialInner.onNext(ParallelMergeSequential.java:400)
		at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
		at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
		at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
		at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
		... 5 more

We create the processor in a standard way:

this.subscriptionProcessor =
    sharedConnectionBuilder
        .processor()
        .topicName(configuration.getTopic())
        .subscriptionName(sub.getSubscriptionName())
        .prefetchCount(160)
        .maxConcurrentCalls(160)
        .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
        .processMessage(subscriptionMessageTypeHandler::accept)
        .processError(new MessageProcessorErrorHandler())
        .disableAutoComplete() // we settle (complete/abandon) messages manually in the handler
        .buildProcessorClient();

Note that the processor builder has no option for setting the message lock duration. We presume it uses a built-in default that we can't override.
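One way to check the expectation that a log-only handler should never outlive its lock is a back-of-envelope timing model. The sketch below is only that, under stated assumptions: a message's lock starts when the message reaches the client, handlers drain the locked messages in rounds of maxConcurrentCalls, and the lock lasts 60 seconds (in Service Bus the lock duration is configured on the queue or subscription, with one minute as the default). LockBudget and its methods are hypothetical names, not SDK API.

```java
import java.time.Duration;

/**
 * Hypothetical back-of-envelope model of peek-lock timing under prefetch.
 * Assumptions: each lock starts when the message reaches the client, and
 * handlers drain the locked messages in rounds of `concurrency`.
 */
final class LockBudget {

    /** Time until the last of the messages locked at once has been handled. */
    static Duration worstCaseHoldTime(int prefetch, int concurrency, Duration perMessage) {
        if (concurrency <= 0) {
            throw new IllegalArgumentException("concurrency must be positive");
        }
        // With prefetch 0 the client still holds up to `concurrency` messages at once.
        int lockedAtOnce = Math.max(prefetch, concurrency);
        // Ceiling division: number of handler rounds needed to drain them.
        long rounds = (lockedAtOnce + concurrency - 1L) / concurrency;
        return perMessage.multipliedBy(rounds);
    }

    /** True if the worst-case hold time fits inside the lock duration. */
    static boolean lockSurvives(Duration lockDuration, int prefetch, int concurrency, Duration perMessage) {
        return worstCaseHoldTime(prefetch, concurrency, perMessage).compareTo(lockDuration) < 0;
    }

    public static void main(String[] args) {
        Duration lock = Duration.ofSeconds(60); // assumed: default lock duration of the entity
        // Configuration from the issue, with an assumed 5 ms for a log-only handler.
        System.out.println(worstCaseHoldTime(160, 160, Duration.ofMillis(5)).toMillis() + " ms");
        System.out.println(lockSurvives(lock, 160, 160, Duration.ofMillis(5)));
        // Contrast: a large buffer with slow handlers can exceed the lock.
        System.out.println(lockSurvives(lock, 1000, 8, Duration.ofMillis(500)));
    }
}
```

Under these assumptions the issue's configuration uses a few milliseconds of a 60-second lock, which is why constant lock expiry is surprising; the contrasting case (1,000 buffered messages, 8 handlers, 500 ms each) is the kind of setup where the model would predict expiry.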

Our subscriber's processing logic is essentially a no-op: it logs a single statement to confirm the message arrived and does nothing else, so message processing is practically instant.

And yet our logs are flooded with thousands of these invalid-lock exceptions.
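The frames under "Suppressed" show how the failure reaches the handler: SubscriptionMessageTypeHandler.accept() runs an execSafely() wrapper around a lambda that calls ServiceBusReceivedMessageContext.complete(), and complete() blocks on a Mono (Mono.block), so a failed disposition is rethrown on the handler thread. Below is a hypothetical, JDK-only reconstruction of such a log-only handler. SettleableMessage is an invented stand-in for the SDK's message context, and the success/failure counters are illustrative additions; the reporter's actual handler code is not shown in the issue.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for ServiceBusReceivedMessageContext, reduced to what the handler uses. */
interface SettleableMessage {
    String getMessageId();

    /** Settles the message; like the SDK's complete(), throws if settlement fails. */
    void complete();
}

/** Hypothetical log-only handler that settles each message manually. */
final class NoOpSubscriptionHandler implements Consumer<SettleableMessage> {
    private static final Logger LOG = Logger.getLogger(NoOpSubscriptionHandler.class.getName());

    private final AtomicLong completed = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    @Override
    public void accept(SettleableMessage message) {
        // The only "work": note that the message arrived.
        LOG.fine(() -> "Received message " + message.getMessageId());
        execSafely(message.getMessageId(), message::complete);
    }

    /** Runs a settlement action, logging and counting failures instead of propagating them. */
    private void execSafely(String messageId, Runnable settle) {
        try {
            settle.run();
            completed.incrementAndGet();
        } catch (RuntimeException e) {
            failed.incrementAndGet();
            LOG.log(Level.WARNING, "complete() failed for message " + messageId, e);
        }
    }

    long completedCount() {
        return completed.get();
    }

    long failedCount() {
        return failed.get();
    }
}
```

Because each failure is caught and logged per message, a run in which every complete() fails produces one stack trace per message, which matches the log flooding described above.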

Setup:

  • OS: Linux
  • IDE: IntelliJ
  • Version of the Library used: 7.1.0

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 17 (8 by maintainers)

Most upvoted comments

Let me see. This test suite is a simple Java class with a main() method, so logging just goes to the console. I'll try to enable all of that logging and re-run the test.

Hi all. I managed to reproduce the issue and captured the log of a full run. The log is attached.

Here’s how the test was run:

  • Latest SDK version, 7.3.0-beta.1
  • Create a processor with prefetch count = 0 and max concurrent calls = 8 (the number of cores)
  • Generate 100,000 messages
  • Wait for the processor to receive all 100,000 messages successfully
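The test procedure above boils down to pushing 100,000 messages through a pool of 8 handlers and counting how many settle. The sketch below simulates only that bookkeeping with JDK classes; there is no Service Bus in it. A fixed thread pool stands in for maxConcurrentCalls, and a hypothetical completeSucceeds predicate stands in for whether complete() returns or throws. It also records the position of the first failure, the kind of number the reporter read off the log (around 34K). PerfRunSketch and its members are invented names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntPredicate;

/** Hypothetical simulation of the perf-test bookkeeping; no Service Bus is involved. */
final class PerfRunSketch {

    /** Outcome of one simulated run. */
    static final class Result {
        final long completed;
        final long failed;
        final long firstFailureAt; // handler start order of the first failed complete(), or -1
        final boolean finishedInTime;

        Result(long completed, long failed, long firstFailureAt, boolean finishedInTime) {
            this.completed = completed;
            this.failed = failed;
            this.firstFailureAt = firstFailureAt;
            this.finishedInTime = finishedInTime;
        }
    }

    /**
     * Pushes `total` simulated messages through `concurrency` handler threads.
     * `completeSucceeds` stands in for whether complete() returns (true) or throws (false).
     */
    static Result run(int total, int concurrency, IntPredicate completeSucceeds, long timeoutSeconds) {
        ExecutorService handlers = Executors.newFixedThreadPool(concurrency); // plays maxConcurrentCalls
        CountDownLatch received = new CountDownLatch(total);
        AtomicLong startOrder = new AtomicLong();
        AtomicLong completed = new AtomicLong();
        AtomicLong failed = new AtomicLong();
        AtomicLong firstFailure = new AtomicLong(Long.MAX_VALUE);

        for (int i = 0; i < total; i++) {
            final int messageId = i;
            handlers.execute(() -> {
                long position = startOrder.getAndIncrement();
                try {
                    if (completeSucceeds.test(messageId)) {
                        completed.incrementAndGet();
                    } else {
                        failed.incrementAndGet();
                        firstFailure.accumulateAndGet(position, Math::min);
                    }
                } finally {
                    received.countDown(); // counts every message, settled or not
                }
            });
        }

        boolean finished;
        try {
            finished = received.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the caller's interrupt status
            finished = false;
        }
        handlers.shutdownNow();
        long first = firstFailure.get() == Long.MAX_VALUE ? -1 : firstFailure.get();
        return new Result(completed.get(), failed.get(), first, finished);
    }

    public static void main(String[] args) {
        Result result = run(100_000, 8, id -> true, 60);
        System.out.println(result.completed + " completed, " + result.failed + " failed");
    }
}
```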

It was slowly chugging along until around 34K messages.

Then it suddenly got an invalid-lock exception (line 183053 of the attached log), and from that point on the same exception appears over and over again, flooding the logs.

Hope this helps. We are currently holding off on adopting the new SDK because we have not been able to complete a single successful test run with it, so this is a blocker for us.

log.zip