Java反應(yīng)式框架Reactor中的Mono和Flux

1. 前言
最近寫關(guān)于響應(yīng)式編程的東西有點(diǎn)多,很多同學(xué)反映對(duì)Flux和Mono這兩個(gè)Reactor中的概念有點(diǎn)懵逼。但是目前Java響應(yīng)式編程中我們對(duì)這兩個(gè)對(duì)象的接觸又最多,諸如Spring WebFlux、RSocket、R2DBC。我開(kāi)始也對(duì)這兩個(gè)對(duì)象頭疼,所以今天我們就簡(jiǎn)單來(lái)探討一下它們。
2. 響應(yīng)流的特點(diǎn)
要搞清楚這兩個(gè)概念,必須說(shuō)一下響應(yīng)流規(guī)范。它是響應(yīng)式編程的基石。他具有以下特點(diǎn):
響應(yīng)流必須是無(wú)阻塞的。響應(yīng)流必須是一個(gè)數(shù)據(jù)流。它必須可以異步執(zhí)行。并且它也應(yīng)該能夠處理背壓。
背壓是反應(yīng)流中的一個(gè)重要概念,可以理解為,生產(chǎn)者可以感受到消費(fèi)者反饋的消費(fèi)壓力,并根據(jù)壓力進(jìn)行動(dòng)態(tài)調(diào)整生產(chǎn)速率。形象點(diǎn)可以按照下面理解:

3. Publisher
由于響應(yīng)流的特點(diǎn),我們不能再返回一個(gè)簡(jiǎn)單的POJO對(duì)象來(lái)表示結(jié)果了。必須返回一個(gè)類似Java中的Future的概念,在有結(jié)果可用時(shí)通知消費(fèi)者進(jìn)行消費(fèi)響應(yīng)。
Reactive Stream規(guī)范中這種被定義為Publisher<T> ,Publisher<T>是一個(gè)可以提供0-N個(gè)序列元素的提供者,并根據(jù)其訂閱者Subscriber<? super T>的需求推送元素。一個(gè)Publisher<T>可以支持多個(gè)訂閱者,并可以根據(jù)訂閱者的邏輯進(jìn)行推送序列元素。下面這個(gè)Excel計(jì)算就能說(shuō)明一些Publisher<T>的特點(diǎn)。

A1-A9就可以看做Publisher<T>及其提供的元素序列。A10-A13分別是求和函數(shù)SUM(A1:A9)、平均函數(shù)AVERAGE(A1:A9)、最大值函數(shù)MAX(A1:A9)、最小值函數(shù)MIN(A1:A9),可以看作訂閱者Subscriber。假如說(shuō)我們沒(méi)有A10-A13,那么A1-A9就沒(méi)有實(shí)際意義,它們并不產(chǎn)生計(jì)算。這也是響應(yīng)式的一個(gè)重要特點(diǎn):當(dāng)沒(méi)有訂閱時(shí)發(fā)布者什么也不做。
而Flux和Mono都是Publisher<T>在Reactor 3實(shí)現(xiàn)。Publisher<T>提供了subscribe方法,允許消費(fèi)者在有結(jié)果可用時(shí)進(jìn)行消費(fèi)。如果沒(méi)有消費(fèi)者Publisher<T>不會(huì)做任何事情,他根據(jù)消費(fèi)情況進(jìn)行響應(yīng)。 Publisher<T>可能返回零或者多個(gè),甚至可能是無(wú)限的,為了更加清晰表示期待的結(jié)果就引入了兩個(gè)實(shí)現(xiàn)模型Mono和Flux。
4. Flux
Flux 是一個(gè)發(fā)出(emit)0-N個(gè)元素組成的異步序列的Publisher<T>,可以被onComplete信號(hào)或者onError信號(hào)所終止。在響應(yīng)流規(guī)范中存在三種給下游消費(fèi)者調(diào)用的方法 onNext, onComplete, 和onError。下面這張圖表示了Flux的抽象模型:

