spring-kafka: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION

To add transaction support in my application, I followed the below mentioned approach:

  1. Configuration File

@Configuration
@EnableKafka
@EnableTransactionManagement
public class KafkaProducerConfig {

    @Autowired
    private KafkaConfiguration kafkaConfiguration; // default configurations from application.properties

    @Bean
    public Map producerConfigurations() {
        Map configurations = new HashMap(kafkaConfiguration.getProducer());
        configurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
        configurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
        return configurations;
    }

    @Bean
    public DefaultKafkaProducerFactory producerFactory() {
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(producerConfigurations());
        producerFactory.setTransactionIdPrefix("transaction.prefix.");
        return producerFactory;
    }

    @Bean
    public KafkaTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager transactionManager = new KafkaTransactionManager(producerFactory());
        transactionManager.setNestedTransactionAllowed(true);
        return transactionManager;
    }
    
    @Bean
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }
}
  1. Implementation (Call to the below mentioned method)
    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Transactional
    public void publishAsTransaction(String topic, String messages[]) {
        for(String message : messages) {
            kafkaTemplate.send(topic, message);
        }
    }

After calling the publishAsTransaction method with appropriate topic and messages[], the first run is successful with all messages getting published to the topic. But, every subsequent run after that gives me the error Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION.

Is there anything wrong with my approach? Kindly guide me on the same.

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Comments: 30 (14 by maintainers)

Most upvoted comments

FYI - this works fine for me…

@SpringBootApplication
@EnableTransactionManagement
public class Kgh433Application {

	public static void main(String[] args) {
		ConfigurableApplicationContext ctx = SpringApplication.run(Kgh433Application.class, args);
		Map<String, PlatformTransactionManager> tms = ctx.getBeansOfType(PlatformTransactionManager.class);
		System.out.println(tms);
		ctx.close();
	}

	@Bean
	public ApplicationRunner runner(Foo foo) {
		return args -> foo.sendToKafkaAndDB();
	}

	@Bean
	public JpaTransactionManager transactionManager() {
		return new JpaTransactionManager();
	}

	@Bean
	public ChainedTransactionManager chainedTxM(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
		return new ChainedTransactionManager(jpa, kafka);
	}

	@Component
	public static class Foo {

		@Autowired
		private KafkaTemplate<Object, Object> template;

		@Autowired
		private MessageRepo repo;

		@Transactional(transactionManager = "chainedTxM")
		public void sendToKafkaAndDB() throws Exception {
			this.repo.save(new Message("foo", 1L));
			System.out.println(this.template.send("kgh433", "bar").get());
			System.out.println(this.template.send("kgh433", "baz").get());
		}

	}

}
spring.jpa.generate-ddl=true
spring.kafka.producer.transaction-id-prefix=txId

Thanks; but if you determine the root cause, please post here in case there’s something we can do in the framework to detect a mis-configuration.

Hi @garyrussell it works for me too, both DB an Kafka transactions are rolled back when exception was thrown in transactional method. Thanks again for your help guys.