azure-sdk-for-java: ReactiveException while executing updateCheckpoint

Describe the bug When updateCheckpoint() is executed at the same time as eventProcessorClient.stop(), the threads that are polling messages from a partition and calling updateCheckpoint() hit “ReactiveException: java.lang.InterruptedException”.

Exception or Stack Trace

reactor.core.Exceptions$ReactiveException: java.lang.InterruptedException
	at reactor.core.Exceptions.propagate(Exceptions.java:392)
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:91)
	at reactor.core.publisher.Mono.block(Mono.java:1706)
	at com.azure.messaging.eventhubs.models.EventContext.updateCheckpoint(EventContext.java:101)
	at com.alight.upoint.azure.consumer.service.AzureConsumerService.processEvent(AzureConsumerService.java:427)
	at com.azure.messaging.eventhubs.EventProcessorClientBuilder$1.processEvent(EventProcessorClientBuilder.java:595)
	at com.azure.messaging.eventhubs.PartitionPumpManager.processEvent(PartitionPumpManager.java:274)
	at com.azure.messaging.eventhubs.PartitionPumpManager.processEvents(PartitionPumpManager.java:318)
	at com.azure.messaging.eventhubs.PartitionPumpManager.lambda$startPartitionPump$2(PartitionPumpManager.java:235)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
	... 18 more

To Reproduce Steps to reproduce the behavior: Put the consumer under heavy load at the moment eventProcessorClient.stop() is about to be executed.
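
The stack trace shows the InterruptedException being thrown from CountDownLatch.await() inside Mono.block(), so the failure looks like a thread interrupt arriving while the processing thread is blocked. The following is a minimal, JDK-only sketch of that mechanism, not the SDK's actual code. It assumes that stop() ends up interrupting the worker thread (modeled here with ExecutorService.shutdownNow()). The class name, method name, and the never-released latch are hypothetical stand-ins for the blocking checkpoint call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names come from the Azure SDK.
class InterruptedCheckpointSketch {

    // Runs a blocking wait on a worker thread, then shuts the worker down.
    // Returns "completed" if the wait finished, or the exception class name if it was interrupted.
    public static String runBlockingCallDuringShutdown() throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        // Stand-in for a checkpoint write that is still in flight (never released here).
        CountDownLatch checkpointDone = new CountDownLatch(1);

        Future<String> outcome = worker.submit(() -> {
            started.countDown();
            try {
                // Same blocking primitive that appears in the stack trace.
                checkpointDone.await();
                return "completed";
            } catch (InterruptedException e) {
                return e.getClass().getName();
            }
        });

        // Wait until the task is running, then simulate the assumed effect of stop():
        // shutdownNow() interrupts the worker thread.
        started.await();
        worker.shutdownNow();
        return outcome.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runBlockingCallDuringShutdown()); // prints java.lang.InterruptedException
    }
}
```

If this assumption holds, the exception is a side effect of shutdown timing rather than of the checkpoint store itself, which would match the observation that it only appears when stop() overlaps with event processing.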

Code Snippet Let us know if one is needed; we will share abridged code with the business logic removed.

Expected behavior updateCheckpoint() should execute successfully without interruption.

Screenshots None

Setup (please complete the following information):

OS: linux

IDE: STS

Library/Libraries: azure-client-sdk-parent:1.7.0 azure-messaging-eventhubs:5.10.1 azure-storage-blob:12.14.0 azure-identity:1.4.0 azure-messaging-eventhubs-checkpointstore-blob:1.10.0 azure-core-http-netty:1.11.1 reactor-core:3.4.11

Java version: 8

App Server/Environment: Docker

Frameworks: Spring Boot

** Other available logs **

Information Checklist Kindly make sure that you have added all the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report

Bug Description Added Repro Steps Added Setup information Added

About this issue

  • State: open
  • Created 2 years ago
  • Comments: 15 (10 by maintainers)

Most upvoted comments

@ZejiaJiang Yesterday I saw your questions about the checkpoint store. I have answered each one inline below in case you still need the answers.

  1. Do several clients use the same checkpoint store? There is only one client.
  2. What kind of checkpoint store is the client using? Blob Storage. If the checkpoint store is customized, could you please provide the code? It’s not customized.