一文快速掌握Spring?Cloud?Stream
本篇文章所涉及到的demo練習(xí) 使用的cloud 2021.0.3+ springboot2.6.8
一、概述簡(jiǎn)介
官網(wǎng):https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/
1.1. cloud Stream是什么
官方定義:Spring Cloud Stream是一個(gè)用于構(gòu)建 與 共享消息系統(tǒng) 連接的高度可擴(kuò)展的事件驅(qū)動(dòng)微服務(wù)。
目前主流的消息框架有:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
假設(shè)公司業(yè)務(wù)項(xiàng)目用了RabbitMQ,而大數(shù)據(jù)項(xiàng)目用了Kafka。這時(shí)候就會(huì)出現(xiàn)有兩個(gè)消息框架,相對(duì)于程序員來(lái)說(shuō)其實(shí)并不友好,還得兩個(gè)都掌握,正常對(duì)于一個(gè)程序員來(lái)說(shuō)熟練一個(gè)消息框架都不錯(cuò)了,何況還搞了兩個(gè),并且兩個(gè)維護(hù)起來(lái)也不好維護(hù)。
RabbitMQ和Kafka是兩個(gè)不同的框架,兩個(gè)消息模型上也存在著差異,并且代碼上用法也不一樣。Spring Cloud Stream就是不再關(guān)注具體MQ的細(xì)節(jié),可以在不改代碼的基礎(chǔ)上,來(lái)完成Rabbit和Kafka兩個(gè)不同的消息中間件的切換(這里的切換指的是原本用的RabbitMQ,但是用著用著發(fā)現(xiàn)kafka比較符合,所以想要換框架)。
總結(jié)成一句話:屏蔽底層消息中間件的差異,降低切換成本,統(tǒng)一消息的編程模型
注意:遺憾的是目前僅支持RabbitMQ、Kafka。
1.2. 設(shè)計(jì)思想
常規(guī)的MQ設(shè)計(jì)如下:

- Message:生產(chǎn)者/消費(fèi)者之間靠消息媒介傳遞信息內(nèi)容
- MessageChannel:消息必須走特定的通道
- 隊(duì)列:假如發(fā)消息會(huì)先發(fā)到消息隊(duì)列當(dāng)中
- 消息隊(duì)列的消息如何被消費(fèi)呢:訂閱的人可以進(jìn)行消費(fèi)
cloud Stream設(shè)計(jì)如下:
通過(guò)定義綁定器Binder作為中間層,實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。
在沒(méi)有綁定器這個(gè)概念的情況下,我們的SpringBoot應(yīng)用要直接與消息中間件進(jìn)行信息交互的時(shí)候,由于各消息中間件構(gòu)建的初衷不同,它們的實(shí)現(xiàn)細(xì)節(jié)上會(huì)有較大的差異性,通過(guò)定義綁定器作為中間層,完美地實(shí)現(xiàn)了應(yīng)用程序與消息中間件細(xì)節(jié)之間的隔離。Stream對(duì)消息中間件的進(jìn)一步封裝,可以做到代碼層面對(duì)中間件的無(wú)感知,甚至于動(dòng)態(tài)的切換中間件(rabbitmq切換為kafka),使得微服務(wù)開(kāi)發(fā)的高度解耦,服務(wù)可以關(guān)注更多自己的業(yè)務(wù)流程
注意:左圖是官網(wǎng)的架構(gòu)圖

Binder可以生成Binding,Binding用來(lái)綁定消息容器的生產(chǎn)者和消費(fèi)者,它有兩種類型,INPUT和OUTPUT,INPUT對(duì)應(yīng)于消費(fèi)者,OUTPUT對(duì)應(yīng)于生產(chǎn)者。

stream為了屏蔽差異,抽象出來(lái)了一個(gè)Binder層,而目前為止,只提供了兩個(gè)框架的實(shí)現(xiàn),通過(guò)具體的實(shí)現(xiàn)來(lái)連接消息中間件。
假如想要通過(guò)stream連接RabbitMQ就使用:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
假如想要通過(guò)stream連接Kafka就使用:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
Stream中的消息通信方式遵循了發(fā)布-訂閱模式,Topic主題進(jìn)行廣播,在RabbitMQ就是Exchange,在Kakfa中就是Topic。
1.3. 標(biāo)準(zhǔn)流程

