spring-cloud-stream-binder-kafka: Using DeadLetterPublishingRecoverer with DefaultAfterRollbackProcessor in transactional StreamListener does not commit offset
When using DeadLetterPublishingRecoverer with DefaultAfterRollbackProcessor, the offset is not committed even though the record is successfully published to the DLT.
package dk.digst.digital.post.kafka.retry;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
/**
 * Spring Cloud Stream variant of the retry reproducer.
 *
 * <p>The {@code input} binding is customised through a {@link ListenerContainerCustomizer} and two
 * {@code @KafkaListener} container factories are declared for the retry stages. Every container
 * gets a transaction manager, {@link AckMode#RECORD} and a {@link DefaultAfterRollbackProcessor}
 * whose recoverer is a {@link DeadLetterPublishingRecoverer} targeting the next topic in the chain
 * ({@code RETRY_1}, then {@code RETRY_2}, then {@code DLQ}).
 *
 * <p>The class is registered with {@code proxyBeanMethods = false}, so {@code @Bean} methods must
 * never call each other directly: such a call is a plain Java invocation and returns a fresh,
 * unmanaged object instead of the registered bean. Collaborators are therefore injected as method
 * parameters and resolved by parameter name where several beans share a type.
 */
@SpringBootApplication(proxyBeanMethods = false)
@EnableTransactionManagement
@EnableKafka
@EnableRetry
@EnableBinding(Sink.class)
public class KafkaStreamsRetryApplication {

  /** Broker address used by every producer and consumer factory built in this class. */
  private static final String BOOTSTRAP_SERVERS = "localhost:9092";

  /**
   * Application entry point.
   *
   * @param args command-line arguments handed to {@link SpringApplication#run}
   */
  public static void main(String[] args) {
    SpringApplication.run(KafkaStreamsRetryApplication.class, args);
  }

  /**
   * Transactional producer factory using the {@code txPrefix.} transactional-id prefix.
   *
   * @return the producer factory bean
   */
  @Bean
  public ProducerFactory<byte[], byte[]> producerFactory() {
    return newTransactionalProducerFactory("txPrefix.");
  }

  /**
   * Transactional producer factory using the {@code txRetryRecoverer.} transactional-id prefix.
   *
   * @return the producer factory bean backing {@code retryKafkaTemplate}
   */
  @Bean
  public ProducerFactory<byte[], byte[]> retryProducerFactory() {
    return newTransactionalProducerFactory("txRetryRecoverer.");
  }

  /**
   * Template bound to the {@code producerFactory} bean.
   *
   * @param producerFactory the bean of the same name; injected rather than obtained by calling
   *     {@link #producerFactory()}, which would create a second factory because bean methods are
   *     not proxied
   * @return the template bean
   */
  @Bean
  public KafkaTemplate<byte[], byte[]> kafkaTemplate(
      ProducerFactory<byte[], byte[]> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
  }

  /**
   * Template bound to the {@code retryProducerFactory} bean; used by the dead-letter recoverers.
   *
   * @param retryProducerFactory the bean of the same name; injected rather than obtained by calling
   *     {@link #retryProducerFactory()}, which would create a second factory because bean methods
   *     are not proxied
   * @return the template bean
   */
  @Bean
  public KafkaTemplate<byte[], byte[]> retryKafkaTemplate(
      ProducerFactory<byte[], byte[]> retryProducerFactory) {
    return new KafkaTemplate<>(retryProducerFactory);
  }

