Kotlin協(xié)程flowOn與線程切換超詳細(xì)示例介紹
示例代碼
本文分析示例代碼如下:
launch(Dispatchers.Main) {
flow {
emit(1)
emit(2)
}.flowOn(Dispatchers.IO).collect {
delay(1000)
withContext(Dispatchers.IO) {
Log.d("liduo", "$it")
}
Log.d("liduo", "$it")
}
}
一.flowOn方法
flowOn方法用于將上游的流切換到指定協(xié)程上下文的調(diào)度器中執(zhí)行,同時(shí)不會(huì)把協(xié)程上下文暴露給下游的流,即flowOn方法中協(xié)程上下文的調(diào)度器不會(huì)對(duì)下游的流生效。如下面這段代碼所示:
launch(Dispatchers.Main) {
flow {
emit(2) // 執(zhí)行在IO線程池
}.flowOn(Dispatchers.IO).map {
it + 1 // 執(zhí)行在Default線程池
}.flowOn(Dispatchers.Default).collect {
Log.d("liduo", "$it") //執(zhí)行在主線程
}
}接下來(lái),分析一下flowOn方法,代碼如下:
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
// 檢查當(dāng)前協(xié)程沒(méi)有執(zhí)行結(jié)束
checkFlowContext(context)
return when {
// 為空,則返回自身
context == EmptyCoroutineContext -> this
// 如果是可融合的Flow,則嘗試融合操作,獲取新的流
this is FusibleFlow -> fuse(context = context)
// 其他情況,包裝成可融合的Flow
else -> ChannelFlowOperatorImpl(this, context = context)
}
}
// 確保Job不為空
private fun checkFlowContext(context: CoroutineContext) {
require(context[Job] == null) {
"Flow context cannot contain job in it. Had $context"
}
}
在flowOn方法中,首先會(huì)檢查方法所在的協(xié)程是否執(zhí)行結(jié)束。如果沒(méi)有結(jié)束,則會(huì)執(zhí)行判斷語(yǔ)句,這里flowOn方法傳入的上下文不是空上下文,且通過(guò)flow方法構(gòu)建出的Flow對(duì)象也不是FusibleFlow類(lèi)型的對(duì)象,因此這里會(huì)走到else分支,將上游flow方法創(chuàng)建的Flow對(duì)象和上下文包裝成ChannelFlowOperatorImpl類(lèi)型的對(duì)象。
1.ChannelFlowOperatorImpl類(lèi)
ChannelFlowOperatorImpl類(lèi)繼承自ChannelFlowOperator類(lèi),用于將上游的流包裝成一個(gè)ChannelFlow對(duì)象,它的繼承關(guān)系如下圖所示:

