azure-sdk-for-java: [BUG] EventHub Consumer stops consuming messages until we restart

Describe the bug The EventHub consumer stops consuming messages until we restart it. I have deployed my consumer in Azure Kubernetes Service (AKS). Initially it consumes a few messages, then it suddenly stops. If we restart the consumer, it works like a charm. Until we restart it, all the messages sit in the Event Hub. Even after 2-3 days, none of them are consumed.

Exception or Stack Trace This is the consumer log captured just before restarting: consumerlogmessages.txt I picked up this stack trace from the log:

2020-12-03 14:56:02.126 [ERROR] [Loggers$Slf4JLogger:319] Scheduler worker in group main failed with an uncaught exception
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 60000ms in 'takeUntil' (and no fallback has been configured)
	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:289)
	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:274)
	at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:396)

To Reproduce This happens very often.

Code Snippet

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.roo.connect.soa.config.IEventHubReceiverConfig;
import com.roo.connect.soa.log.ServiceLogger;
import org.springframework.beans.factory.annotation.Autowired;

public abstract class IEventHubReceiver implements Runnable {

    private EventProcessorClient eventProcessorClient;
    private String consumerGroupName="";
    private static final String IEVENTHUBRECIEVER = "IEventHubReceiver : ";

    @Autowired
    private ServiceLogger log;

    public IEventHubReceiver(IEventHubReceiverConfig config){

        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(config.getStorageConnectionString())
                .containerName(config.getStorageContainerName())
                .buildAsyncClient();

        eventProcessorClient = new EventProcessorClientBuilder()
                .consumerGroup(config.getConsumerGroup())
                .connectionString(config.getConnectionString())
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
                .processEvent(eventContext -> {
                    EventData eventData = eventContext.getEventData();
                    String receivedData = eventData.getBodyAsString();
                    consumerGroupName = config.getConsumerGroup();
                    log.info("IEventHubReceiver - received from : " + consumerGroupName + " : receivedData : " + receivedData);
                    onEvents(eventData);
                    if(isAlive()) {
                        //if onEvent is success, context updates the 'checkpoint'
                        eventContext.updateCheckpoint();
                    }
                })
                .processError(errorContext -> {
                    log.error(IEVENTHUBRECIEVER + consumerGroupName + " - Error occurred while processing events :: " + errorContext.getThrowable().getMessage());
                })
                .buildEventProcessorClient();
    }

    //This method is implemented in child classes with the logic to process the eventData
    public abstract void onEvents(EventData eventData);

    public void stop() {

        eventProcessorClient.stop();
        log.info(IEVENTHUBRECIEVER + consumerGroupName + " - Client stopped...");
    }

    public boolean isAlive() {

        boolean isAlive = eventProcessorClient.isRunning();
        log.info(IEVENTHUBRECIEVER + consumerGroupName + " - isAlive:: " + isAlive);
        return isAlive;
    }

    @Override
    public void run() {

        // This will start the processor. It will start processing events from all partitions.
        eventProcessorClient.start();
        log.info(IEVENTHUBRECIEVER + consumerGroupName + " - Client started...");
    }
}

Expected behavior I can see a few connection exception errors in the logs, but I am wondering why it stopped consuming messages forever. Immediately after restarting, messages were consumed again.
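Since the only recovery described here is a manual restart, one stopgap is to automate that restart when no event has arrived for some time. The following is a minimal sketch under stated assumptions, not part of the SDK: StallWatchdog, recordEvent, checkOnce and restartCount are hypothetical names, and the restart action is assumed to wrap eventProcessorClient.stop() followed by eventProcessorClient.start(). Note that it cannot tell a stalled consumer from a hub that is genuinely idle, so the idle limit has to be chosen with the expected traffic in mind.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical helper: runs a restart action when no event has been seen for too long. */
final class StallWatchdog {
    private final long maxIdleNanos;
    private final LongSupplier nanoClock;  // injectable so the logic can be exercised without sleeping
    private final Runnable restartAction;  // assumed to stop and then start the processor client
    private final AtomicLong lastEventNanos;
    private final AtomicInteger restarts = new AtomicInteger();

    StallWatchdog(Duration maxIdle, LongSupplier nanoClock, Runnable restartAction) {
        this.maxIdleNanos = maxIdle.toNanos();
        this.nanoClock = nanoClock;
        this.restartAction = restartAction;
        this.lastEventNanos = new AtomicLong(nanoClock.getAsLong());
    }

    /** Call from the processEvent handler every time an event arrives. */
    void recordEvent() {
        lastEventNanos.set(nanoClock.getAsLong());
    }

    /** Call periodically; returns true if the restart action ran on this check. */
    boolean checkOnce() {
        long now = nanoClock.getAsLong();
        if (now - lastEventNanos.get() < maxIdleNanos) {
            return false; // an event was seen recently enough
        }
        restartAction.run();
        restarts.incrementAndGet();
        lastEventNanos.set(now); // give the restarted client a full idle window
        return true;
    }

    int restartCount() {
        return restarts.get();
    }
}
```

In an application, the clock would typically be System::nanoTime, recordEvent() would be called at the top of the processEvent handler, and checkOnce() would be driven by a ScheduledExecutorService at a fixed rate.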

Setup (please complete the following information):

  • OS: Ubuntu (AKS)
  • IDE : IntelliJ
  • Version of the Library used
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
      <version>1.3.1</version>
    </dependency>

Information Checklist Kindly make sure that you have added all the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 24 (8 by maintainers)

Most upvoted comments

@jyyy-57

java.lang.NoClassDefFoundError: reactor/core/publisher/Sinks

Usually a NoClassDefFoundError means there is a dependency mismatch. Spring Boot brings in reactor-core and so does our client library. You can see what the dependency conflicts are via mvn dependency:tree -Dverbose. Our Spring starters consume the Event Hubs library under the covers, and we try to ship on the same cadence.

/cc @saragluna may have more insights for when they will update with 5.7.0
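As a complement to mvn dependency:tree, it can help to ask the running JVM which jar a class was actually loaded from. The following is a minimal sketch using only the JDK; ClasspathProbe and describe are hypothetical names introduced for illustration. Run inside the affected application, it reports whether reactor.core.publisher.Sinks is visible and, if so, where it came from.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical diagnostic: reports where a class was loaded from, or that it is missing. */
final class ClasspathProbe {

    static String describe(String className) {
        try {
            // initialize=false: locate the class without running its static initializers
            Class<?> type = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            // JDK classes may have no code source, so fall back to a readable marker
            return "FOUND " + className + " in " + (location == null ? "<JDK runtime>" : location);
        } catch (ClassNotFoundException | LinkageError e) {
            return "MISSING " + className + " (" + e.getClass().getSimpleName() + ")";
        }
    }

    public static void main(String[] args) {
        // Class named in the NoClassDefFoundError from this thread
        System.out.println(describe("reactor.core.publisher.Sinks"));
        // A class present in all reactor-core versions, to show which jar won
        System.out.println(describe("reactor.core.publisher.Mono"));
    }
}
```

If Mono is found but Sinks is missing, the reactor-core jar on the classpath is most likely older than the one the Event Hubs library was built against; the printed jar path shows which version was resolved.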

We released 5.7.0 this morning. This should recover the consumer after closing when using the EventProcessorClient.

https://repo1.maven.org/maven2/com/azure/azure-messaging-eventhubs/5.7.0/
https://repo1.maven.org/maven2/com/azure/azure-messaging-eventhubs-checkpointstore-blob/1.6.0/

Cheers, Connie

Hey @jyyy-57, we are currently stress testing the PR associated with this issue, https://github.com/Azure/azure-sdk-for-java/pull/19924, for the next three days. We plan to merge it on Monday if all goes well.

Hey @conniey, could you please share updates on this issue? I believe we're facing a similar issue. We're using BlobContainerAsyncClient and EventProcessorClientBuilder against an IoT Hub built-in endpoint. We need to keep the eventProcessorClient open to receive messages from the built-in endpoint. Load balancing still runs regularly, but the client does not pick up new events until I restart the service. It's hard to reproduce and debug, as it is intermittent and usually happens 10-14 days after we restart the service/client. Also, the log always shows "Load balancing completed successfully", even when it cannot pick up any new messages. The versions:

    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs</artifactId>
        <version>5.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
        <version>1.1.1</version>
    </dependency>

@jyyy-57, we are using Spring Boot 2.3.7.RELEASE and we faced issues only with reactor-core, not reactor-netty. As @conniey mentioned, I guess the jar conflicts have to be sorted out on the application side.

Hi @gandhirajan, thank you so much!! Your method works! I can start the application now. Reactor-core might be the key to these issues. However, I got another exception: java.lang.NoSuchMethodError: reactor.core.publisher.Mono.retry(Ljava/util/function/Predicate;)Lreactor/core/publisher/Mono; So I tried removing the 'exclusions' from the 5.7.0 dependency and adding the following dependencies. It works fine for me, but I'm not sure whether it is a proper solution. @conniey could you please confirm? Thanks!

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.4.5</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty</artifactId>
        <version>0.9.15.RELEASE</version>
    </dependency>
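The descriptor in that NoSuchMethodError reads as "a method named retry on reactor.core.publisher.Mono that takes a java.util.function.Predicate and returns a Mono". One way to confirm whether the reactor-core version on the classpath still has that overload is a reflection check. This is a minimal sketch using only the JDK; MethodProbe and hasPublicMethod are hypothetical names, and the check says nothing about which library is making the call.

```java
/** Hypothetical diagnostic: checks whether a public method with an exact signature is visible. */
final class MethodProbe {

    static boolean hasPublicMethod(String className, String methodName, Class<?>... parameterTypes) {
        try {
            // initialize=false: locate the class without running its static initializers
            Class<?> type = Class.forName(className, false, MethodProbe.class.getClassLoader());
            type.getMethod(methodName, parameterTypes); // throws if no exact match exists
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false; // class missing, method missing, or class could not be linked
        }
    }

    public static void main(String[] args) {
        // Signature taken from the NoSuchMethodError quoted in this thread
        boolean present = hasPublicMethod(
                "reactor.core.publisher.Mono", "retry", java.util.function.Predicate.class);
        System.out.println("Mono.retry(Predicate) present: " + present);
    }
}
```

A result of false with reactor-core on the classpath suggests that some other jar was compiled against a reactor-core version that still had this overload, which is the kind of mismatch that mvn dependency:tree -Dverbose should surface.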

@jyyy-57 Hi, we had to exclude the 'reactor-core' dependency from the eventhub dependency and include 'reactor-core 3.4.3' explicitly to work around this issue. The changes are as follows:

  <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs</artifactId>
      <version>5.7.0</version>
      <exclusions>
          <exclusion>
              <artifactId>reactor-core</artifactId>
              <groupId>io.projectreactor</groupId>
          </exclusion>
      </exclusions>
  </dependency>
  
  <!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
  <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
      <version>3.4.5</version>
  </dependency>

@conniey, I hope this will be taken care of on the SDK side.