azure-sdk-for-java: ReactiveException while executing updateCheckpoint
Describe the bug When updateCheckpoint() is executed in the same time frame of eventProcessorClient.stop(). Threads which are polling messages from partition and doing updateCheckpoint hits “ReactiveException: java.lang.InterruptedException”
Exception or Stack Trace
reactor.core.Exceptions$ReactiveException: java.lang.InterruptedException
at reactor.core.Exceptions.propagate(Exceptions.java:392)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:91)
at reactor.core.publisher.Mono.block(Mono.java:1706)
at com.azure.messaging.eventhubs.models.EventContext.updateCheckpoint(EventContext.java:101)
at com.alight.upoint.azure.consumer.service.AzureConsumerService.processEvent(AzureConsumerService.java:427)
at com.azure.messaging.eventhubs.EventProcessorClientBuilder$1.processEvent(EventProcessorClientBuilder.java:595)
at com.azure.messaging.eventhubs.PartitionPumpManager.processEvent(PartitionPumpManager.java:274)
at com.azure.messaging.eventhubs.PartitionPumpManager.processEvents(PartitionPumpManager.java:318)
at com.azure.messaging.eventhubs.PartitionPumpManager.lambda$startPartitionPump$2(PartitionPumpManager.java:235)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
... 18 more
To Reproduce Steps to reproduce the behavior: Put consumer under heavy load when eventProcessorClient.stop() is about to be executed.
Code Snippet Let us know if that’s needed, will share the abridged code removing business logic.
Expected behavior updateCheckpoint() should execute successfully without interruption.
Screenshots None
Setup (please complete the following information):
OS: linux
IDE: STS
Library/Libraries: azure-client-sdk-parent:1.7.0 azure-messaging-eventhubs:5.10.1 azure-storage-blob:12.14.0 azure-identity:1.4.0 azure-messaging-eventhubs-checkpointstore-blob:1.10.0 azure-core-http-netty:1.11.1 reactor-core:3.4.11
Java version: 8
App Server/Environment: Docker
Frameworks: Spring Boot
** Other available logs **
Information Checklist Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report
Bug Description Added Repro Steps Added Setup information Added
About this issue
- Original URL
- State: open
- Created 2 years ago
- Comments: 15 (10 by maintainers)
@ZejiaJiang Yesterday I saw questions on the checkpoint store. Answered them in bold text below in case you need it.