JAVA?Reactor中Sinks.Many類三種常見的創(chuàng)建方式及使用
一、Java中的Sinks.Many
在Java編程中,我們經(jīng)常需要處理數(shù)據(jù)流。為了更有效地處理數(shù)據(jù)流,我們可以使用Reactor庫中的Sinks.Many類。這個類提供了一種簡單而強(qiáng)大的方式來處理多個事件流,并且可以通過異步或者同步的方式處理這些事件。
Sinks.Many是Reactor庫中的一個類,它可以用來處理多個事件流。它提供了一種簡單而強(qiáng)大的方式來處理多個事件,可以在任何時間添加、獲取以及完成事件。
Sinks.Many為什么被設(shè)計為 Sinks 類的內(nèi)部接口?
實現(xiàn)細(xì)節(jié)隱藏??: Sinks.Many 的實際實現(xiàn)(如 MulticastReplayProcessor 或其他內(nèi)部類)被封裝在 Sinks 類的內(nèi)部。用戶只需通過工廠方法(如 Sinks.many().multicast())獲取接口,無需關(guān)心底層實現(xiàn)。 ??
接口職責(zé)分離??: Sinks 類作為工廠,統(tǒng)一管理所有類型的 Sink(如 Sinks.One、Sinks.Many),而 Sinks.Many 作為內(nèi)部接口,明確表示它是一個??多播數(shù)據(jù)源??,與單播(Sinks.One)等場景隔離。
二、Sinks.Many的創(chuàng)建
在源碼中可以看待,Sinks.Many提供了三中常見的創(chuàng)建方式。
(1)unicast()
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
這種創(chuàng)建方式中提供設(shè)置背壓緩沖區(qū)的方法

用途:
創(chuàng)建一個 單播(Unicast) 的
Sinks.Many,僅允許 一個訂閱者 訂閱。核心特性:
單訂閱者限制:第二個訂閱者嘗試訂閱時會觸發(fā)
IllegalStateException。背壓支持:通過緩沖區(qū)處理生產(chǎn)者和消費者的速率不匹配,默認(rèn)使用無界緩沖區(qū)(需手動配置限制)。
無歷史數(shù)據(jù):訂閱者只能收到訂閱后產(chǎn)生的數(shù)據(jù)。
適用場景:
點對點通信(如任務(wù)隊列)。
需要嚴(yán)格保證單訂閱者的場景(如資源獨占型操作)。
(2)multicast()
Sinks.Many<String> multicastSink = Sinks.many().multicast().onBackpressureBuffer(100);
這個創(chuàng)建方式,相比較Unicast,就是可以允許多個訂閱者訂閱,同樣提供了多種設(shè)置緩存區(qū)的方式

用途:
創(chuàng)建一個 多播(Multicast) 的Sinks.Many,允許多個訂閱者訂閱。核心特性:
多訂閱者支持:所有訂閱者共享同一數(shù)據(jù)流。
無歷史數(shù)據(jù):新訂閱者只能收到訂閱后產(chǎn)生的數(shù)據(jù),無法獲取之前的數(shù)據(jù)。
背壓策略:默認(rèn)使用無界緩沖區(qū),但可以通過配置限制(如
onBackpressureBuffer(int capacity))。
適用場景:
實時廣播(如股票行情推送)。
需要多個消費者并行處理相同數(shù)據(jù)的場景。
(3)replay()
Sinks.Many<String> replaySink = Sinks.many().replay().all();
這個方法中提供了限制存放歷史數(shù)據(jù)的方法

