Springboot微服務(wù)項(xiàng)目整合Kafka實(shí)現(xiàn)文章上下架功能
前言:
1.前面基于Springboot的單體項(xiàng)目介紹已經(jīng)完結(jié)了,至于項(xiàng)目中的其他功能實(shí)現(xiàn)我這里就不打算介紹了,因?yàn)樯婕暗闹R(shí)點(diǎn)不難,而且都是簡單的CRUD操作,假如有興趣的話可以私信我我再看看要不要寫幾篇文章做個(gè)介紹。
2.完成上一階段的學(xué)習(xí),我就投入到了微服務(wù)的學(xué)習(xí)當(dāng)中,所用教程為B站上面黑馬的微服務(wù)教程。由于我的記性不是很好,所以對(duì)于新事物的學(xué)習(xí)我比較喜歡做筆記以加強(qiáng)理解,在這里我會(huì)將筆記的重點(diǎn)內(nèi)容做個(gè)總結(jié)發(fā)布到“微服務(wù)學(xué)習(xí)”筆記欄目中。我是趙四,一名有追求的程序員,希望大家能多多支持,能給我點(diǎn)個(gè)關(guān)注就更好了。
一:Kafka消息發(fā)送快速入門
1.傳遞字符串消息
(1)發(fā)送消息
創(chuàng)建一個(gè)Controller包并編寫一個(gè)測試類用于發(fā)送消息
package com.my.kafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("hello")
public String helloProducer(){
kafkaTemplate.send("my-topic","Hello~");
return "ok";
}
}(2)監(jiān)聽消息
編寫測試類用于接收消息:
package com.my.kafka.listener;
import org.junit.platform.commons.util.StringUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class HelloListener {
@KafkaListener(topics = "my-topic")
public void helloListener(String message) {
if(StringUtils.isNotBlank(message)) {
System.out.println(message);
}
}
}(3)測試結(jié)果
打開瀏覽器輸入localhost:9991/hello,然后到控制臺(tái)查看消息,可以看到成功消息監(jiān)聽到并且進(jìn)行了消費(fèi)。

2.傳遞對(duì)象消息
目前springboot整合后的kafka,因?yàn)樾蛄谢魇荢tringSerializer,這個(gè)時(shí)候如果需要傳遞對(duì)象可以有兩種方式:
方式一:可以自定義序列化器,對(duì)象類型眾多,這種方式通用性不強(qiáng),這里不做介紹。
方式二:可以把要傳遞的對(duì)象進(jìn)行轉(zhuǎn)json字符串,接收消息后再轉(zhuǎn)為對(duì)象即可,本項(xiàng)目采用這種方式。
(1)修改生產(chǎn)者代碼
@GetMapping("hello")
public String helloProducer(){
User user = new User();
user.setName("趙四");
user.setAge(20);
kafkaTemplate.send("my-topic", JSON.toJSONString(user));
return "ok";
}(2)結(jié)果測試

可以看到成功接收都對(duì)象參數(shù),后期要使用該對(duì)象只需要將其轉(zhuǎn)換成User對(duì)象即可。
二:功能引入
1.需求分析
發(fā)布文章之后,可能會(huì)由于文章出現(xiàn)某些錯(cuò)誤或者其他原因,我們會(huì)在文章管理端實(shí)現(xiàn)文章的上下架功能(見下圖),也即當(dāng)管理端實(shí)現(xiàn)對(duì)文章下架之后移動(dòng)端將不會(huì)再展示該文章,只有該文章重新被上架之后才能在移動(dòng)端看到該文章信息。

2.邏輯分析

