Project?Reactor源碼解析publishOn使用示例
功能分析
相關(guān)示例源碼:github.com/chentianmin…
public final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch)

在onNext()、onComplete()、onError()方法進(jìn)行線程切換,publishOn()使得它下游的消費(fèi)階段異步執(zhí)行。
- scheduler:線程切換的調(diào)度器,
Scheduler用來(lái)生成實(shí)際執(zhí)行異步任務(wù)的Worker。 - delayError:是否延時(shí)轉(zhuǎn)發(fā)
Error。如果為true,當(dāng)收到上游的Error時(shí),會(huì)等隊(duì)列中的元素消費(fèi)完畢后再向下游轉(zhuǎn)發(fā)Error。否則會(huì)立即轉(zhuǎn)發(fā)Error,可能導(dǎo)致隊(duì)列中的元素丟失。默認(rèn)為true。 - prefetch:預(yù)取元素的數(shù)量,同時(shí)也是隊(duì)列的容量。默認(rèn)值為
Queues.SMALL_BUFFER_SIZE,該值通過(guò)配置進(jìn)行修改。

代碼示例
prefetch
/**
* 每隔delayMillis生產(chǎn)一個(gè)元素
*/
protected Flux<Integer> delayPublishFlux(int delayMillis, int startInclusive, int endExclusive) {
return Flux.create(fluxSink -> {
IntStream.range(startInclusive, endExclusive)
.forEach(i -> {
// 同步next
sleep(delayMillis);
logInt(i, "生產(chǎn)");
fluxSink.next(i);
});
fluxSink.complete();
});
}
@Test
public void testPreFetch() {
delayPublishFlux(1000, 1, 5)
.doOnRequest(i -> logLong(i, "request"))
.publishOn(Schedulers.boundedElastic(), 2)
.subscribe(i -> logInt(i, "消費(fèi)"));
sleep(10000);
}

每次會(huì)都向上游請(qǐng)求2個(gè)元素。另外還能發(fā)現(xiàn),從第二個(gè)request開始,線程發(fā)生了切換。
delayError
/**
* 每隔delayMillis生產(chǎn)一個(gè)元素,最后發(fā)送Error
*/
protected Flux<Integer> delayPublishFluxError(int delayMillis, int startInclusive, int endExclusive) {
return Flux.create(fluxSink -> {
IntStream.range(startInclusive, endExclusive)
.forEach(i -> {
// 同步next
sleep(delayMillis);
logInt(i, "生產(chǎn)");
fluxSink.next(i);
});
fluxSink.error(new RuntimeException("發(fā)布錯(cuò)誤!"));
});
}
@Test
public void testDelayError() {
delayPublishFluxError(500, 1, 5)
.publishOn(Schedulers.boundedElastic())
// 只是為了消費(fèi)慢一點(diǎn)
.doOnNext(i -> sleep(1000))
.subscribe(i -> logInt(i, "消費(fèi)"));
sleep(10000);
}

元素消費(fèi)完才觸發(fā)Error!
@Test
public void testNotDelayError() {
delayPublishFluxError(500, 1, 5)
.publishOn(Schedulers.boundedElastic(), false, 256)
// 只是為了消費(fèi)慢一點(diǎn)
.doOnNext(i -> sleep(1000))
.subscribe(i -> logInt(i, "消費(fèi)"));
sleep(10000);
}

元素還沒消費(fèi)完就觸發(fā)Error!
源碼分析
首先看一下publishOn()操作符在裝配階段做了什么,直接查看Flux#publishOn()源碼。
Flux#publishOn()

publishOn()裝配階段重點(diǎn)是創(chuàng)建了FluxPublishOn對(duì)象。
接下來(lái),我們分析訂閱階段發(fā)生了什么。一個(gè)Publisher在訂閱的時(shí)候調(diào)用的是其subscribe()方法,因此我們繼續(xù)看Flux#subscribe()源碼。
Flux#subscribe()

在Flux#subscribe()方法的實(shí)現(xiàn)中,如果上游Publisher是OptimizableOperator類型,實(shí)際的Subscriber是通過(guò)調(diào)用該InternalFluxOperator#subscribeOrReturn()方法返回的。如果返回值為null,直接return。
對(duì)于publishOn()操作符來(lái)說(shuō),裝配階段創(chuàng)建的FluxPublishOn就是OptimizableOperator類型。所以繼續(xù)查看FluxPublishOn#subscribeOrReturn()源碼。
FluxPublishOn#subscribeOrReturn()

可以看到,方法返回的是PublishOnSubscriber,它包裝了原始的Subscriber。
在后續(xù)的訂閱階段一定會(huì)調(diào)用其onSubscribe()方法,在運(yùn)行階段一定會(huì)調(diào)用其onNext()方法。我們先看FluxPublishOn#onSubscribe()源碼。
FluxPublishOn#onSubscribe()