- Binder: 很方便的連接中間件,屏蔽差異
- Channel: 通道,是隊(duì)列Queue的一種抽象,在消息通訊系統(tǒng)中就是實(shí)現(xiàn)存儲(chǔ)和轉(zhuǎn)發(fā)的媒介,通過(guò)Channe對(duì)隊(duì)列進(jìn)行配置
- Source(源:發(fā)送者)和Sink(水槽:接受者): 簡(jiǎn)單的可理解為參照對(duì)象是Spring Cloud Stream自身,從Stream發(fā)布消息就是輸出,接受消息就是輸入。
1.4. 注解

注解完全是基于官方給的模型而定的!通過(guò)stream使用消息中間件也是非常簡(jiǎn)單的,直接使用以下注解就可以使用。

注意:注解依然是能用的,但是官方明確表示注解已經(jīng)被棄用,棄用并不是不能用,而是用了會(huì)畫橫杠不建議用。但是功能是沒(méi)有問(wèn)題的,低版本的cloud是沒(méi)有被棄用的。針對(duì)于注解和函數(shù)式編程兩種我都會(huì)進(jìn)行使用。
題外話:學(xué)技術(shù)永遠(yuǎn)是這樣,技術(shù)一直在不斷的更新迭代,真正學(xué)習(xí)一個(gè)技術(shù)并不是要掌握編碼使用,而是要掌握他到底是什么,能干什么,要去深入理解他,對(duì)于編碼,我認(rèn)為其實(shí)不是很重要。就算你今天掌握了官方最新用法,回頭人家又改寫法了。


二、基于注解代碼練習(xí)
生產(chǎn)者就是消息發(fā)送者,消費(fèi)者就是消息接受者。這里我就不用kafka了,我直接用的是RabbitMQ。
windows下安裝RabbitMQ:https://blog.csdn.net/weixin_43888891/article/details/126514021
2.1. 消息驅(qū)動(dòng)之生產(chǎn)者
1.創(chuàng)建項(xiàng)目(可以是聚合可以是普通springboot項(xiàng)目)
2.添加pom
因?yàn)槭呛蚏abbitMQ整合,所以就是引入的spring-cloud-starter-stream-rabbit啟動(dòng)器
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<springboot.version>2.6.8</springboot.version>
<springcloud.version>2021.0.3</springcloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${springboot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${springcloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
3.添加application配置
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 消息組件類型
environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務(wù)的整合處理
output: # 這個(gè)名字是一個(gè)通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設(shè)置消息類型,本次為json,文本則設(shè)置“text/plain”
binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置
4.添加接口
public interface IMessageProvider {
public String send();
}
5.添加實(shí)現(xiàn)類
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import javax.annotation.Resource;
import java.util.UUID;
// 可以理解為是一個(gè)消息的發(fā)送管道的定義
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {
// 消息的發(fā)送管道
@Resource
private MessageChannel output;
@Override
public String send() {
String serial = UUID.randomUUID().toString();
// 創(chuàng)建并發(fā)送消息
this.output.send(MessageBuilder.withPayload(serial).build());
System.out.println("***serial: " + serial);
return serial;
}
}
6.添加controller控制器
@RestController
public class SendMessageController {
@Autowired
private IMessageProvider iMessageProvider;
@GetMapping("send")
public String send() {
return iMessageProvider.send();
}
}
7.測(cè)試
(1)首先要保證RabbitMQ是可以訪問(wèn)的:http://localhost:15672

(2)啟動(dòng)項(xiàng)目訪問(wèn):http://localhost:8801/send
下圖波峰代表發(fā)送消息成功

啟動(dòng)后會(huì)創(chuàng)建交換機(jī),名稱就是application.yml當(dāng)中的destination屬性設(shè)置的

注意:停止服務(wù)后并沒(méi)有刪除交換機(jī)?。?!
2.2. 消息驅(qū)動(dòng)之消費(fèi)者
1.創(chuàng)建項(xiàng)目
2.添加pom(pom和發(fā)送者依賴一模一樣)
3.添加application配置
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務(wù)信息;
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 消息組件類型
environment: # 設(shè)置rabbitmq的相關(guān)的環(huán)境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務(wù)的整合處理
input: # 這個(gè)名字是一個(gè)通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設(shè)置消息類型,本次為對(duì)象json,如果是文本則設(shè)置“text/plain”
binder: defaultRabbit # 設(shè)置要綁定的消息服務(wù)的具體設(shè)置
4.添加監(jiān)聽(tīng)(消費(fèi)者只負(fù)責(zé)接受消息)
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("消費(fèi)者1號(hào),------->接收到的消息:" + message.getPayload() + "\t port: " + serverPort);
}
}
5.測(cè)試
(1)啟動(dòng)RabbitMQ
(2)啟動(dòng)發(fā)送消息端服務(wù)
(3)啟動(dòng)消費(fèi)者服務(wù),啟動(dòng)后會(huì)發(fā)現(xiàn),他自動(dòng)會(huì)向這個(gè)交換機(jī)當(dāng)中添加一個(gè)隊(duì)列。

