springboot使用kafka事務的示例代碼
先看下下面這種情況,程序都出錯了,按理說消息也不應該成功
@GetMapping("/send")
public void test9(String message) {
kafkaTemplate.send(topic, message);
throw new RuntimeException("fail");
}但是執(zhí)行結果是發(fā)生了異常并且消息發(fā)送成功了:

Kafka 同數(shù)據(jù)庫一樣支持事務,當發(fā)生異常的時候可以進行回滾,確保消息監(jiān)聽器不會接收到一些錯誤的或者不需要的消息。
kafka事務屬性是指一系列的生產者生產消息和消費者提交偏移量的操作在一個事務,或者說是是一個原子操作),同時成功或者失敗。使用事務也很簡單,需要先開啟事務支持,然后再使用。
如何開啟事務
如果使用默認配置只需要在yml添加spring.kafka.producer.transaction-id-prefix配置來開啟事務,之前沒有使用默認的配置,自定義的kafkaTemplate,那么需要在ProducerFactory中設置事務Id前綴開啟事務并將KafkaTransactionManager注入到spring中,看下KafkaProducerConfig完整代碼:
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
public Map<String,Object> producerConfigs(){
Map<String,Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG,retries);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 配置分區(qū)策略
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.example.springbootkafka.config.CustomizePartitioner");
// 配置生產者攔截器
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.example.springbootkafka.interceptor.CustomProducerInterceptor");
// 配置攔截器消息處理類
SendMessageInterceptorUtil sendMessageInterceptorUtil = new SendMessageInterceptorUtil();
props.put("interceptorUtil",sendMessageInterceptorUtil);
return props;
}
@Bean
public ProducerFactory<String,String> producerFactory(){
DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(producerConfigs());
//設置事務Id前綴 開啟事務
producerFactory.setTransactionIdPrefix("tx-");
return producerFactory;
}
@Bean
public KafkaTemplate<String,String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
@Bean
public KafkaTransactionManager<Integer, String> kafkaTransactionManager(ProducerFactory<String, String> producerFactory) {
return new KafkaTransactionManager(producerFactory);
}
} 配置開啟事務后,使用大體有兩種方式,先記錄下第一種使用事務方式:使用 executeInTransaction 方法
直接看下代碼:
@GetMapping("/send11")
public void test11(String message) {
kafkaTemplate.executeInTransaction(operations ->{
operations.send(topic,message);
throw new RuntimeException("fail");
});
}當然你可以這么寫:
@GetMapping("/send11")
public void test11(String message) {
kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback(){
@Override
public Object doInOperations(KafkaOperations operations) {
operations.send(topic,message);
throw new RuntimeException("fail");
}
});
}啟動項目,訪問http://localhost:8080/send10?message=test10 結果如下:

如上:消費者沒打印消息,說明消息沒發(fā)送成功,并且前面會報錯org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted 的錯誤,說明事務生效了。
第一種使用事務方式:使用 @Transactional 注解方式 直接在方法上加上@Transactional注解即可,看下代碼:
@GetMapping("/send12")
@Transactional
public void test12(String message) {
kafkaTemplate.send(topic, message);
throw new RuntimeException("fail");
}如果開啟的事務,則后續(xù)發(fā)送消息必須使用@Transactional注解或者使用kafkaTemplate.executeInTransaction() ,否則拋出異常,異常信息如下:
貼下完整的異常吧:java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
到此這篇關于springboot使用kafka事務的文章就介紹到這了,更多相關springboot kafka事務內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Jenkins一鍵打包部署SpringBoot應用的方法步驟
本文主要介紹了使用Jenkins一鍵打包部署SpringBoot應用的方法步驟,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-12-12
Java超詳細精講數(shù)據(jù)結構之bfs與雙端隊列
廣搜BFS的基本思想是: 首先訪問初始點v并將其標志為已經訪問。接著通過鄰接關系將鄰接點入隊。然后每訪問過一個頂點則出隊。按照順序,訪問每一個頂點的所有未被訪問過的頂點直到所有的頂點均被訪問過。廣度優(yōu)先遍歷類似與層次遍歷2022-07-07

