spring-cloud-stream: Cannot handle errors from a binding with destination defined as a list of kafka topics

I have a SCS application that listens on messages from 18 kafka topics, say topic1, topic2, … topic18. Messages from these topics are consumed using same logic, so I want to use same SCS binding for them. This is how my binding definition looks in application.yml:

spring.cloud.stream:
  bindings:
    input:
      destination: topic1, topic2, topic3, ... topic18
      consumer:
        maxAttempts: 1
      group: myGroup

I have defined a stream listener this way:

@EnableBinding(Sink.class)
class KafkaMessageHandler {
  @StreamListener(Sink.INPUT)
  public void handleMessage(Message<?> message) throws MessageHandlingException {
    doTheLogic(message);
  }
}

This setup works I wanted to handle processing errors like this:

@Component
class KafkaErrorMessageHandler {
  @ServiceActivator(inputChannel = "input.myGroup.errors")
  public void handleError(MessagingException messageException) {
    processErrorMessage(messageException);
  }
}

But it does not catch any exceptions !!

To catch exceptions in my processing logic I have to define a method listening to a dedicated error channel for every single physical destination my application listens to (for every topic). As I have many topics that are processed the same way, this is very poor behaviour in terms of maintainability (my list of topics changes often) So I have to do exception handling like this:

@Component
class KafkaErrorMessageHandler {
  @ServiceActivator(inputChannel = "topic1.myGroup.errors")
  public void handleErrorFromTopic1(MessagingException messageException) {
    processErrorMessage(messageException);
  }
  @ServiceActivator(inputChannel = "topic2.myGroup.errors")
  public void handleErrorFromTopic2(MessagingException messageException) {
    processErrorMessage(messageException);
  }
  @ServiceActivator(inputChannel = "topic3.myGroup.errors")
  public void handleErrorFromTopic3(MessagingException messageException) {
    processErrorMessage(messageException);
  }
  .......... // method for each topic
  @ServiceActivator(inputChannel = "topic18.myGroup.errors")
  public void handleErrorFromTopic18(MessagingException messageException) {
    processErrorMessage(messageException);
  }
}

I know there is an option of using a commom channel “errorChannel”, but in my application I have also other bindings defined, and I would like to have a possibility of defining a error handling logic per binder in a concise way

About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Comments: 16 (9 by maintainers)

Most upvoted comments

@olegz That’s the wrong commit it’s this one the property is called multiplex.