Spring Boot with spring-kafka: sending and receiving messages, with example code
Preface
Our new project is built on Spring Boot, and data created in it has to be synced back to the legacy system. Part of the sync code already existed and used Kafka. Honestly, for plain data synchronization I don't think Kafka is the right MQ choice: the data volume is small, and running Kafka also means running a cluster and ZooKeeper. For simple data sync that is overkill.
But I'm just the hired help, so if the boss says Kafka, Kafka it is. I first tried spring-integration-kafka, which is described as a rewrite on top of spring-kafka, but its official documentation left me rather confused and I never got the feature working. The docs aren't great, perhaps because the project is still young and has many rough edges. I gave up on it and used spring-kafka directly.
Implementation
The pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.linuxsogood.sync</groupId>
  <artifactId>linuxsogood-sync</artifactId>
  <version>1.0.0-SNAPSHOT</version>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.0.RELEASE</version>
  </parent>
  <properties>
    <java.version>1.8</java.version>
    <!-- Dependency versions -->
    <mybatis.version>3.3.1</mybatis.version>
    <mybatis.spring.version>1.2.4</mybatis.spring.version>
    <mapper.version>3.3.6</mapper.version>
    <pagehelper.version>4.1.1</pagehelper.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-freemarker</artifactId>
    </dependency>
    <!--<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-kafka</artifactId>
      <version>2.0.1.RELEASE</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
      <version>4.3.1.RELEASE</version>
      <scope>compile</scope>
    </dependency>-->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>1.1.0.RELEASE</version>
    </dependency>
    <!--<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <version>1.1.0.RELEASE</version>
    </dependency>-->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.assertj</groupId>
      <artifactId>assertj-core</artifactId>
      <version>3.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-all</artifactId>
      <version>1.3</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <version>1.9.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>4.2.3.RELEASE</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
      <groupId>com.microsoft.sqlserver</groupId>
      <artifactId>sqljdbc4</artifactId>
      <version>4.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.0.11</version>
    </dependency>
    <!-- MyBatis -->
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis</artifactId>
      <version>${mybatis.version}</version>
    </dependency>
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis-spring</artifactId>
      <version>${mybatis.spring.version}</version>
    </dependency>
    <!--<dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      <version>1.1.1</version>
    </dependency>-->
    <!-- MyBatis Generator -->
    <dependency>
      <groupId>org.mybatis.generator</groupId>
      <artifactId>mybatis-generator-core</artifactId>
      <version>1.3.2</version>
      <scope>compile</scope>
      <optional>true</optional>
    </dependency>
    <!-- Pagination plugin -->
    <dependency>
      <groupId>com.github.pagehelper</groupId>
      <artifactId>pagehelper</artifactId>
      <version>${pagehelper.version}</version>
    </dependency>
    <!-- Common Mapper -->
    <dependency>
      <groupId>tk.mybatis</groupId>
      <artifactId>mapper</artifactId>
      <version>${mapper.version}</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.17</version>
    </dependency>
  </dependencies>
  <repositories>
    <repository>
      <id>repo.spring.io.milestone</id>
      <name>Spring Framework Maven Milestone Repository</name>
      <url>https://repo.spring.io/libs-milestone</url>
    </repository>
  </repositories>
  <build>
    <finalName>mybatis_generator</finalName>
    <plugins>
      <plugin>
        <groupId>org.mybatis.generator</groupId>
        <artifactId>mybatis-generator-maven-plugin</artifactId>
        <version>1.3.2</version>
        <configuration>
          <verbose>true</verbose>
          <overwrite>true</overwrite>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <mainClass>org.linuxsogood.sync.Starter</mainClass>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
The ORM layer uses MyBatis, together with the Common Mapper and the PageHelper pagination plugin.
Kafka consumer configuration:
import org.linuxsogood.sync.listener.Listener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Only takes effect when auto-commit is enabled; harmless here since auto-commit is disabled above.
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}
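The `@Value("${kafka.broker.address}")` placeholder above is resolved from the Spring Boot configuration. A minimal `application.properties` entry might look like this (the host and port are an assumed example; use your own broker address):

```properties
# Kafka broker address used by both KafkaConsumerConfig and KafkaProducerConfig
kafka.broker.address=192.168.1.100:9092
```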
Producer configuration:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}
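With the `KafkaTemplate` bean in place, sending a message is a one-liner. A minimal sketch of how it might be used (the `SyncMessageSender` class is a hypothetical example of mine, not from the project; the real code would serialize the payload into the project's `MessageWrapper` JSON before sending, and it requires a running broker):

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Hypothetical service illustrating how the KafkaTemplate bean above would be used.
@Service
public class SyncMessageSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String json) {
        // send(topic, payload) is asynchronous; the returned future can be used
        // to register a callback or to block for the send result if needed.
        kafkaTemplate.send("linuxsogood-topic", json);
    }
}
```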
The listener is where the business logic lives: it decides what to do with the data once it arrives from Kafka. Note Kafka's delivery semantics: to get broadcast behavior across multiple listeners, each listener must belong to a different group, i.e. the group value in each @KafkaListener annotation must be different. If several listeners share the same group, any given message is processed by only one of them and the others never see it; that is Kafka's load-balanced (distributed) consumption mode.
Listeners in the same group divide the incoming messages among themselves according to Kafka's assignment algorithm. Listeners in different groups that subscribe to the same topic each receive every message, which gives broadcast behavior.
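To make the group semantics concrete, here is a sketch of two listeners on the same topic (the second group id, `audit-group`, is an assumption of mine for illustration; the topic and first group match the listener shown below). Because the groups differ, both methods receive every message; if both used `sync-group`, each message would reach only one of them:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

// Two listeners, same topic, different groups: broadcast behavior.
public class BroadcastListeners {

    @KafkaListener(topics = "linuxsogood-topic", group = "sync-group")
    public void listenSync(ConsumerRecord<?, ?> record) {
        System.out.println("sync-group received: " + record.value());
    }

    @KafkaListener(topics = "linuxsogood-topic", group = "audit-group") // assumed second group id
    public void listenAudit(ConsumerRecord<?, ?> record) {
        System.out.println("audit-group received: " + record.value());
    }
}
```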
import com.alibaba.fastjson.JSON;
import org.linuxsogood.qilian.enums.CupMessageType;
import org.linuxsogood.qilian.kafka.MessageWrapper;
import org.linuxsogood.qilian.model.store.Store;
import org.linuxsogood.sync.mapper.StoreMapper;
import org.linuxsogood.sync.model.StoreExample;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.List;
import java.util.Optional;

public class Listener {

    private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);

    @Autowired
    private StoreMapper storeMapper;

    /**
     * Listen for Kafka messages; when one arrives, consume it and sync the data into the new system's database.
     * @param record the consumed message record
     */
    @KafkaListener(topics = "linuxsogood-topic", group = "sync-group")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            try {
                MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class);
                CupMessageType type = messageWrapper.getType();
                // Dispatch on the message's data type: different types go into different tables.
                if (CupMessageType.STORE == type) {
                    proceedStore(messageWrapper);
                }
            } catch (Exception e) {
                // Pass the Throwable as the last argument (no placeholder) so slf4j logs the stack trace.
                LOGGER.error("Failed to save the received message to the database, message: {}", message.toString(), e);
            }
        }
    }

    /**
     * Handle a store-type message: persist the store data.
     * @param messageWrapper the message received from Kafka
     */
    private void proceedStore(MessageWrapper messageWrapper) {
        Object data = messageWrapper.getData();
        Store cupStore = JSON.parseObject(data.toString(), Store.class);
        StoreExample storeExample = new StoreExample();
        String storeName = StringUtils.isBlank(cupStore.getStoreOldName()) ? cupStore.getStoreName() : cupStore.getStoreOldName();
        storeExample.createCriteria().andStoreNameEqualTo(storeName);
        List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample);
        org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store();
        org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore);
        // Insert if no matching record exists, otherwise update.
        if (stores.size() == 0) {
            storeMapper.insert(store);
        } else {
            store.setStoreId(stores.get(0).getStoreId());
            storeMapper.updateByPrimaryKey(store);
        }
    }
}
Summary
That's all for this article. I hope it is of some help in your study or work; if you have any questions, feel free to leave a comment. Thank you for reading.