java發(fā)送kafka事務(wù)消息的實(shí)現(xiàn)方法
前言
事務(wù)對(duì)java開(kāi)發(fā)的同學(xué)來(lái)說(shuō)并不陌生,我們使用事務(wù)的目的在于避免產(chǎn)生重復(fù)數(shù)據(jù)或者說(shuō)利用數(shù)據(jù)存儲(chǔ)中間件的事務(wù)特性確保數(shù)據(jù)的精準(zhǔn)性,比如大家熟悉的mysql,我們?cè)诔绦蜷_(kāi)始時(shí),只需要在程序中添加上事務(wù)注解即可
kafka客戶(hù)端事務(wù),直接使用客戶(hù)端提供的相關(guān)的API即可,和jdbc事務(wù)的使用很類(lèi)似,主要包含下面5個(gè)API
// 1 初始化事務(wù) void initTransactions(); // 2 開(kāi)啟事務(wù) void beginTransaction() throws ProducerFencedException; // 3 在事務(wù)內(nèi)提交已經(jīng)消費(fèi)的偏移量(主要用于消費(fèi)者) void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException; // 4 提交事務(wù) void commitTransaction() throws ProducerFencedException; // 5 放棄事務(wù)(類(lèi)似于回滾事務(wù)的操作) void abortTransaction() throws ProducerFencedException;
下面結(jié)合實(shí)際的代碼以及效果演示進(jìn)行說(shuō)明
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerTransaction {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建 kafka 生產(chǎn)者的配置對(duì)象
Properties properties = new Properties();
// 2. 給 kafka 配置對(duì)象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 設(shè)置事務(wù) id(必須),事務(wù) id 任意起名
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
// 3. 創(chuàng)建 kafka 生產(chǎn)者對(duì)象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 初始化事務(wù)
kafkaProducer.initTransactions();
// 開(kāi)啟事務(wù)
kafkaProducer.beginTransaction();
System.out.println("開(kāi)始發(fā)送消息");
try {
// 4. 調(diào)用 send 方法,發(fā)送消息
for (int i = 0; i < 5; i++) {
// 發(fā)送消息
kafkaProducer.send(new ProducerRecord<>("zcy222", "hello kafka " + i));
}
//int i = 1 / 0;
// 提交事務(wù)
kafkaProducer.commitTransaction();
} catch (Exception e) {
System.out.println(e);
// 終止事務(wù)
kafkaProducer.abortTransaction();
} finally {
// 5. 關(guān)閉資源
kafkaProducer.close();
}
}
}運(yùn)行上面的代碼,正常是可以發(fā)送到指定的topic下

接下來(lái),我們將上面的代碼中的 1/0 放開(kāi),再次運(yùn)行程序,可以看到,程序中拋異常了,但是消息并沒(méi)有發(fā)送到kafka的broker,說(shuō)明事務(wù)的配置生效了

到此這篇關(guān)于java發(fā)送kafka事務(wù)消息的實(shí)現(xiàn)方法的文章就介紹到這了,更多相關(guān)java發(fā)送kafka事務(wù)消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring 使用JavaConfig實(shí)現(xiàn)配置的方法步驟
這篇文章主要介紹了Spring 使用JavaConfig實(shí)現(xiàn)配置的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01
詳解Java刪除Map中元素java.util.ConcurrentModificationException”異常解決
這篇文章主要介紹了詳解Java刪除Map中元素java.util.ConcurrentModificationException”異常解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01
JSP 開(kāi)發(fā)之 releaseSession的實(shí)例詳解
這篇文章主要介紹了JSP 開(kāi)發(fā)之 releaseSession的實(shí)例詳解的相關(guān)資料,需要的朋友可以參考下2017-07-07
關(guān)于application.yml數(shù)據(jù)庫(kù)配置方式
這篇文章主要介紹了關(guān)于application.yml數(shù)據(jù)庫(kù)配置方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08
IDEA插件Statistic統(tǒng)計(jì)代碼快速分辨爛項(xiàng)目
這篇文章主要為大家介紹了使用IDEA插件Statistic來(lái)統(tǒng)計(jì)項(xiàng)目代碼,幫助大家快速識(shí)別出爛項(xiàng)目,有需要的朋友可以借鑒參考下,希望能夠有所幫助2022-01-01
Spring?Session(分布式Session共享)實(shí)現(xiàn)示例
這篇文章主要介紹了Spring?Session(分布式Session共享)實(shí)現(xiàn)示例,文章內(nèi)容詳細(xì),需要的朋友可以參考下2023-01-01
Spring?IOC容器Bean注解創(chuàng)建對(duì)象組件掃描
這篇文章主要為大家介紹了Spring?IOC容器Bean注解創(chuàng)建對(duì)象組件掃描,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05
Spring AOP里的靜態(tài)代理和動(dòng)態(tài)代理用法詳解
這篇文章主要介紹了 Spring AOP里的靜態(tài)代理和動(dòng)態(tài)代理用法詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07