  /**
   * Customises the binder-created listener containers: applies the shared container properties
   * and installs an after-rollback processor that publishes failed records to {@code RETRY_1}.
   *
   * @param kafkaTransactionManager transaction manager set on the container properties (injected;
   *     not declared in this class)
   * @param retryKafkaTemplate template used by the dead-letter recoverer
   * @return the customizer bean
   */
  @Bean
  public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> listenerContainerCustomizer(
      KafkaTransactionManager<?, ?> kafkaTransactionManager,
      KafkaOperations<byte[], byte[]> retryKafkaTemplate) {
    return (container, dest, group) -> {
      SetupHelper.setContainerProps(container.getContainerProperties(), kafkaTransactionManager);
      // NOTE(review): the processor is built without a KafkaOperations / commitRecovered setting;
      // confirm against the spring-kafka after-rollback documentation whether that is what keeps
      // the recovered record's offset from being committed.
      container
          .setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
              new DeadLetterPublishingRecoverer(retryKafkaTemplate,
                  // Partition -1: the destination partition is not chosen here.
                  (cr, e) -> new TopicPartition(TopicNames.RETRY_1, -1)),
              new FixedBackOff(100L, 1L)));
    };
  }

  /**
   * Consumer factory for the {@code retry1} consumer group.
   *
   * @return the consumer factory bean
   */
  @Bean
  public ConsumerFactory<byte[], byte[]> retry1ConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
        SetupHelper.createConsumerFactoryBaseProps(GroupNames.RETRY_1));
  }

  /**
   * Consumer factory for the {@code retry2} consumer group.
   *
   * @return the consumer factory bean
   */
  @Bean
  public ConsumerFactory<byte[], byte[]> retry2ConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
        SetupHelper.createConsumerFactoryBaseProps(GroupNames.RETRY_2));
  }

  /**
   * Container factory for the first retry stage; failed records are published to {@code RETRY_2}.
   *
   * @param kafkaTransactionManager transaction manager set on the container properties
   * @param retry1ConsumerFactory consumer factory for the {@code retry1} group
   * @param retryKafkaTemplate template used by the dead-letter recoverer
   * @return the configured container factory bean
   */
  @Bean
  public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> retry1KafkaListenerContainerFactory(
      KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager,
      ConsumerFactory<byte[], byte[]> retry1ConsumerFactory,
      KafkaOperations<byte[], byte[]> retryKafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    SetupHelper.configureFactory(factory, kafkaTransactionManager, retry1ConsumerFactory,
        retryKafkaTemplate, TopicNames.RETRY_2, new FixedBackOff(100L, 1L));
    return factory;
  }

  /**
   * Container factory for the second retry stage; failed records are published to {@code DLQ}.
   *
   * @param kafkaTransactionManager transaction manager set on the container properties
   * @param retry2ConsumerFactory consumer factory for the {@code retry2} group
   * @param retryKafkaTemplate template used by the dead-letter recoverer
   * @return the configured container factory bean
   */
  @Bean
  public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> retry2KafkaListenerContainerFactory(
      KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager,
      ConsumerFactory<byte[], byte[]> retry2ConsumerFactory,
      KafkaOperations<byte[], byte[]> retryKafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    SetupHelper.configureFactory(factory, kafkaTransactionManager, retry2ConsumerFactory,
        retryKafkaTemplate, TopicNames.DLQ, new FixedBackOff(100L, 1L));
    return factory;
  }

  /** Builds a byte-array producer factory with the given transactional-id prefix. */
  private static ProducerFactory<byte[], byte[]> newTransactionalProducerFactory(
      String transactionIdPrefix) {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    DefaultKafkaProducerFactory<byte[], byte[]> factory = new DefaultKafkaProducerFactory<>(props);
    factory.setTransactionIdPrefix(transactionIdPrefix);
    return factory;
  }

  /** Static helpers shared by the container customizer and the container factories. */
  public static class SetupHelper {

    /**
     * Applies the container settings common to all listeners: a 60 second idle-event interval,
     * the transaction manager, per-record acknowledgement and the delivery-attempt header.
     *
     * @param containerProperties properties object to mutate
     * @param kafkaTransactionManager transaction manager to install
     * @param <K> key type
     * @param <V> value type
     */
    public static <K, V> void setContainerProps(ContainerProperties containerProperties,
        KafkaTransactionManager<K, V> kafkaTransactionManager) {
      containerProperties.setIdleEventInterval(60000L);
      containerProperties.setTransactionManager(kafkaTransactionManager);
      containerProperties.setAckMode(AckMode.RECORD);
      containerProperties.setDeliveryAttemptHeader(true);
    }

    /**
     * Configures a container factory with the shared container properties, the consumer factory
     * and an after-rollback processor that publishes failed records to {@code errorTopicName}.
     *
     * @param factory container factory to configure
     * @param kafkaTransactionManager transaction manager for the containers
     * @param consumerFactory consumer factory for the containers
     * @param kafkaTemplate template used by the dead-letter recoverer
     * @param errorTopicName topic that receives records once the back-off is exhausted
     * @param backOff back-off handed to the after-rollback processor
     * @param <K> key type
     * @param <V> value type
     */
    public static <K, V> void configureFactory(
        ConcurrentKafkaListenerContainerFactory<K, V> factory,
        KafkaTransactionManager<K, V> kafkaTransactionManager,
        ConsumerFactory<K, V> consumerFactory, KafkaOperations<K, V> kafkaTemplate,
        String errorTopicName, BackOff backOff) {
      setContainerProps(factory.getContainerProperties(), kafkaTransactionManager);
      factory.setConsumerFactory(consumerFactory);
      DefaultAfterRollbackProcessor<K, V> afterRollbackProcessor =
          new DefaultAfterRollbackProcessor<>(new DeadLetterPublishingRecoverer(kafkaTemplate,
              (cr, e) -> new TopicPartition(errorTopicName, -1)), backOff);
      factory.setAfterRollbackProcessor(afterRollbackProcessor);
    }

    /**
     * Builds the consumer properties shared by the retry consumer factories.
     *
     * @param groupId consumer group id
     * @return a new mutable property map
     */
    public static Map<String, Object> createConsumerFactoryBaseProps(String groupId) {
      Map<String, Object> props = new HashMap<>();
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
      // NOTE(review): auto-commit is switched on explicitly although the containers built from
      // these properties use a transaction manager with AckMode.RECORD - confirm this is intended.
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
      props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      return props;
    }
  }

  /** Topic names used by the binding, the retry listeners and the recoverers. */
  public static class TopicNames {
    public static final String INPUT = "kafka_retry_store";
    public static final String OUTPUT = "kafka_retry_index";
    public static final String RETRY_1 = "kafka_retry_index_retry_1";
    public static final String RETRY_2 = "kafka_retry_index_retry_2";
    public static final String DLQ = "kafka_retry_index_dlq";
  }

  /** Consumer group ids. */
  public static class GroupNames {
    public static final String INPUT = "input";
    public static final String RETRY_1 = "retry1";
    public static final String RETRY_2 = "retry2";
  }
}
application.properties
spring.kafka.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.max.poll.records=${KAFKA_CONSUMER_MAX_POLL_RECORDS:10}
spring.kafka.consumer.properties.max.poll.interval.ms=${KAFKA_CONSUMER_MAX_POLL_INTERVAL_MS:180000}
spring.kafka.consumer.properties.heartbeat.interval.ms=${KAFKA_CONSUMER_HEARTBEAT_INTERVAL_MS:3000}
spring.kafka.consumer.properties.session.timeout.ms=${KAFKA_CONSUMER_SESSION_TIMEOUT_MS:10000}
spring.kafka.consumer.isolation-level=read-committed
spring.kafka.producer.transaction-id-prefix=${KAFKA_TRANSACTION_ID_PREFIX:${spring.application.name}.}
spring.kafka.producer.retries=10
spring.kafka.producer.acks=all
spring.kafka.producer.properties.enable.idempotence=true
spring.cloud.discovery.enabled=false
spring.cloud.stream.bindings.default.consumer.use-native-decoding=true
spring.cloud.stream.bindings.default.producer.use-native-encoding=true
spring.cloud.stream.bindings.default.producer.auto-startup=true
spring.cloud.stream.kafka.binder.auto-create-topics=${KAFKA_AUTOCREATE_TOPICS:false}
spring.cloud.stream.kafka.binder.brokers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
spring.cloud.stream.kafka.binder.required-acks=1
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=${KAFKA_TRANSACTION_ID_PREFIX:${spring.application.name}.}
spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all
spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=10
spring.zipkin.enabled=${TRACING_ENABLED:false}
spring.zipkin.discovery-client-enabled=false
spring.zipkin.service.name=${spring.application.name}
spring.zipkin.base-url=${ZIPKIN_BASE_URL:http://localhost:9411}
spring.zipkin.sender.type=web
spring.sleuth.trace-id128=true
spring.sleuth.sampler.rate=${SLEUTH_SAMPLER_RATE:50}
spring.sleuth.sampler.probability=${SLEUTH_SAMPLER_PROBABILITY:0.1}
#spring.security.oauth2.resourceserver.jwt.public-key-location=classpath:keys/jwt-keys.pem.pub
spring.groovy.template.check-template-location=false
server.servlet.context-path=${SERVER_SERVLET_CONTEXT_PATH:/}
server.compression.enabled=true
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=${MANAGEMENT_HEALTH_SHOW_DETAILS:always}
management.endpoint.health.group.liveness.include=info
management.endpoint.health.group.liveness.show-details=ALWAYS
management.endpoint.health.group.readiness.include=diskSpace,ping
management.endpoint.health.group.readiness.show-details=ALWAYS
management.info.git.mode=full
management.server.servlet.context-path=${MANAGEMENT_SERVER_SERVLET_CONTEXT_PATH:/}
bootstrap.yml
spring:
  main.banner-mode: off
  application.name: ${SPRING_APPLICATION_NAME:kafka-streams-retry}
  cloud:
    config:
      enabled: false
logging.pattern.level: "%5p [${spring.zipkin.service.name:${spring.application.name:-}},%X{user:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]"
application.yml
spring:
  kafka.producer.transaction-id-prefix: txDummy.
  cloud:
    stream:
      bindings:
        input:
          group: input
          destination: kafka_retry_store
          consumer:
            back-off-max-interval: 1000
            max-attempts: 1
      kafka:
        binder:
          configuration:
            auto.offset.reset: earliest
          headers: x-retries
          transaction:
            transaction-id-prefix: txKafkaStreamRetry2.
            producer:
              configuration:
                acks: all
                retries: 10
        bindings:
          input:
            consumer:
              ack-each-record: true
              enable-dlq: false
              auto-commit-offset: true
              auto-commit-on-error: false
info.component: Kafka Retry Service
server.port: ${SERVER_PORT:9082}
management.server.port: ${MANAGEMENT_SERVER_PORT:9083}
EventListenerService
package dk.digst.digital.post.kafka.retry;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;
import dk.digst.digital.post.kafka.retry.KafkaStreamsRetryApplication.GroupNames;
import dk.digst.digital.post.kafka.retry.KafkaStreamsRetryApplication.TopicNames;
import lombok.extern.slf4j.Slf4j;
/**
 * Listener endpoints for the stream-binder reproducer. Each handler logs the delivery and then
 * throws a {@link RuntimeException}, so no record is ever processed successfully.
 */
@Slf4j
@Service
public class EventListenerService {

  /**
   * Receives messages from the {@code Processor.INPUT} binding, logs them at debug level and fails.
   *
   * @param message the incoming message
   */
  @StreamListener(Processor.INPUT)
  public void eventHandler(GenericMessage<?> message) {
    log.debug("INPUT, received: {}", message);
    final RuntimeException inputFault = new RuntimeException("Create fault! in INPUT");
    throw inputFault;
  }

  /**
   * First retry stage: consumes {@code TopicNames.RETRY_1} in group {@code GroupNames.RETRY_1},
   * logs the delivery attempt and fails.
   *
   * @param record raw record payload (not inspected)
   * @param delivery value of the delivery-attempt header
   */
  @KafkaListener(
      topics = TopicNames.RETRY_1,
      groupId = GroupNames.RETRY_1,
      containerFactory = "retry1KafkaListenerContainerFactory")
  public void retry1Handler(
      @Payload byte[] record,
      @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
    log.info("Recieved (delivery: {}) in RETRY1", delivery);
    final RuntimeException retry1Fault = new RuntimeException("Create fault! on retry1");
    throw retry1Fault;
  }

  /**
   * Second retry stage: consumes {@code TopicNames.RETRY_2} in group {@code GroupNames.RETRY_2},
   * logs the delivery attempt and fails.
   *
   * @param record raw record payload (not inspected)
   * @param delivery value of the delivery-attempt header
   */
  @KafkaListener(
      topics = TopicNames.RETRY_2,
      groupId = GroupNames.RETRY_2,
      containerFactory = "retry2KafkaListenerContainerFactory")
  public void retry2Handler(
      @Payload byte[] record,
      @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
    log.info("Recieved (delivery: {}) in RETRY2", delivery);
    final RuntimeException retry2Fault = new RuntimeException("Create fault! on retry2");
    throw retry2Fault;
  }
}
build.gradle
// Build script for the retry reproducer.
// Versions referenced by the plugin declaration and the BOM platform dependencies below.
buildscript {
ext {
springBootVersion = '2.3.2.RELEASE'
springCloudVersion = 'Hoxton.SR6'
springDataVersion = 'Neumann-SR1'
}
}
plugins {
id 'org.springframework.boot' version "${springBootVersion}"
id 'java-library'
id 'groovy'
id 'maven-publish'
id 'org.unbroken-dome.test-sets' version '3.0.1'
}
// Set the cache time for changing modules and dynamic versions to 0 seconds in every configuration.
configurations.all {
resolutionStrategy.cacheChangingModulesFor 0, 'seconds'
resolutionStrategy.cacheDynamicVersionsFor 0, 'seconds'
}
// NOTE(review): the next four plugins are already declared in the plugins block above; only
// 'jacoco' is new here - confirm whether the repeated apply statements are needed.
apply plugin: 'maven-publish'
apply plugin: 'java-library'
apply plugin: 'groovy'
apply plugin: 'org.unbroken-dome.test-sets'
apply plugin: 'jacoco'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
// Custom configuration holding the platform BOMs; compileOnly, annotationProcessor and
// implementation extend it, which is why the dependencies below are declared without versions.
bom
compileOnly.extendsFrom(bom)
annotationProcessor.extendsFrom(bom)
implementation.extendsFrom(bom)
}
dependencies {
// Platform BOMs, using the versions defined in the buildscript ext block.
bom platform (group: 'org.springframework.boot', name: 'spring-boot-dependencies', version: "${springBootVersion}")
bom platform (group: 'org.springframework.data', name: 'spring-data-releasetrain', version: "${springDataVersion}")
bom platform (group: 'org.springframework.cloud', name: 'spring-cloud-dependencies', version: "${springCloudVersion}")
// Lombok is needed only at compile time (the listener service uses @Slf4j).
annotationProcessor (group: 'org.projectlombok', name: 'lombok')
compileOnly (group: 'org.projectlombok', name: 'lombok')
implementation (group: 'org.springframework.boot', name: 'spring-boot-starter-web')
implementation (group: 'org.springframework.boot', name: 'spring-boot-starter-actuator')
implementation (group: 'org.springframework.data', name: 'spring-data-rest-webmvc')
implementation (group: 'org.springframework.cloud', name: 'spring-cloud-starter-sleuth')
implementation (group: 'org.springframework.cloud', name: 'spring-cloud-starter-zipkin')
// Messaging stack: Spring Cloud Stream with the Kafka binder, plus spring-kafka itself.
implementation (group: 'org.springframework.cloud', name: 'spring-cloud-stream')
implementation (group: 'org.springframework.cloud', name: 'spring-cloud-stream-binder-kafka')
implementation (group: 'org.springframework.kafka', name: 'spring-kafka')
implementation (group: 'org.springframework', name: 'spring-messaging')
}
springBoot {
buildInfo()
}
It can be seen from kafka that the consumer is behind after this
docker exec -ti kafka bash -c 'bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9093 --group input --offsets'
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
input kafka_retry_store 0 2 4 2 consumer-input-2-85538449-9f60-40f9-8403-f32e2dbc6c28 /192.168.32.1 consumer-input-2
input kafka_retry_store 1 2 2 0 consumer-input-2-85538449-9f60-40f9-8403-f32e2dbc6c28 /192.168.32.1 consumer-input-2
input kafka_retry_store 2 4 4 0 consumer-input-2-85538449-9f60-40f9-8403-f32e2dbc6c28 /192.168.32.1 consumer-input-2
If I run this using only spring-kafka with Kafka listeners, everything works as expected. It can also be seen that the propagation through the spring-kafka retry1 and retry2 listeners works and gets the offset committed. Only the StreamListener (binding) does not. This seems like an error.
EDIT
The “pure” spring-kafka setup
package dk.digst.digital.post.kafka.retry;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
/**
 * Pure spring-kafka variant of the retry reproducer.
 *
 * <p>Three {@code @KafkaListener} container factories are declared (input, retry1, retry2). Each
 * gets a transaction manager, {@link AckMode#RECORD} and a {@link DefaultAfterRollbackProcessor}
 * whose recoverer is a {@link DeadLetterPublishingRecoverer} targeting the next topic in the chain
 * ({@code RETRY_1}, then {@code RETRY_2}, then {@code DLQ}).
 *
 * <p>The class is registered with {@code proxyBeanMethods = false}, so {@code @Bean} methods must
 * never call each other directly: such a call is a plain Java invocation and returns a fresh,
 * unmanaged object instead of the registered bean. Collaborators are therefore injected as method
 * parameters and resolved by parameter name where several beans share a type.
 */