在onSubscribe()實(shí)現(xiàn)中,分為同步隊(duì)列融合、異步隊(duì)列融合以及非融合方式處理。
如果上游的Subscription是QueueSubscription類型,則會(huì)進(jìn)行隊(duì)列融合。具體采用同步還是異步,取決于該QueueSubscription#requestFusion()實(shí)現(xiàn)。
- 同步隊(duì)列融合:復(fù)用當(dāng)前隊(duì)列,繼續(xù)調(diào)用下游
onSubscribe()方法,但不會(huì)繼續(xù)調(diào)用上游request()方法。 - 異步隊(duì)列融合:復(fù)用當(dāng)前隊(duì)列,然后繼續(xù)調(diào)用下游
onSubscribe()以及上游request()方法,請(qǐng)求數(shù)量是prefetch。 - 非融合:創(chuàng)建一個(gè)新的隊(duì)列,然后繼續(xù)調(diào)用下游
onSubscribe()以及上游request()方法,請(qǐng)求數(shù)量是prefetch。
接下來(lái),我們從源碼角度分別介紹上述三種方式的處理邏輯,首先介紹非融合方式。
非融合
先看如下代碼示例,該代碼會(huì)以非融合方式執(zhí)行。
@Test
public void testNoFuse() {
delayPublishFlux(1000, 1, 5)
.publishOn(Schedulers.boundedElastic())
.subscribe(i -> logInt(i, "消費(fèi)"));
sleep(10000);
}

間隔1s生產(chǎn)消費(fèi)元素!
在消費(fèi)階段,一定會(huì)調(diào)用FluxPublishOn#onNext()方法。
FluxPublishOn#onNext()

我們重點(diǎn)關(guān)注非融合方式執(zhí)行邏輯,其實(shí)只做了2件事:
- 將下發(fā)的元素添加到隊(duì)列中,該隊(duì)列就是
onSubscribe()階段創(chuàng)建的新隊(duì)列。 - 調(diào)用
trySchedule()方法進(jìn)行調(diào)度。
繼續(xù)看FluxPublishOn#trySchedule()源碼。
FluxPublishOn#trySchedule()

這里其實(shí)就是交由woker異步執(zhí)行,后續(xù)會(huì)執(zhí)行FluxPublishOn.run()方法。
FluxPublishOn#run()

在run()方法執(zhí)行的時(shí)候,分為3段邏輯:
- 如果是輸出融合,執(zhí)行
runBackfused()方法。 - 如果是同步隊(duì)列融合,執(zhí)行
runSync()方法。 - 否則,執(zhí)行
runAsync()方法。
對(duì)于當(dāng)前例子,實(shí)際執(zhí)行的是runAsync()方法,繼續(xù)查看其源碼。
FluxPublishOn#runAsync()

runAsync()做的事情比較簡(jiǎn)單,就是排空隊(duì)列中的元素下發(fā)給下游。同時(shí)在這里會(huì)繼續(xù)調(diào)用request()向上游請(qǐng)求數(shù)據(jù),這也是前面說(shuō)的從第二個(gè)request()開始會(huì)進(jìn)行線程切換的原因。
另外這里還會(huì)調(diào)用checkTerminated(),檢查終止情況。
FluxPublishOn#checkTerminated()

如果delayError=true,必須當(dāng)前隊(duì)列為空是才會(huì)轉(zhuǎn)發(fā)Error。如果delayError=false,則直接轉(zhuǎn)發(fā)Error。繼續(xù)查看onComplete()方法。
FluxPublishOn#onComplete()

如果未結(jié)束,將done標(biāo)記設(shè)置為true,然后再次調(diào)用trySchedule()進(jìn)行調(diào)度。后續(xù)再被調(diào)度到的時(shí)候,如果隊(duì)列已經(jīng)排空,才會(huì)調(diào)用下游onComplete(),觸發(fā)完成。
小結(jié)
簡(jiǎn)單總結(jié)一下非融合執(zhí)行過(guò)程:
在onSubscribe()時(shí)創(chuàng)建一個(gè)隊(duì)列,在onNext()時(shí)將上游下發(fā)的元素添加到隊(duì)列中,然后異步排空隊(duì)列中的元素,繼續(xù)下發(fā)給下游。
同步隊(duì)列融合
以下代碼會(huì)以同步隊(duì)列融合方式執(zhí)行。
@Test
public void testSyncFuse() {
Flux.just(1, 2 ,3, 4, 5)
.publishOn(Schedulers.boundedElastic())
.subscribe(this::logInt);
sleep(10000);
}
因?yàn)?code>Flux.just()對(duì)應(yīng)的Subscription是SynchronousSubscription,其requestFusion()方法實(shí)現(xiàn)如下:
SynchronousSubscription#requestFusion()

