spring-cloud-stream: Cannot handle errors from a binding with destination defined as a list of kafka topics
I have a SCS application that listens on messages from 18 kafka topics, say topic1, topic2, … topic18. Messages from these topics are consumed using same logic, so I want to use same SCS binding for them. This is how my binding definition looks in application.yml:
spring.cloud.stream:
bindings:
input:
destination: topic1, topic2, topic3, ... topic18
consumer:
maxAttempts: 1
group: myGroup
I have defined a stream listener this way:
@EnableBinding(Sink.class)
class KafkaMessageHandler {
@StreamListener(Sink.INPUT)
public void handleMessage(Message<?> message) throws MessageHandlingException {
doTheLogic(message);
}
}
This setup works I wanted to handle processing errors like this:
@Component
class KafkaErrorMessageHandler {
@ServiceActivator(inputChannel = "input.myGroup.errors")
public void handleError(MessagingException messageException) {
processErrorMessage(messageException);
}
}
But it does not catch any exceptions !!
To catch exceptions in my processing logic I have to define a method listening to a dedicated error channel for every single physical destination my application listens to (for every topic). As I have many topics that are processed the same way, this is very poor behaviour in terms of maintainability (my list of topics changes often) So I have to do exception handling like this:
@Component
class KafkaErrorMessageHandler {
@ServiceActivator(inputChannel = "topic1.myGroup.errors")
public void handleErrorFromTopic1(MessagingException messageException) {
processErrorMessage(messageException);
}
@ServiceActivator(inputChannel = "topic2.myGroup.errors")
public void handleErrorFromTopic2(MessagingException messageException) {
processErrorMessage(messageException);
}
@ServiceActivator(inputChannel = "topic3.myGroup.errors")
public void handleErrorFromTopic3(MessagingException messageException) {
processErrorMessage(messageException);
}
.......... // method for each topic
@ServiceActivator(inputChannel = "topic18.myGroup.errors")
public void handleErrorFromTopic18(MessagingException messageException) {
processErrorMessage(messageException);
}
}
I know there is an option of using a commom channel “errorChannel”, but in my application I have also other bindings defined, and I would like to have a possibility of defining a error handling logic per binder in a concise way
About this issue
- Original URL
- State: closed
- Created 5 years ago
- Comments: 16 (9 by maintainers)
@olegz That’s the wrong commit it’s this one the property is called
multiplex
.