@SpringBootApplication(proxyBeanMethods = false)
@EnableTransactionManagement
@EnableKafka
@EnableRetry
public class KafkaRetryApplication {

  /** Broker address used by every producer and consumer factory built in this class. */
  private static final String BOOTSTRAP_SERVERS = "localhost:9092";

  /**
   * Application entry point.
   *
   * @param args command-line arguments handed to {@link SpringApplication#run}
   */
  public static void main(String[] args) {
    SpringApplication.run(KafkaRetryApplication.class, args);
  }

  /**
   * Transactional producer factory using the {@code txPrefix.} transactional-id prefix.
   *
   * @return the producer factory bean
   */
  @Bean
  public ProducerFactory<byte[], byte[]> producerFactory() {
    return newTransactionalProducerFactory("txPrefix.");
  }

  /**
   * Template bound to the {@code producerFactory} bean.
   *
   * @param producerFactory the bean of the same name; injected rather than obtained by calling
   *     {@link #producerFactory()}, which would create a second factory because bean methods are
   *     not proxied
   * @return the template bean
   */
  @Bean
  public KafkaTemplate<byte[], byte[]> kafkaTemplate(
      ProducerFactory<byte[], byte[]> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
  }

  /**
   * Transactional producer factory using the {@code txRetryRecoverer.} transactional-id prefix.
   *
   * @return the producer factory bean backing {@code retryKafkaTemplate}
   */
  @Bean
  public ProducerFactory<byte[], byte[]> retryProducerFactory() {
    return newTransactionalProducerFactory("txRetryRecoverer.");
  }