后端接收到前端傳過來的參數(shù)之后要先做一個(gè)校驗(yàn),參數(shù)不為空才能繼續(xù)往下執(zhí)行,首先應(yīng)該根據(jù)前端傳過來的文章id(自媒體端文章id)查詢自媒體數(shù)據(jù)庫的文章信息并判斷該文章是否已是發(fā)布狀態(tài),因?yàn)橹挥袑徍顺晒Σ⒊晒Πl(fā)布了的文章才能進(jìn)行上下架操作。自媒體端微服務(wù)對(duì)文章上下架狀態(tài)進(jìn)行修改之后便可以向Kafka發(fā)送一條消息,該消息為Map對(duì)象,里面存儲(chǔ)的數(shù)據(jù)為移動(dòng)端的文章id以及前端傳過來的上下架參數(shù)enable,當(dāng)然要將該Map對(duì)象轉(zhuǎn)換成JSON字符串才能進(jìn)行發(fā)送。
文章微服務(wù)監(jiān)聽到Kafka發(fā)送過來的消息之后將JSON字符串轉(zhuǎn)換成Map對(duì)象之后再獲取相關(guān)參數(shù)對(duì)移動(dòng)端文章的上下架狀態(tài)進(jìn)行修改。
三:前期準(zhǔn)備
1.引入依賴
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>2.定義常量
package com.my.common.constans;
public class WmNewsMessageConstants {
public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";
}3.Kafka配置信息
由于我是用Nacos來作為注冊(cè)中心,所以配置信息放置在Nacos上面即可。
(1)自媒體端配置
spring:
kafka:
bootstrap-servers: 4.234.52.122:9092
producer:
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer(2)移動(dòng)端配置
spring:
kafka:
bootstrap-servers: 4.234.52.122:9092
consumer:
group-id: ${spring.application.name}-test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer四:代碼實(shí)現(xiàn)
1.自媒體端
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
* 文章下架或上架
* @param id
* @param enable
* @return
*/
@Override
public ResponseResult downOrUp(Integer id,Integer enable) {
log.info("執(zhí)行文章上下架操作...");
if(id == null || enable == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
//根據(jù)id獲取文章
WmNews news = getById(id);
if(news == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在");
}
//獲取當(dāng)前文章狀態(tài)
Short status = news.getStatus();
if(!status.equals(WmNews.Status.PUBLISHED.getCode())) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非發(fā)布狀態(tài),不能上下架");
}
//更改文章狀態(tài)
news.setEnable(enable.shortValue());
updateById(news);
log.info("更改文章上架狀態(tài){}-->{}",status,news.getEnable());
//發(fā)送消息到Kafka
Map<String, Object> map = new HashMap<>();
map.put("articleId",news.getArticleId());
map.put("enable",enable.shortValue());
kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
log.info("發(fā)送消息到Kafka...");
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}2.移動(dòng)端
(1)設(shè)置監(jiān)聽器
package com.my.article.listener;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.my.article.service.ApArticleService;
import com.my.common.constans.WmNewsMessageConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;
@Slf4j
@Component
public class EnableListener {
@Autowired
private ApArticleService apArticleService;
@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
public void downOrUp(String message) {
if(StringUtils.isNotBlank(message)) {
log.info("監(jiān)聽到消息{}",message);
apArticleService.downOrUp(message);
}
}
}(2)獲取消息并修改文章狀態(tài)
/**
* 文章上下架
* @param message
* @return
*/
@Override
public ResponseResult downOrUp(String message) {
Map map = JSON.parseObject(message, Map.class);
//獲取文章id
Long articleId = (Long) map.get("articleId");
//獲取文章待修改狀態(tài)
Integer enable = (Integer) map.get("enable");
//查詢文章配置
ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne
(Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId));
if(apArticleConfig != null) {
//上架
if(enable == 1) {
log.info("文章重新上架");
apArticleConfig.setIsDown(false);
apArticleConfigMapper.updateById(apArticleConfig);
}
//下架
if(enable == 0) {
log.info("文章下架");
apArticleConfig.setIsDown(true);
apArticleConfigMapper.updateById(apArticleConfig);
}
}
else {
throw new RuntimeException("文章信息不存在");
}
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}到此這篇關(guān)于Springboot微服務(wù)項(xiàng)目整合Kafka實(shí)現(xiàn)文章上下架功能的文章就介紹到這了,更多相關(guān)Springboot整合Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring boot注解@Async線程池實(shí)例詳解
這篇文章主要介紹了Spring boot注解@Async線程池實(shí)例詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
SpringBoot接收J(rèn)SON類型的參數(shù)方式
這篇文章主要介紹了SpringBoot接收J(rèn)SON類型的參數(shù)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2025-03-03
Java動(dòng)態(tài)替換properties文件中鍵值方式
這篇文章主要介紹了Java動(dòng)態(tài)替換properties文件中鍵值方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08
Java Web請(qǐng)求與響應(yīng)實(shí)例詳解
這篇文章主要介紹了Java Web請(qǐng)求與響應(yīng)實(shí)例詳解的相關(guān)資料,需要的朋友可以參考下2016-05-05
Mybatis配置之properties和settings標(biāo)簽的用法
這篇文章主要介紹了Mybatis配置之properties和settings標(biāo)簽的用法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07