此時(shí)返回的是SYNC,執(zhí)行同步隊(duì)列融合。
前面提到過(guò),同步隊(duì)列融合會(huì)復(fù)用當(dāng)前隊(duì)列,繼續(xù)調(diào)用下游onSubscribe()方法,但不會(huì)繼續(xù)調(diào)用上游request()方法。
這意味著,此時(shí)FluxPublishOn#onNext()和FluxPublishOn#onComplete()方法并不會(huì)調(diào)用。但是FluxPublishOn#request()依然會(huì)被下游調(diào)用到。
FluxPublishOn#request()

在request()方法中還是會(huì)調(diào)用trySchedule(),后續(xù)會(huì)異步調(diào)用runSync()方法(前面已經(jīng)分析了)。
對(duì)于非融合方式,trySchedule()也會(huì)執(zhí)行,只是這次調(diào)度的時(shí)候,隊(duì)列中還沒有數(shù)據(jù)被添加進(jìn)去。
FluxPublishOn#runSync()

runSync()實(shí)現(xiàn)上runAsync()差不多,也是排空隊(duì)列的元素,繼續(xù)下發(fā)給下游。不同的點(diǎn)是少了request()調(diào)用,以及取消完成控制有差異。
小結(jié)
簡(jiǎn)單總結(jié)一下同步隊(duì)列融合執(zhí)行過(guò)程:
在onSubsrribe()時(shí)直接復(fù)用上游QueueSubscription作為隊(duì)列,不會(huì)調(diào)用上游request()請(qǐng)求數(shù)據(jù),在自身request()時(shí)異步排空隊(duì)列中的元素,繼續(xù)下發(fā)給下游。
異步隊(duì)列融合
以下代碼會(huì)以異步隊(duì)列融合方式執(zhí)行。
@Test
public void testAsyncFuse() {
Flux.just(1, 2, 3, 4, 5)
.windowUntil(i -> i % 3 == 0)
.publishOn(Schedulers.boundedElastic())
.flatMap(Function.identity())
.subscribe(this::logInt);
sleep(10000);
}
因?yàn)?code>windowUntil()對(duì)應(yīng)的Subscription是WindowPredicateMain,其requestFusion()方法實(shí)現(xiàn)如下:
WindowPredicateMain#requestFusion()

此時(shí)返回ASYNC,執(zhí)行異步隊(duì)列融合。接下來(lái)再看一下FluxPublishOn#onNext()源碼。
FluxPublishOn#onNext()

注意,此時(shí)onNext()方法參數(shù)是null,表明上游并沒有真正下發(fā)元素,可以將其看做是一個(gè)觸發(fā)Worker調(diào)度的信號(hào)。后續(xù)還是會(huì)異步執(zhí)行runAsync()方法,這里就不再分析了。
這其實(shí)也很容易理解:異步隊(duì)列融合直接復(fù)用了上游的QueueSubscription作為隊(duì)列,真正的數(shù)據(jù)應(yīng)該由這個(gè)隊(duì)列下發(fā)。
總結(jié)
簡(jiǎn)單總結(jié)一下同步隊(duì)列融合執(zhí)行過(guò)程:
在onSubsrribe()時(shí)直接復(fù)用上游QueueSubscription作為隊(duì)列,在onNext()時(shí)接收上游信號(hào),異步排空隊(duì)列中的元素,繼續(xù)下發(fā)給下游。
非融合、同步隊(duì)列融合、異步隊(duì)列融合比較如下:

以上就是Project Reactor源碼解析publishOn使用示例的詳細(xì)內(nèi)容,更多關(guān)于Project Reactor publishOn的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
JavaWeb實(shí)現(xiàn)用戶登錄注冊(cè)功能實(shí)例代碼(基于Servlet+JSP+JavaBean模式)
這篇文章主要基于Servlet+JSP+JavaBean開發(fā)模式實(shí)現(xiàn)JavaWeb用戶登錄注冊(cè)功能實(shí)例代碼,非常實(shí)用,本文介紹的非常詳細(xì),具有參考借鑒價(jià)值,感興趣的朋友一起看看吧2016-05-05
JetBrains IntelliJ IDEA 優(yōu)化教超詳細(xì)程
這篇文章主要介紹了JetBrains IntelliJ IDEA 優(yōu)化教超詳細(xì)程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03
SpringBoot實(shí)現(xiàn)列表數(shù)據(jù)導(dǎo)出為Excel文件
這篇文章主要為大家詳細(xì)介紹了在Spring?Boot框架中如何將列表數(shù)據(jù)導(dǎo)出為Excel文件,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解下2024-02-02
mybatis-plus 實(shí)現(xiàn)分頁(yè)查詢的示例代碼
本文介紹了在MyBatis-Plus中實(shí)現(xiàn)分頁(yè)查詢,包括引入依賴、配置分頁(yè)插件、使用分頁(yè)查詢以及在控制器中調(diào)用分頁(yè)查詢的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-11-11