以上的的講解對(duì)于初次接觸反應(yīng)式編程的依然是難以理解的,所以這里有一個(gè)循序漸進(jìn)的理解過(guò)程。
有些類比并不是很妥當(dāng),但是對(duì)于你循序漸進(jìn)的理解這些新概念還是有幫助的。
傳統(tǒng)數(shù)據(jù)處理
我們?cè)谄匠J沁@么寫的:
public List<ClientUser> allUsers() {
return Arrays.asList(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
我們通過(guò)迭代返回值List來(lái)get這些元素進(jìn)行再處理(消費(fèi)),這種方式有點(diǎn)類似廚師做了很多菜,吃不吃在于食客。需要食客主動(dòng)去來(lái)吃就行了(pull的方式),至于喜歡吃什么不喜歡吃什么自己隨意,怎么吃也自己隨意。
流式數(shù)據(jù)處理
在Java 8中我們可以改寫為流的表示:
public Stream<ClientUser> allUsers() {
return Stream.of(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
依然是廚師做了很多菜,但是這種就更加高級(jí)了一些,提供了菜品的搭配方式(不包含具體細(xì)節(jié)),食客可以按照說(shuō)明根據(jù)自己的習(xí)慣搭配著去吃,一但開(kāi)始概不退換,吃完為止,過(guò)期不候。
反應(yīng)式數(shù)據(jù)處理
在Reactor中我們又可以改寫為Flux表示:
public Flux<ClientUser> allUsers(){
return Flux.just(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
這時(shí)候食客只需要訂餐就行了,做好了自然就呈上來(lái),而且可以隨時(shí)根據(jù)食客的飯量進(jìn)行調(diào)整。如果沒(méi)有食客訂餐那么廚師就什么都不用做。當(dāng)然不止有這么點(diǎn)特性,不過(guò)對(duì)于方便我們理解來(lái)說(shuō)這就夠了。
5. Mono
Mono 是一個(gè)發(fā)出(emit)0-1個(gè)元素的Publisher<T>,可以被onComplete信號(hào)或者onError信號(hào)所終止。

這里就不翻譯了,整體和Flux差不多,只不過(guò)這里只會(huì)發(fā)出0-1個(gè)元素。也就是說(shuō)不是有就是沒(méi)有。象Flux一樣,我們來(lái)看看Mono的演化過(guò)程以幫助理解。
傳統(tǒng)數(shù)據(jù)處理
public ClientUser currentUser () {
return isAuthenticated ? new ClientUser("felord.cn", "reactive") : null;
}
直接返回符合條件的對(duì)象或者null。
Optional的處理方式
public Optional<ClientUser> currentUser () {
return isAuthenticated ? Optional.of(new ClientUser("felord.cn", "reactive"))
: Optional.empty();
}
這個(gè)Optional我覺(jué)得就有反應(yīng)式的那種味兒了,當(dāng)然它并不是反應(yīng)式。當(dāng)我們不從返回值Optional取其中具體的對(duì)象時(shí),我們不清楚里面到底有沒(méi)有,但是Optional是一定客觀存在的,不會(huì)出現(xiàn)NPE問(wèn)題。
反應(yīng)式數(shù)據(jù)處理
public Mono<ClientUser> currentUser () {
return isAuthenticated ? Mono.just(new ClientUser("felord.cn", "reactive"))
: Mono.empty();
}
和Optional有點(diǎn)類似的機(jī)制,當(dāng)然Mono不是為了解決NPE問(wèn)題的,它是為了處理響應(yīng)流中單個(gè)值(也可能是Void)而存在的。
6. 總結(jié)
Flux和Mono是Java反應(yīng)式中的重要概念,但是很多同學(xué)包括我在開(kāi)始都難以理解它們。這其實(shí)是規(guī)定了兩種流式范式,這種范式讓數(shù)據(jù)具有一些新的特性,比如基于發(fā)布訂閱的事件驅(qū)動(dòng),異步流、背壓等等。另外數(shù)據(jù)是推送(Push)給消費(fèi)者的以區(qū)別于平時(shí)我們的拉(Pull)模式。同時(shí)我們可以像Stream Api一樣使用類似map、flatmap等操作符(operator)來(lái)操作它們。對(duì)Flux和Mono這兩個(gè)概念需要花一些時(shí)間去理解它們,不能操之過(guò)急。
到此這篇關(guān)于Java反應(yīng)式框架Reactor中的Mono和Flux的文章就介紹到這了,更多相關(guān)Java框架 Reactor中的Mono和Flux內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
spring/springboot整合curator遇到的坑及解決
這篇文章主要介紹了spring/springboot整合curator遇到的坑及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-05-05
Calcite使用SQL實(shí)現(xiàn)查詢excel內(nèi)容
因?yàn)閏alcite本身沒(méi)有excel的適配器,?所以本文將模仿calcite-file,?搞一個(gè)calcite-file-excel實(shí)現(xiàn)查詢excel內(nèi)容,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-01-01
教你怎么用SpringBoot+Mybati-Plus快速搭建代碼
Mybatis自身通過(guò)了逆向工程來(lái)幫助我們快速生成代碼,但Mybatis-plus卻更加強(qiáng)大,不僅僅可以生成dao,pojo,mapper,還有基本的controller和service層代碼,接下來(lái)我們來(lái)寫一個(gè)簡(jiǎn)單的人門案例是看看如何mybatis-plus是怎么實(shí)現(xiàn)的,需要的朋友可以參考下2021-06-06
基于SpringBoot構(gòu)建電商秒殺項(xiàng)目代碼實(shí)例
這篇文章主要介紹了基于SpringBoot構(gòu)建電商秒殺項(xiàng)目代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-05-05
Java 實(shí)戰(zhàn)范例之線上新聞平臺(tái)系統(tǒng)的實(shí)現(xiàn)
讀萬(wàn)卷書不如行萬(wàn)里路,只學(xué)書上的理論是遠(yuǎn)遠(yuǎn)不夠的,只有在實(shí)戰(zhàn)中才能獲得能力的提升,本篇文章手把手帶你用java+jsp+jdbc+mysql實(shí)現(xiàn)一個(gè)線上新聞平臺(tái)系統(tǒng),大家可以在過(guò)程中查缺補(bǔ)漏,提升水平2021-11-11
SpringBoot實(shí)現(xiàn)動(dòng)態(tài)多線程并發(fā)定時(shí)任務(wù)
這篇文章主要為大家詳細(xì)介紹了SpringBoot實(shí)現(xiàn)動(dòng)態(tài)多線程并發(fā)定時(shí)任務(wù),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-05-05
Java多線程程序中synchronized修飾方法的使用實(shí)例
synchronized關(guān)鍵字主要北用來(lái)進(jìn)行線程同步,這里我們主要來(lái)演示Java多線程程序中synchronized修飾方法的使用實(shí)例,需要的朋友可以參考下:2016-06-06

