Kotlin協(xié)程開發(fā)之Flow的融合與Channel容量及溢出策略介紹
一.協(xié)程間的通信
當需要進行協(xié)程間的通信時,可以調(diào)用Channel方法,創(chuàng)建一個Channel接口指向的對象,通過調(diào)用該對象的send方法和receive方法實現(xiàn)消息的發(fā)送與接收。協(xié)程對Channel接口的實現(xiàn),本質(zhì)上與阻塞隊列類似,這里不再贅述。
1.通道容量
事實上,send方法與receive方法并沒有定義在Channel接口中,而是分別定義在SendChannel接口和ReceiveChannel接口中。Channel接口中只是定義了一些與Channel容量策略相關(guān)的枚舉常量,代碼如下:
// 繼承SendChannel接口和ReceiveChannel接口
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
// 枚舉常量
public companion object Factory {
// Channel的容量為無限
public const val UNLIMITED: Int = Int.MAX_VALUE
// Channel的容量為0,沒有緩存
public const val RENDEZVOUS: Int = 0
// Channel的容量為1,溢出策略為DROP_OLDEST,
// 后一個的數(shù)據(jù)會覆蓋前一個數(shù)據(jù)
public const val CONFLATED: Int = -1
// Channel的容量為默認值CHANNEL_DEFAULT_CAPACITY,
// 默認溢出策略為SUSPEND,send方法會發(fā)生掛起
// 當容量策略為BUFFERED,而溢出策略不為SUSPEND時,Channel的容量為1
public const val BUFFERED: Int = -2
// 協(xié)程內(nèi)部使用的一個默認枚舉值,不對外暴露
internal const val OPTIONAL_CHANNEL = -3
// 用于手動配置容量策略為BUFFERED時的默認值
public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer"
// 容量策略為BUFFERED時的默認值
// 默認64,最小1,最大為Int.MAX_VALUE-1
internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME,
64, 1, UNLIMITED - 1
)
}
}
從上面的代碼可以看出Channel接口繼承自SendChannel接口和ReceiveChannel接口。因此,一個Channel接口指向的對象,既可以用于發(fā)送消息,也可以用于接收消息。
2.溢出策略
Channel除了容量策略外,還有溢出策略,用于決定當Channel的容量已滿時,而下一個消息到來時的行為。溢出策略定義在枚舉類BufferOverflow中,代碼如下:
public enum class BufferOverflow {
// 當容量已滿時,掛起調(diào)用send方法的協(xié)程
SUSPEND,
// 當容量已滿時,刪除舊數(shù)據(jù),將新的數(shù)據(jù)添加進去,不掛起調(diào)用send方法的協(xié)程
DROP_OLDEST,
// 當容量已滿時,忽略當前要添加的數(shù)據(jù),不掛起調(diào)用send方法的協(xié)程
DROP_LATEST
}
二.FusibleFlow接口
FusibleFlow接口繼承自Flow接口。一個類實現(xiàn)了該接口,表示該類創(chuàng)建的流可以與其上游或下游相鄰的流進行融合,當流發(fā)生融合時,就會調(diào)用接口中定義的fuse方法,代碼如下:
@InternalCoroutinesApi
public interface FusibleFlow<T> : Flow<T> {
// 用于流的融合
public fun fuse(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>
}
FusibleFlow接口的fuse方法,默認容量為OPTIONAL_CHANNEL,默認溢出策略為SUSPEND。
流的融合
在Flow中,當channelFlow方法、flowOn方法、buffer方法、produceIn方法、broadcastIn方法相鄰調(diào)用時,就會觸發(fā)流的融合。
具體融合的過程,其實是將下游流的容量、溢出策略、上下文傳遞給上游的流處理,上游的流根據(jù)自身的容量、溢出策略、上下文以及下游的流的容量、溢出策略、上下文重新計算,得到新的容量、溢出策略、上下文,并返回一個融合后的流。
三.ChannelFlow類
ChannelFlow類是一個抽象類,實現(xiàn)了FusibleFlow接口。下面分析一下fuse方法對于上下游流融合的策略,代碼如下:
@InternalCoroutinesApi
public abstract class ChannelFlow<T>(
// 上游流的上下文
@JvmField public val context: CoroutineContext,
// 上下游之間流的緩存容量
@JvmField public val capacity: Int,
// 溢出策略
@JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {
...
public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
// CONFLATED是一個復合的類型,需要拆解成capacity = 0, onBufferOverflow = DROP_OLDEST
assert { capacity != Channel.CONFLATED }
// 計算融合后流的上下文
val newContext = context + this.context
// 用于保存融合后流的容量
val newCapacity: Int
// 用于保存融合后流的溢出策略
val newOverflow: BufferOverflow
// SUSPEND為默認溢出策略,如果溢出策略不是默認的策略
if (onBufferOverflow != BufferOverflow.SUSPEND) {
// 直接保存
newCapacity = capacity
newOverflow = onBufferOverflow
} else { // 如果是默認策略
// 計算并保存新的容量
newCapacity = when {
// 如果之前的容量為默認枚舉值,則使用新的
this.capacity == Channel.OPTIONAL_CHANNEL -> capacity
// 如果新的容量為默認枚舉值,則使用原來的
capacity == Channel.OPTIONAL_CHANNEL -> this.capacity
// 如果原來的容量為默認值CHANNEL_DEFAULT_CAPACITY,則使用新的
this.capacity == Channel.BUFFERED -> capacity
// 如果新的容量為默認值CHANNEL_DEFAULT_CAPACITY,則使用原來的
capacity == Channel.BUFFERED -> this.capacity
// 如果不為默認值或默認枚舉值
else -> {
// 檢查容量都是大于等于0的
assert { this.capacity >= 0 }
assert { capacity >= 0 }
// 將原來的容量和新的容量進行相加
val sum = this.capacity + capacity
// 如果相加后大與等于0,則容量為相加后的結(jié)果,否則為無限
if (sum >= 0) sum else Channel.UNLIMITED
}
}
// 保存溢出策略
newOverflow = this.onBufferOverflow
}
// 如果融合的兩個流的上下文相同,容量相同,溢出策略也相同
if (newContext == this.context && newCapacity == this.capacity && newOverflow == this.onBufferOverflow)
// 則直接返回
return this
// 有變化則根據(jù)新計算出得參數(shù),創(chuàng)建融合后的流
return create(newContext, newCapacity, newOverflow)
}
// 由子類進行重寫
protected abstract fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T>
...
}流融合的原則
根據(jù)上面對fuse方法的分析,可以總結(jié)出fuse方法在計算容量和溢出策略時的四個原則:
1)下游優(yōu)先于上游
2)溢出策略優(yōu)先于容量
3)非默認值優(yōu)先于默認值
4)上下游容量都不為默認值,則相加取和
到此這篇關(guān)于Kotlin協(xié)程開發(fā)之Flow的融合與Channel容量及溢出策略介紹的文章就介紹到這了,更多相關(guān)Kotlin Flow的融合內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Android 跨進程模擬按鍵(KeyEvent )實例詳解
這篇文章主要介紹了Android 跨進程模擬按鍵(KeyEvent )實例詳解的相關(guān)資料,類似手機遙控器的需求就可以這么做,需要的朋友可以參考下2016-11-11
Android控件Chronometer定時器的實現(xiàn)方法
這篇文章主要為大家詳細介紹了Android控件Chronometer定時器的實現(xiàn)方法,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2016-11-11
Android中使用PagerSlidingTabStrip實現(xiàn)導航標題的示例
本篇文章主要介紹了Android中使用PagerSlidingTabStrip實現(xiàn)導航標題的示例,具有一定的參考價值,有興趣的可以了解一下。2017-01-01