  /**
   * Template bound to the {@code retryProducerFactory} bean; used by the dead-letter recoverers.
   *
   * @param retryProducerFactory the bean of the same name; injected rather than obtained by calling
   *     {@link #retryProducerFactory()}, which would create a second factory because bean methods
   *     are not proxied
   * @return the template bean
   */
  @Bean
  public KafkaTemplate<byte[], byte[]> retryKafkaTemplate(
      ProducerFactory<byte[], byte[]> retryProducerFactory) {
    return new KafkaTemplate<>(retryProducerFactory);
  }

  /**
   * Consumer factory for the {@code input} consumer group.
   *
   * @return the consumer factory bean
   */
  @Bean
  public ConsumerFactory<byte[], byte[]> inputConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
        SetupHelper.createConsumerFactoryBaseProps(GroupNames.INPUT));
  }

  /**
   * Container factory for the input listener; failed records are published to {@code RETRY_1}.
   *
   * @param kafkaTransactionManager transaction manager set on the container properties (injected;
   *     not declared in this class)
   * @param inputConsumerFactory consumer factory for the {@code input} group
   * @param retryKafkaTemplate template used by the dead-letter recoverer
   * @return the configured container factory bean
   */
  @Bean
  public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> inputKafkaListenerContainerFactory(
      KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager,
      ConsumerFactory<byte[], byte[]> inputConsumerFactory,
      KafkaTemplate<byte[], byte[]> retryKafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    SetupHelper.configureFactory(factory, kafkaTransactionManager, inputConsumerFactory,
        retryKafkaTemplate, TopicNames.RETRY_1, new FixedBackOff(100L, 1L));
    return factory;
  }

  /**
   * Consumer factory for the {@code retry1} consumer group.
   *
   * @return the consumer factory bean
   */
  @Bean
  public ConsumerFactory<byte[], byte[]> retry1ConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
        SetupHelper.createConsumerFactoryBaseProps(GroupNames.RETRY_1));
  }

  /**
   * Consumer factory for the {@code retry2} consumer group.
   *
   * @return the consumer factory bean
   */
  @Bean
  public ConsumerFactory<byte[], byte[]> retry2ConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
        SetupHelper.createConsumerFactoryBaseProps(GroupNames.RETRY_2));
  }

  /**
   * Container factory for the first retry stage; failed records are published to {@code RETRY_2}.
   *
   * @param kafkaTransactionManager transaction manager set on the container properties
   * @param retry1ConsumerFactory consumer factory for the {@code retry1} group
   * @param retryKafkaTemplate template used by the dead-letter recoverer
   * @return the configured container factory bean
   */
  @Bean
  public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> retry1KafkaListenerContainerFactory(
      KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager,
      ConsumerFactory<byte[], byte[]> retry1ConsumerFactory,
      KafkaTemplate<byte[], byte[]> retryKafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    SetupHelper.configureFactory(factory, kafkaTransactionManager, retry1ConsumerFactory,
        retryKafkaTemplate, TopicNames.RETRY_2, new FixedBackOff(100L, 1L));
    return factory;
  }

  /**
   * Container factory for the second retry stage; failed records are published to {@code DLQ}.
   *
   * @param kafkaTransactionManager transaction manager set on the container properties
   * @param retry2ConsumerFactory consumer factory for the {@code retry2} group
   * @param retryKafkaTemplate template used by the dead-letter recoverer
   * @return the configured container factory bean
   */
  @Bean
  public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> retry2KafkaListenerContainerFactory(
      KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager,
      ConsumerFactory<byte[], byte[]> retry2ConsumerFactory,
      KafkaTemplate<byte[], byte[]> retryKafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    SetupHelper.configureFactory(factory, kafkaTransactionManager, retry2ConsumerFactory,
        retryKafkaTemplate, TopicNames.DLQ, new FixedBackOff(100L, 1L));
    return factory;
  }

  /** Builds a byte-array producer factory with the given transactional-id prefix. */
  private static ProducerFactory<byte[], byte[]> newTransactionalProducerFactory(
      String transactionIdPrefix) {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    DefaultKafkaProducerFactory<byte[], byte[]> factory = new DefaultKafkaProducerFactory<>(props);
    factory.setTransactionIdPrefix(transactionIdPrefix);
    return factory;
  }

  /** Static helpers shared by the container factories. */
  public static class SetupHelper {

    /**
     * Applies the container settings common to all listeners: a 60 second idle-event interval,
     * the transaction manager, per-record acknowledgement and the delivery-attempt header.
     *
     * @param containerProperties properties object to mutate
     * @param kafkaTransactionManager transaction manager to install
     * @param <K> key type
     * @param <V> value type
     */
    public static <K, V> void setContainerProps(ContainerProperties containerProperties,
        KafkaTransactionManager<K, V> kafkaTransactionManager) {
      containerProperties.setIdleEventInterval(60000L);
      containerProperties.setTransactionManager(kafkaTransactionManager);
      containerProperties.setAckMode(AckMode.RECORD);
      containerProperties.setDeliveryAttemptHeader(true);
    }

    /**
     * Configures a container factory with the shared container properties, the consumer factory
     * and an after-rollback processor that publishes failed records to {@code errorTopicName}.
     *
     * @param factory container factory to configure
     * @param kafkaTransactionManager transaction manager for the containers
     * @param consumerFactory consumer factory for the containers
     * @param kafkaTemplate template used by the dead-letter recoverer
     * @param errorTopicName topic that receives records once the back-off is exhausted
     * @param backOff back-off handed to the after-rollback processor
     * @param <K> key type
     * @param <V> value type
     */
    public static <K, V> void configureFactory(
        ConcurrentKafkaListenerContainerFactory<K, V> factory,
        KafkaTransactionManager<K, V> kafkaTransactionManager,
        ConsumerFactory<K, V> consumerFactory, KafkaOperations<K, V> kafkaTemplate,
        String errorTopicName, BackOff backOff) {
      setContainerProps(factory.getContainerProperties(), kafkaTransactionManager);
      factory.setConsumerFactory(consumerFactory);
      DefaultAfterRollbackProcessor<K, V> afterRollbackProcessor =
          new DefaultAfterRollbackProcessor<>(new DeadLetterPublishingRecoverer(kafkaTemplate,
              // Partition -1: the destination partition is not chosen here.
              (cr, e) -> new TopicPartition(errorTopicName, -1)), backOff);
      factory.setAfterRollbackProcessor(afterRollbackProcessor);
    }

    /**
     * Builds the consumer properties shared by all consumer factories.
     *
     * @param groupId consumer group id
     * @return a new mutable property map
     */
    public static Map<String, Object> createConsumerFactoryBaseProps(String groupId) {
      Map<String, Object> props = new HashMap<>();
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
      // NOTE(review): auto-commit is switched on explicitly although the containers built from
      // these properties use a transaction manager with AckMode.RECORD - confirm this is intended.
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
      props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      return props;
    }
  }

  /** Topic names used by the listeners and the recoverers. */
  public static class TopicNames {
    public static final String INPUT = "kafka_retry_store";
    public static final String OUTPUT = "kafka_retry_index";
    public static final String RETRY_1 = "kafka_retry_index_retry_1";
    public static final String RETRY_2 = "kafka_retry_index_retry_2";
    public static final String DLQ = "kafka_retry_index_dlq";
  }

  /** Consumer group ids. */
  public static class GroupNames {
    public static final String INPUT = "input";
    public static final String RETRY_1 = "retry1";
    public static final String RETRY_2 = "retry2";
  }
}
bootstrap.yml
spring:
  main.banner-mode: off
  application.name: ${SPRING_APPLICATION_NAME:kafka-retry}
  cloud:
    config:
      enabled: false