發(fā)送消息:http://localhost:8801/send
接受消息:


注意:當(dāng)停止服務(wù)后消息隊(duì)列會(huì)被自動(dòng)刪除!?。?/strong>
2.3. 目前存在的問(wèn)題
1.依照8802, clone出來(lái)一份運(yùn)行8803,主要用來(lái)演示多個(gè)消費(fèi)者的場(chǎng)景
2.啟動(dòng)8801生產(chǎn)者
3.啟動(dòng)8802消費(fèi)者
4.啟動(dòng)8803消費(fèi)者
當(dāng)三個(gè)服務(wù)都啟動(dòng)后通過(guò)RabbitMQ界面會(huì)發(fā)現(xiàn),一個(gè)交換機(jī)綁定了兩個(gè)隊(duì)列

運(yùn)行后會(huì)發(fā)現(xiàn)存在兩個(gè)問(wèn)題:
有重復(fù)消費(fèi)問(wèn)題消息持久化問(wèn)題
(1)重復(fù)消費(fèi)問(wèn)題:
發(fā)送消息后兩個(gè)消費(fèi)者都收到了消息:http://localhost:8801/send


比如在如下場(chǎng)景中,訂單系統(tǒng)我們做集群部署,都會(huì)從RabbitMQ中獲取訂單信息,那如果一個(gè)訂單同時(shí)被兩個(gè)服務(wù)獲取到,那么就會(huì)造成數(shù)據(jù)錯(cuò)誤,我們得避免這種情況。這時(shí)我們就可以使用Stream中的消息分組來(lái)解決

注意在Stream中處于同一個(gè)group中的多個(gè)消費(fèi)者是競(jìng)爭(zhēng)關(guān)系,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次。不同組是可以全面消費(fèi)的(重復(fù)消費(fèi)),同一組內(nèi)會(huì)發(fā)生競(jìng)爭(zhēng)關(guān)系,只有其中一個(gè)可以消費(fèi)。
(2)消息持久化問(wèn)題:
當(dāng)生產(chǎn)者發(fā)送消息的時(shí)候,消費(fèi)者恰好宕機(jī)了,但是過(guò)一會(huì)消費(fèi)者恢復(fù)了,但是消息卻沒(méi)收到。那也就是意味著消息隊(duì)列是臨時(shí)消息隊(duì)列。針對(duì)于這一點(diǎn),大家也可以測(cè)試一下,加深一下印象。
2.4. 分組解決重復(fù)消費(fèi)問(wèn)題
原理: 微服務(wù)應(yīng)用放置于同一個(gè)group中,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次。同一個(gè)組內(nèi)會(huì)發(fā)生競(jìng)爭(zhēng)關(guān)系,只有其中一個(gè)可以消費(fèi)。
接下來(lái)直接調(diào)整兩個(gè)消費(fèi)者為同一個(gè)組:添加如下配置

