spring-kafka: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
To add transaction support in my application, I followed the below mentioned approach:
- Configuration File
@Configuration
@EnableKafka
@EnableTransactionManagement
public class KafkaProducerConfig {
@Autowired
private KafkaConfiguration kafkaConfiguration; // default configurations from application.properties
@Bean
public Map producerConfigurations() {
Map configurations = new HashMap(kafkaConfiguration.getProducer());
configurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
configurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
return configurations;
}
@Bean
public DefaultKafkaProducerFactory producerFactory() {
DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(producerConfigurations());
producerFactory.setTransactionIdPrefix("transaction.prefix.");
return producerFactory;
}
@Bean
public KafkaTransactionManager kafkaTransactionManager() {
KafkaTransactionManager transactionManager = new KafkaTransactionManager(producerFactory());
transactionManager.setNestedTransactionAllowed(true);
return transactionManager;
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
}
- Implementation (Call to the below mentioned method)
@Autowired
private KafkaTemplate kafkaTemplate;
@Transactional
public void publishAsTransaction(String topic, String messages[]) {
for(String message : messages) {
kafkaTemplate.send(topic, message);
}
}
After calling the publishAsTransaction method with appropriate topic and messages[], the first run is successful with all messages getting published to the topic. But, every subsequent run after that gives me the error Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION.
Is there anything wrong with my approach? Kindly guide me on the same.
About this issue
- Original URL
- State: closed
- Created 7 years ago
- Comments: 30 (14 by maintainers)
FYI - this works fine for me…
Thanks; but if you determine the root cause, please post here in case there’s something we can do in the framework to detect a mis-configuration.
Hi @garyrussell it works for me too, both DB an Kafka transactions are rolled back when exception was thrown in transactional method. Thanks again for your help guys.