通過(guò)上圖可以知道,ChannelFlowOperatorImpl類(lèi)最終繼承了ChannelFlow類(lèi),代碼如下:
internal class ChannelFlowOperatorImpl<T>(
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
// 用于流融合時(shí)創(chuàng)建新的流
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)
// 若當(dāng)前的流不需要通過(guò)Channel即可實(shí)現(xiàn)正常工作時(shí),會(huì)調(diào)用此方法
override fun dropChannelOperators(): Flow<T>? = flow
// 觸發(fā)對(duì)下一級(jí)流進(jìn)行收集
override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
}
二.collect方法
在Kotlin協(xié)程:Flow基礎(chǔ)原理中講到,當(dāng)執(zhí)行collect方法時(shí),內(nèi)部會(huì)調(diào)用最后產(chǎn)生的Flow對(duì)象的collect方法,代碼如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector<T> {<!--{C}%3C!%2D%2D%20%2D%2D%3E--> override suspend fun emit(value: T) = action(value) })public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
這個(gè)最后產(chǎn)生的Flow對(duì)象就是ChannelFlowOperatorImpl類(lèi)對(duì)象。
1.ChannelFlowOperator類(lèi)的collect方法
ChannelFlowOperatorImpl類(lèi)沒(méi)有重寫(xiě)collect方法,因此調(diào)用的是它的父類(lèi)ChannelFlowOperator類(lèi)的collect方法,代碼如下:
override suspend fun collect(collector: FlowCollector<T>) {
// OPTIONAL_CHANNEL為默認(rèn)值,這里滿足條件,之后會(huì)詳細(xì)講解
if (capacity == Channel.OPTIONAL_CHANNEL) {
// 獲取當(dāng)前協(xié)程的上下文
val collectContext = coroutineContext
// 計(jì)算新的上下文
val newContext = collectContext + context
// 如果前后上下文沒(méi)有發(fā)生變化
if (newContext == collectContext)
// 直接觸發(fā)對(duì)下一級(jí)流的收集
return flowCollect(collector)
// 如果上下文發(fā)生變化,但不需要切換線程
if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
// 切換協(xié)程上下文,調(diào)用flowCollect方法觸發(fā)下一級(jí)流的收集
return collectWithContextUndispatched(collector, newContext)
}
// 調(diào)用父類(lèi)的collect方法
super.collect(collector)
}
// 獲取當(dāng)前協(xié)程的上下文,該方法會(huì)被編譯器處理
@SinceKotlin("1.3")
@Suppress("WRONG_MODIFIER_TARGET")
@InlineOnly
public suspend inline val coroutineContext: CoroutineContext
get() {
throw NotImplementedError("Implemented as intrinsic")
}ChannelFlowOperator類(lèi)的collect方法在設(shè)計(jì)上與協(xié)程的withContext方法設(shè)計(jì)思路是一致的:在方法內(nèi)根據(jù)上下文的不同情況進(jìn)行判斷,在必要時(shí)才會(huì)切換線程去執(zhí)行任務(wù)。
通過(guò)flowOn方法創(chuàng)建的ChannelFlowOperatorImpl類(lèi)對(duì)象,參數(shù)capacity為默認(rèn)值OPTIONAL_CHANNEL。因此代碼在執(zhí)行時(shí)會(huì)進(jìn)入到判斷中,但因?yàn)槲覀冎付松舷挛臑镈ispatchers.IO,因此上下文發(fā)生了變化,同時(shí)攔截器也發(fā)生了變化,所以最后會(huì)調(diào)用ChannelFlowOperator類(lèi)的父類(lèi)的collect方法,也就是ChannelFlow類(lèi)的collect方法。
2.ChannelFlow類(lèi)的collect方法
ChannelFlow類(lèi)的代碼如下:
override suspend fun collect(collector: FlowCollector<T>): Unit =
coroutineScope {
collector.emitAll(produceImpl(this))
}在ChannelFlow類(lèi)的collect方法中,首先通過(guò)coroutineScope方法創(chuàng)建了一個(gè)作用域協(xié)程,接著調(diào)用了produceImpl方法,代碼如下:
public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)produceImpl方法內(nèi)部調(diào)用了produce方法,并且傳入了待執(zhí)行的任務(wù)collectToFun。
produce方法在Kotlin協(xié)程:協(xié)程的基礎(chǔ)與使用中曾提到過(guò),它是官方提供的啟動(dòng)協(xié)程的四個(gè)方法之一,另外三個(gè)方法為launch方法、async方法、actor方法。代碼如下:
internal fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
// 根據(jù)容量與溢出策略創(chuàng)建Channel對(duì)象
val channel = Channel<E>(capacity, onBufferOverflow)
// 計(jì)算新的上下文
val newContext = newCoroutineContext(context)
// 創(chuàng)建協(xié)程
val coroutine = ProducerCoroutine(newContext, channel)
// 監(jiān)聽(tīng)完成事件
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
// 啟動(dòng)協(xié)程
coroutine.start(start, coroutine, block)
return coroutine
}
在produce方法內(nèi)部,首先創(chuàng)建了一個(gè)Channel類(lèi)型的對(duì)象,接著創(chuàng)建了類(lèi)型為ProducerCoroutine的協(xié)程,并且傳入Channel對(duì)象作為參數(shù)。最后,produce方法返回了一個(gè)ReceiveChannel接口指向的對(duì)象,當(dāng)協(xié)程執(zhí)行完畢后,會(huì)通過(guò)Channel對(duì)象將結(jié)果通過(guò)send方法發(fā)送出來(lái)。
至此,可以知道flowOn方法的實(shí)現(xiàn)實(shí)際上是利用了協(xié)程攔截器的攔截功能。
在這里之后,代碼邏輯分成了兩部分,一部分是block在ProducerCoroutine協(xié)程中的執(zhí)行,另一部分是通過(guò)ReceiveChannel對(duì)象獲取執(zhí)行的結(jié)果。
3.flow方法中代碼的執(zhí)行
在produceImpl方法中,調(diào)用了produce方法,并且傳入了collectToFun對(duì)象,這個(gè)對(duì)象將會(huì)在produce方法創(chuàng)建的協(xié)程中執(zhí)行,代碼如下:
internal val collectToFun: suspend (ProducerScope<T>) -> Unit
get() = { collectTo(it) }
當(dāng)調(diào)用collectToFun對(duì)象的invoke方法時(shí),會(huì)觸發(fā)collectTo方法的執(zhí)行,該方法在ChannelFlowOperator類(lèi)中被重寫(xiě),代碼如下:
protected override suspend fun collectTo(scope: ProducerScope<T>) =
flowCollect(SendingCollector(scope))
在collectTo方法中,首先將參數(shù)scope封裝成SendingCollector類(lèi)型的對(duì)象,接著調(diào)用了flowCollect方法,該方法在ChannelFlowOperatorImpl類(lèi)中被重寫(xiě),代碼如下:
override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
ChannelFlowOperatorImpl類(lèi)的flowCollect方法內(nèi)部調(diào)用了flow對(duì)象的collect方法,這個(gè)flow對(duì)象就是最初通過(guò)flow方法構(gòu)建的對(duì)象。根據(jù)Kotlin協(xié)程:Flow基礎(chǔ)原理的分析,這個(gè)flow對(duì)象類(lèi)型為SafeFlow,最后會(huì)通過(guò)collectSafely方法,觸發(fā)flow方法中的block執(zhí)行。代碼如下:
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
// 觸發(fā)執(zhí)行
collector.block()
}
}
當(dāng)flow方法在執(zhí)行過(guò)程中需要向下游發(fā)出值時(shí),會(huì)調(diào)用emit方法。根據(jù)上面flowCollect方法和collectTo方法可以知道,collectSafely方法的collector對(duì)象就是collectTo方法中創(chuàng)建的SendingCollector類(lèi)型的對(duì)象,代碼如下:
@InternalCoroutinesApi
public class SendingCollector<T>(
private val channel: SendChannel<T>
) : FlowCollector<T> {
// 通過(guò)Channel類(lèi)對(duì)象發(fā)送值
override suspend fun emit(value: T): Unit = channel.send(value)
}
當(dāng)調(diào)用SendingCollector類(lèi)型的對(duì)象的emit方法時(shí),會(huì)通過(guò)調(diào)用類(lèi)型為Channel的對(duì)象的send方法,將值發(fā)送出去。
接下來(lái),將分析下游如何接收上游發(fā)出的值。
4.接收f(shuō)low方法發(fā)出的值
回到ChannelFlow類(lèi)的collect方法,之前提到collect方法中調(diào)用produceImpl方法,開(kāi)啟了一個(gè)新的協(xié)程去執(zhí)行任務(wù),并且返回了一個(gè)ReceiveChannel接口指向的對(duì)象。代碼如下:
override suspend fun collect(collector: FlowCollector<T>): Unit =
coroutineScope {
collector.emitAll(produceImpl(this))
}
在調(diào)用完produceImpl方法后,接著調(diào)用了emitAll方法,將ReceiveChannel接口指向的對(duì)象作為emitAll方法的參數(shù),代碼如下:
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =
emitAllImpl(channel, consume = true)
emitAll方法是FlowCollector接口的擴(kuò)展方法,內(nèi)部調(diào)用了emitAllImpl方法對(duì)參數(shù)channel進(jìn)行封裝,代碼如下:
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
// 用于保存異常
var cause: Throwable? = null
try {
// 死循環(huán)
while (true) {
// 掛起,等待接收Channel結(jié)果或Channel關(guān)閉
val result = run { channel.receiveOrClosed() }
// 如果Channel關(guān)閉了
if (result.isClosed) {
// 如果有異常,則拋出
result.closeCause?.let { throw it }
// 沒(méi)有異常,則跳出循環(huán)
break
}
// 獲取并發(fā)送值
emit(result.value)
}
} catch (e: Throwable) {
// 捕獲到異常時(shí)拋出
cause = e
throw e
} finally {
// 執(zhí)行結(jié)束關(guān)閉Channel
if (consume) channel.cancelConsumed(cause)
}
}
emitAllImpl方法是FlowCollector接口的擴(kuò)展方法,而這里的FlowCollector接口指向的對(duì)象,就是collect方法中創(chuàng)建的匿名對(duì)象,代碼如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
在emitAllImpl方法中,當(dāng)通過(guò)receiveOrClosed方法獲取到上游發(fā)出的值時(shí),會(huì)調(diào)用emit方法通知下游,這時(shí)就會(huì)觸發(fā)collect方法中block的執(zhí)行,最終實(shí)現(xiàn)值從流的上游傳遞到了下游。
三.flowOn方法與流的融合
假設(shè)對(duì)一個(gè)流連續(xù)調(diào)用兩次flowOn方法,那么流最終會(huì)在哪個(gè)flowOn方法指定的調(diào)度器中執(zhí)行呢?代碼如下:
launch(Dispatchers.Main) {
flow {
emit(2)
// emit方法是在IO線程執(zhí)行還是在主線程執(zhí)行呢?
}.flowOn(Dispatchers.IO).flowOn(Dispatchers.Main).collect {
Log.d("liduo", "$it")
}
}
答案是在IO線程執(zhí)行,為什么呢?
根據(jù)本篇上面的分析,當(dāng)?shù)谝淮握{(diào)用flowOn方法時(shí),上游的流會(huì)被包裹成ChannelFlowOperatorImpl對(duì)象,代碼如下:
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
// 檢查當(dāng)前協(xié)程沒(méi)有執(zhí)行結(jié)束
checkFlowContext(context)
return when {
// 為空,則返回自身
context == EmptyCoroutineContext -> this
// 如果是可融合的Flow,則嘗試融合操作,獲取新的流
this is FusibleFlow -> fuse(context = context)
// 其他情況,包裝成可融合的Flow
else -> ChannelFlowOperatorImpl(this, context = context)
}
}
而當(dāng)?shù)诙握{(diào)用flowOn方法時(shí),由于此時(shí)上游的流——ChannelFlowOperatorImpl類(lèi)型的對(duì)象,實(shí)現(xiàn)了FusibleFlow接口,因此,這里會(huì)觸發(fā)流的融合,直接調(diào)用上游的流的fuse方法,并傳入新的上下文。這里容量和溢出策略均為默認(rèn)值。
根據(jù)Kotlin協(xié)程:Flow的融合、Channel容量、溢出策略的分析,這里會(huì)調(diào)用ChannelFlow類(lèi)的fuse方法。相關(guān)代碼如下:
public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
...
// 計(jì)算融合后流的上下文
// context為下游的上下文,this.context為上游的上下文
val newContext = context + this.context
...
}
再根據(jù)之前在Kotlin協(xié)程:協(xié)程上下文與上下文元素中的分析,當(dāng)兩個(gè)上下文進(jìn)行相加時(shí),后一個(gè)上下文中的攔截器會(huì)覆蓋前一個(gè)上下文中的攔截器。在上面的代碼中,后一個(gè)上下文為上游的流的上下文,因此會(huì)優(yōu)先使用上游的攔截器。代碼如下:
public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
四.總結(jié)