用途:
創(chuàng)建一個 支持?jǐn)?shù)據(jù)重放(Replay) 的Sinks.Many,允許多個訂閱者訂閱,并回放歷史數(shù)據(jù)。核心特性:
歷史數(shù)據(jù)緩存:新訂閱者可以收到訂閱前一定數(shù)量的數(shù)據(jù)(通過
limit(int)或time(Duration)配置)。多訂閱者支持:與
multicast()類似,但增加了數(shù)據(jù)重放能力。內(nèi)存管理:緩存數(shù)據(jù)量或時間窗口可配置,避免內(nèi)存無限增長。
適用場景:
需要新訂閱者獲取歷史數(shù)據(jù)的場景(如聊天記錄回放)。
實時監(jiān)控面板(多個訂閱者需要看到完整上下文)。
三、Sinks.Many的使用
(1)添加事件
一旦我們創(chuàng)建了一個Sinks.Many對象,我們可以使用emitNext()方法來添加一個事件。這個方法接受一個參數(shù),表示要添加的事件。
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
sink.emitNext("Event 1");
sink.emitNext("Event 2");
sink.emitNext("Event 3");我們首先創(chuàng)建了一個Sinks.Many對象,然后使用emitNext()方法添加了三個事件。
當(dāng)然,如果我們選擇接受Flux流中的數(shù)據(jù)的時候,可以這樣添加數(shù)據(jù)
//創(chuàng)建Flux流
Flux<String> flux = Flux.<String>create(sink->{
sink.next("你好");
sink.complete();
});
//創(chuàng)建Sinks.Many處理流數(shù)據(jù)
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
//訂閱Flux流,并將數(shù)據(jù)交給sink處理(像做流數(shù)據(jù)的緩存,篩選流數(shù)據(jù))
flux.subscribe(
sink::tryEmitNext,
sink::tryEmitError,
sink::tryEmitComplete
);(2)獲取流數(shù)據(jù)
我們可以使用Sinks.Many對象來獲取已添加的事件??梢允褂胊sFlux()方法將Sinks.Many對象轉(zhuǎn)換為一個Flux對象,然后使用Flux對象的方法來訂閱和處理事件。
// 1. .unicast()使用創(chuàng)建單播 Sink
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
// 2. 將 Sink 轉(zhuǎn)換為 Flux(供訂閱者訂閱)
Flux<String> flux = unicastSink.asFlux();
// 3. 第一個訂閱者(合法)
flux.subscribe(
data -> System.out.println("訂閱者1收到數(shù)據(jù): " + data),
error -> System.err.println("訂閱者1發(fā)生錯誤: " + error),
() -> System.out.println("訂閱者1完成")
);
// 4. 推送數(shù)據(jù)到 Sink
unicastSink.tryEmitNext("數(shù)據(jù)1");
unicastSink.tryEmitNext("數(shù)據(jù)2");
// 5. 嘗試第二個訂閱者(會拋出 IllegalStateException)
try {
flux.subscribe(
data -> System.out.println("訂閱者2收到數(shù)據(jù): " + data),
error -> System.err.println("訂閱者2發(fā)生錯誤: " + error),
() -> System.out.println("訂閱者2完成")
);
} catch (IllegalStateException e) {
System.err.println("訂閱者2訂閱失敗: " + e.getMessage());
}
// 6. 關(guān)閉 Sink(發(fā)送完成信號)
unicastSink.tryEmitComplete();
這里我使用了unicast()方法創(chuàng)建的Sinks.Many,這個時候我通過asFlux()方法轉(zhuǎn)換的flux流,只能被一個訂閱者訂閱到,第二個訂閱者,訂閱的時候就出報錯,當(dāng)然,如果你想要多個訂閱者訂閱,可以使用multicast()或者replay()方式創(chuàng)建,
四、熱冷流
上面三種方式創(chuàng)建的是熱流。
熱流:數(shù)據(jù)獨立于訂閱者持續(xù)生成,多訂閱者共享實時數(shù)據(jù),適用于實時事件推送。
在添加事件代碼中我也有使用到Flux.create()方法創(chuàng)建流,要注意這里創(chuàng)建的是冷流。
冷流:每次調(diào)用 subscribe() 時,會觸發(fā) Flux.create() 的回調(diào)函數(shù)(即 Consumer<SynchronousSink<T>>),??重新生成數(shù)據(jù)流??。多個訂閱者之間數(shù)據(jù)獨立。這個跟ThreadLocal有點像,F(xiàn)lux.create()會為每一個訂閱者創(chuàng)建單獨隔離的數(shù)據(jù)流,保證每一條流中數(shù)據(jù)互不影響。
冷熱流的選擇;
- 若需要多個訂閱者共享實時數(shù)據(jù) → 熱流。
- 若需要每個訂閱者獨立消費完整數(shù)據(jù) → 冷流。
- 若需要歷史數(shù)據(jù) → 使用
replay()緩存。
總結(jié)
到此這篇關(guān)于JAVA Reactor中Sinks.Many類三種常見的創(chuàng)建方式及使用的文章就介紹到這了,更多相關(guān)JAVA Reactor中Sinks.Many類內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringMVC核心DispatcherServlet處理流程分步詳解
這篇文章主要介紹了SpringMVC核心之中央調(diào)度器DispatcherServlet的相關(guān)知識,包括SpringMVC請求處理過程及SrpingMVC容器和spring IOC容器關(guān)系,需要的朋友可以參考下2023-04-04
mybatis3.4.6 批量更新 foreach 遍歷map 的正確姿勢詳解
這篇文章主要介紹了mybatis3.4.6 批量更新 foreach 遍歷map 的正確姿勢詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-11-11
25行Java代碼將普通圖片轉(zhuǎn)換為字符畫圖片和文本的實現(xiàn)
這篇文章主要介紹了25行Java代碼將普通圖片轉(zhuǎn)換為字符畫圖片和文本的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04