logging.pattern.level: "%5p [${spring.zipkin.service.name:${spring.application.name:-}},%X{user:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]"
application.yml
spring:
  kafka:
    producer:
      transaction-id-prefix: ${KAFKA_TRANSACTION_ID_PREFIX:${spring.application.name}.}
      retries: 10
      acks: all
      properties:
        enable:
          idempotence: true
info.component: Kafka Retry Service
server.port: ${SERVER_PORT:9080}
management.server.port: ${MANAGEMENT_SERVER_PORT:9081}
EventListenerService
package dk.digst.digital.post.kafka.retry;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import dk.digst.digital.post.kafka.retry.KafkaRetryApplication.TopicNames;
import lombok.extern.slf4j.Slf4j;
/**
 * Listener endpoints for the pure spring-kafka reproducer. Each handler logs the delivery attempt
 * together with the payload and then throws a {@link RuntimeException}, so no record is ever
 * processed successfully.
 */
@Slf4j
@Service
public class EventListenerService {

  /**
   * Input stage: consumes {@code TopicNames.INPUT} in group {@code input}, logs and fails.
   *
   * @param record raw record payload
   * @param delivery value of the delivery-attempt header
   */
  @KafkaListener(
      topics = TopicNames.INPUT,
      groupId = "input",
      containerFactory = "inputKafkaListenerContainerFactory")
  public void inputHandler(
      @Payload byte[] record,
      @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
    log.info("Recieved (delivery: {}) in INPUT: {}", delivery, record);
    final RuntimeException inputFault = new RuntimeException("Create fault! on input");
    throw inputFault;
  }