粉線為使用時(shí)代碼編寫(xiě)順序,綠線為下游觸發(fā)上游的調(diào)用順序,紅線為上游向下游發(fā)送值的調(diào)用順序,藍(lán)線為線程切換的位置。
到此這篇關(guān)于Kotlin協(xié)程flowOn與線程切換超詳細(xì)示例介紹的文章就介紹到這了,更多相關(guān)Kotlin flowOn與線程切換內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Android 老生常談LayoutInflater的新認(rèn)知
今天不想去聊一些Android的新功能,新特性之類(lèi)的東西,特別想聊一聊這個(gè)老生常談的話題:LayoutInflater,感興趣的朋友來(lái)看看吧2022-03-03
android LinearLayout 布局實(shí)例代碼
android LinearLayout 布局實(shí)例代碼,需要的朋友可以參考一下2013-04-04
Android開(kāi)源庫(kù)自定義相機(jī)模塊
這篇文章主要為大家詳細(xì)介紹了Android開(kāi)源庫(kù)自定義相機(jī)模塊,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-05-05
Android ListView添加頭布局和腳布局實(shí)例詳解
這篇文章主要介紹了Android ListView添加頭布局和腳布局實(shí)例詳解的相關(guān)資料,大家看下效果是否是自己想要實(shí)現(xiàn)的效果,這里附了實(shí)現(xiàn)代碼和實(shí)現(xiàn)效果圖,需要的朋友可以參考下2016-11-11
Android自定義scrollview實(shí)現(xiàn)回彈效果
這篇文章主要為大家詳細(xì)介紹了Android自定義scrollview實(shí)現(xiàn)回彈效果,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-04-04
Android檢測(cè)手機(jī)中存儲(chǔ)卡及剩余空間大小的方法(基于Environment,StatFs及DecimalFormat
這篇文章主要介紹了Android檢測(cè)手機(jī)中存儲(chǔ)卡及剩余空間大小的方法,基于Environment,StatFs及DecimalFormat實(shí)現(xiàn)該功能,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2016-01-01
Android實(shí)現(xiàn)九宮格解鎖的實(shí)例代碼
本篇文章主要介紹了Android九宮格解鎖的實(shí)例代碼,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-07-07
android通過(guò)jxl讀excel存入sqlite3數(shù)據(jù)庫(kù)
本文主要介紹了android通過(guò)jxl去讀excel的內(nèi)容,然后存入sqlite3數(shù)據(jù)庫(kù)表,需要用到j(luò)xl的jar包和sqlite 的jar包,圖片是excel的數(shù)據(jù)格式,需要的朋友可以參考下2014-03-03
詳解AndroidStudio3.0開(kāi)發(fā)調(diào)試安卓NDK的C++代碼
這篇文章主要介紹了AndroidStudio3.0開(kāi)發(fā)調(diào)試安卓NDK的C++代碼,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-12-12
NestedScrollView+Recyclerview下滑卡頓解決方法
本文為大家解決安卓開(kāi)發(fā)時(shí)候NestedScrollView+Recyclerview下滑卡頓的問(wèn)題,希望能夠幫助到你。2017-11-11