當(dāng)兩個(gè)消費(fèi)者都設(shè)置好后啟動(dòng),會(huì)發(fā)現(xiàn)一個(gè)問(wèn)題: 實(shí)際上分到一個(gè)組對(duì)于RabbitMQ來(lái)說(shuō)就是兩個(gè)消費(fèi)者監(jiān)聽(tīng)了一個(gè)隊(duì)列。一個(gè)隊(duì)列那也就意味著,當(dāng)隊(duì)列收到一條消息,哪個(gè)消費(fèi)者誰(shuí)先消費(fèi)就是誰(shuí)的,消費(fèi)完隊(duì)列里面就沒(méi)有了,也就是只有一個(gè)消費(fèi)者能消費(fèi)到消息!
注意:假如不設(shè)置group屬性的時(shí)候,默認(rèn)是啟動(dòng)一個(gè)消費(fèi)者,就會(huì)創(chuàng)建一個(gè)消費(fèi)隊(duì)列,啟動(dòng)多個(gè)服務(wù)就會(huì)創(chuàng)建多個(gè)隊(duì)列。stream默認(rèn)使用的是RabbitMQ的topic交換機(jī)。當(dāng)發(fā)送者向這個(gè)交換機(jī)發(fā)送消息的時(shí)候,兩個(gè)隊(duì)列就都會(huì)接收到。關(guān)于RabbitMQ相關(guān)知識(shí)本篇不記錄,后續(xù)會(huì)專門寫RabbitMQ相關(guān)文章。

最終測(cè)試:8802/8803實(shí)現(xiàn)了輪詢分組,每次只有一個(gè)消費(fèi)者8801模塊的發(fā)的消息只能被8802或8803其中一個(gè)接收到,這樣避免了重復(fù)消費(fèi)。
2.5. 消息持久化
當(dāng)三個(gè)項(xiàng)目都啟動(dòng)著的時(shí)候,現(xiàn)在我們要做幾件事:
停止8802和8803并去除掉8802的分組group: gxs,8803不去分組信息,停止掉項(xiàng)目的時(shí)候會(huì)發(fā)現(xiàn)消息隊(duì)列并沒(méi)有刪除,說(shuō)明一旦設(shè)置分組信息,消息隊(duì)列就不再是臨時(shí)隊(duì)列。

2.8801發(fā)送4條消息啟動(dòng)8802然后消息并沒(méi)有打印,沒(méi)有收到消息(注意8802是去掉分組信息的)再啟動(dòng)8803,有分組屬性配置,后臺(tái)打出來(lái)了MQ上的消息
原因就是:當(dāng)兩個(gè)項(xiàng)目都停止的時(shí)候,隊(duì)列并未刪除,而8803還綁定了這個(gè)隊(duì)列,所以他就算宕機(jī)了,又重啟了,依然可以收到消息。而8802沒(méi)有設(shè)置分組信息,他再啟動(dòng)后系統(tǒng)會(huì)給他創(chuàng)建一個(gè)臨時(shí)隊(duì)列,自然而然收不到之前的消息了。
三、函數(shù)式編程練習(xí)
到此這篇關(guān)于Spring Cloud Stream詳解的文章就介紹到這了,更多相關(guān)Spring Cloud Stream內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot中的@RequestMapping注解的用法示例
@RequestMapping注解是SpringBoot中最常用的注解之一,它可以幫助開(kāi)發(fā)者定義和處理HTTP請(qǐng)求,本篇文章我們將詳細(xì)為大家介紹如何使用SpringBoot中的@RequestMapping注解,感興趣的同學(xué)跟著小編一起來(lái)學(xué)習(xí)吧2023-06-06
SpringBoot使用redis實(shí)現(xiàn)session共享功能
這篇文章主要介紹了pringboot項(xiàng)目使用redis實(shí)現(xiàn)session共享,文中通過(guò)代碼示例講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2024-05-05
Java導(dǎo)出Excel通用工具類實(shí)例代碼
這篇文章主要給大家介紹了關(guān)于Java導(dǎo)出Excel通用工具類的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04