  /**
   * First retry stage: consumes {@code TopicNames.RETRY_1} in group {@code retry1}, logs and fails.
   *
   * @param record raw record payload
   * @param delivery value of the delivery-attempt header
   */
  @KafkaListener(
      topics = TopicNames.RETRY_1,
      groupId = "retry1",
      containerFactory = "retry1KafkaListenerContainerFactory")
  public void retry1Handler(
      @Payload byte[] record,
      @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
    log.info("Recieved (delivery: {}) in RETRY1: {}", delivery, record);
    final RuntimeException retry1Fault = new RuntimeException("Create fault! on retry1");
    throw retry1Fault;
  }

  /**
   * Second retry stage: consumes {@code TopicNames.RETRY_2} in group {@code retry2}, logs and
   * fails.
   *
   * @param record raw record payload
   * @param delivery value of the delivery-attempt header
   */
  @KafkaListener(
      topics = TopicNames.RETRY_2,
      groupId = "retry2",
      containerFactory = "retry2KafkaListenerContainerFactory")
  public void retry2Handler(
      @Payload byte[] record,
      @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
    log.info("Recieved (delivery: {}) in RETRY2: {}", delivery, record);
    final RuntimeException retry2Fault = new RuntimeException("Create fault! on retry2");
    throw retry2Fault;
  }
}
application.properties & build.gradle are identical to the spring cloud stream ones.
About this issue
- Original URL
- State: closed
- Created 4 years ago
- Reactions: 1
- Comments: 32 (15 by maintainers)
Yes, it’s not a “real” lag.
Best guess is you are somehow not processing the `@KafkaListener` in a transaction - the key is to see the offset jump by two for each send, and you should always see a lag of 1 with a transactional producer. I added your second-stage retry (with a bit simpler configuration) and it works fine for me.
Turns out we can’t do it in spring-kafka, but the binder should certainly not require this complexity to configure it.
This worked for me:
Gives
Doing
Results in: