Kotlin協(xié)程Channel特點(diǎn)及使用細(xì)節(jié)詳解
正文
在協(xié)程啟動(dòng)模式中已經(jīng)知道async是可以返回結(jié)果的,但是只返回一個(gè),那么在復(fù)雜場(chǎng)景下就會(huì)不夠用了,所以Channel就出現(xiàn)了。
1.認(rèn)識(shí)Channel
Channel的意思是管道、通道,用圖表示如下:

Channel的左邊是發(fā)送方,右邊是接收方,中間則是消息,那么代碼表示就是下面這樣:
fun main() {
channelTest()
}
fun channelTest() = runBlocking {
val channel = Channel<Int>() //關(guān)鍵點(diǎn)①
launch {
for (i in 1..3) {
channel.send(i) //關(guān)鍵點(diǎn)②
logX("send: $i")
}
}
launch {
for (i in channel) { //關(guān)鍵點(diǎn)③
logX("receiver: $i")
}
}
logX("end")
}
//輸出結(jié)果:
//================================
//end
//Thread:main @coroutine#1
//================================
//================================
//receiver: 1
//Thread:main @coroutine#3
//================================
//================================
//send: 1
//Thread:main @coroutine#2
//================================
//================================
//send: 2
//Thread:main @coroutine#2
//================================
//================================
//receiver: 2
//Thread:main @coroutine#3
//================================
//================================
//receiver: 3
//Thread:main @coroutine#3
//================================
//================================
//send: 3
//Thread:main @coroutine#2
//================================
上面的代碼中啟動(dòng)了兩個(gè)協(xié)程,一個(gè)發(fā)送,一個(gè)接收,還有幾個(gè)關(guān)鍵點(diǎn):
- 關(guān)鍵點(diǎn)①:通過
Channel創(chuàng)建一個(gè)管道,其中泛型Int表示發(fā)送的數(shù)據(jù)類型; - 關(guān)鍵點(diǎn)②:?jiǎn)?dòng)一個(gè)協(xié)程通過
send發(fā)送數(shù)據(jù),send是一個(gè)掛起函數(shù); - 關(guān)鍵點(diǎn)③:?jiǎn)?dòng)一個(gè)協(xié)程遍歷
channel打印出接收到的消息。
那么這里還有一個(gè)問題,在執(zhí)行完上述代碼后程序并沒有終止,那要如何終止程序呢?
很簡(jiǎn)單,在發(fā)送完所有消息后調(diào)用close方法即可。
launch {
for (i in 1..3) {
channel.send(i) //關(guān)鍵點(diǎn)②
logX("send: $i")
}
// 修改點(diǎn)
// ↓
channel.close()
}
Channel也是一種協(xié)程資源,用完后如果不關(guān)閉那么這個(gè)資源就會(huì)一直被占用。
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity) {
RENDEZVOUS -> {
...
}
CONFLATED -> {
...
}
UNLIMITED -> {
...
}
BUFFERED -> {
...
}
else -> {
...
}
}
Channel中有三個(gè)參數(shù):
capacity: 代表管道的容量,默認(rèn)值為RENDEZVOUS,代表容量為0,除此之外還有三個(gè)類型:
- CONFLATED:代表容量為1,新的數(shù)據(jù)會(huì)替代舊的數(shù)據(jù);
- UNLIMITED:代表無限容量;
- BUFFERED:代表具備一定緩存的容量,默認(rèn)情況下是64,具體容量由VM參數(shù)
kotlinx.coroutines.channels.defaultBuffer決定。 onBufferOverflow: 代表緩沖策略,也就是當(dāng)緩沖的容量滿了之后要怎么做。默認(rèn)值為SUSPEND,表示在緩沖區(qū)溢出時(shí)掛起。除此之外還有兩個(gè)類型:
- DROP_OLDEST:在緩沖區(qū)溢出時(shí)刪除最舊的值,向緩沖區(qū)添加新值,不要掛起;

- DROP_LATEST:在緩沖區(qū)溢出時(shí),立即刪除正在添加到緩沖區(qū)的最新值(以便緩沖區(qū)內(nèi)容保持不變),不要掛起。

onUndeliveredElement: 它相當(dāng)于一個(gè)異常處理回調(diào)。當(dāng)管道中的某些數(shù)據(jù)沒有被成功接收的時(shí)候,這個(gè)回調(diào)就會(huì)被調(diào)用
現(xiàn)在寫個(gè)案例看一下capacity在其他類型下的區(qū)別
/**
* Channel.CONFLATED
*/
fun channelTest() = runBlocking {
val channel = Channel<Int>(Channel.CONFLATED)
launch {
for (i in 1..4) {
channel.send(i)
println("send: $i")
}
channel.close()
}
launch {
for (i in channel) {
println("receiver: $i")
}
}
println("end")
}
//輸出結(jié)果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 4
/**
* Channel.UNLIMITED
*/
fun channelTest() = runBlocking {
val channel = Channel<Int>(Channel.UNLIMITED)
launch {
for (i in 1..4) {
channel.send(i)
println("send: $i")
}
channel.close()
}
launch {
for (i in channel) {
println("receiver: $i")
}
}
println("end")
}
//輸出結(jié)果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3
//receiver: 4
/**
* Channel.BUFFERED
*/
fun channelTest() = runBlocking {
val channel = Channel<Int>(Channel.BUFFERED)
launch {
for (i in 1..4) {
channel.send(i)
println("send: $i")
}
channel.close()
}
launch {
for (i in channel) {
println("receiver: $i")
}
}
println("end")
}
//輸出結(jié)果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3
//receiver: 4
再看一下onBufferOverflow在其他類型下的區(qū)別
/**
* capacity = 3,onBufferOverflow = BufferOverflow.DROP_OLDEST
* 緩沖區(qū)設(shè)置為3,緩沖區(qū)溢出時(shí)刪除最舊的值,向緩沖區(qū)添加新值
*/
fun channelTest() = runBlocking {
val channel = Channel<Int>(
capacity = 3,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
launch {
for (i in 1..4) {
channel.send(i)
println("send: $i")
}
channel.close()
}
launch {
for (i in channel) {
println("receiver: $i")
}
}
println("end")
}
//輸出結(jié)果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 2
//receiver: 3
//receiver: 4
/**
* capacity = 3,onBufferOverflow = BufferOverflow.DROP_LATEST
* 緩沖區(qū)設(shè)置為3,緩沖區(qū)溢出時(shí)立即刪除正在添加到緩沖區(qū)的最新值
*/
fun channelTest() = runBlocking {
val channel = Channel<Int>(
capacity = 3,
onBufferOverflow = BufferOverflow.DROP_LATEST
)
launch {
for (i in 1..4) {
channel.send(i)
println("send: $i")
}
channel.close()
}
launch {
for (i in channel) {
println("receiver: $i")
}
}
println("end")
}
//輸出結(jié)果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3
再看一下onUndeliveredElement要如何使用
/**
* capacity = 2,onBufferOverflow = BufferOverflow.DROP_LATEST, onUndeliveredElement
* 緩沖區(qū)設(shè)置為2,緩沖區(qū)溢出時(shí)立即刪除正在添加到緩沖區(qū)的最新值
* 接收一個(gè)數(shù)據(jù)后取消接收其他數(shù)據(jù)
*/
fun channelTest() = runBlocking {
val channel = Channel<Int>(
capacity = 2,
onBufferOverflow = BufferOverflow.DROP_LATEST,
onUndeliveredElement = {
println("onUndeliveredElement: $it")
}
)
launch {
for (i in 1..4) {
channel.send(i)
println("send: $i")
}
}
println("receive:${channel.receive()}")
channel.cancel()
}
//輸出結(jié)果:
//send: 1
//send: 2
//send: 3
//send: 4
//receive:1
//onUndeliveredElement: 2
//onUndeliveredElement: 3
上面的代碼容量設(shè)置為2,緩沖策略是刪除正在添加到緩沖區(qū)的最新值,接收一個(gè)數(shù)據(jù)后立即取消接收其他數(shù)據(jù),也就是說接收到了【send: 1】的數(shù)據(jù)【receive:1】,【send: 4】的數(shù)據(jù)被緩沖策略刪除了,由于接收消息的同道已經(jīng)被取消了那么【send: 2】和【send: 3】的數(shù)據(jù)就只能在異常中被處理,從輸出結(jié)果就可以看到。
從上面的代碼示例可以總結(jié)出它的應(yīng)用場(chǎng)景:接收方很關(guān)心數(shù)據(jù)是否被消費(fèi),例如企業(yè)微信、釘釘?shù)南⑹欠褚炎x的狀態(tài),對(duì)于異常處理那塊的場(chǎng)景就像是發(fā)送消息過程中消息沒有被發(fā)送出去,那么接收方就無法接受到這個(gè)消息。
2.Channel使用中的細(xì)節(jié)
前面在使用Channel時(shí)為了讓程序終止在發(fā)送完成后調(diào)用了channel.close(),但是這個(gè)很容易被忘記,忘記添加就會(huì)造成程序無法終止的問題,那么Produce就誕生了,它是一個(gè)高階函數(shù)。
fun produceTest() = runBlocking {
val channel: ReceiveChannel<Int> = produce {
for (i in 1..4) {
send(i)
}
}
launch {
for (i in channel) {
println("receive: $i")
}
}
println("end")
}
//輸出結(jié)果:
//end
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//Process finished with exit code 0
可以看到?jīng)]有加入close代碼就可以正常結(jié)束,上面發(fā)送了4條數(shù)據(jù),那么我要是接收5條數(shù)據(jù)會(huì)不會(huì)有什么問題?
fun produceTest() = runBlocking {
val channel: ReceiveChannel<Int> = produce {
for (i in 1..4) {
send(i)
}
}
println("receive: ${channel.receive()}")
println("receive: ${channel.receive()}")
println("receive: ${channel.receive()}")
println("receive: ${channel.receive()}")
println("receive: ${channel.receive()}")
println("end")
}
//輸出結(jié)果:
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//ClosedReceiveChannelException: Channel was closed
可以看到當(dāng)我接收第5條數(shù)據(jù)的時(shí)候報(bào)出channel被關(guān)閉的提示,也就是說produce確實(shí)會(huì)在消息發(fā)送完畢后關(guān)閉通道。
業(yè)務(wù)開發(fā)中有可能我們確實(shí)需要對(duì)channel發(fā)送的消息進(jìn)行單獨(dú)處理,那么也許并不知道具體發(fā)送了幾條數(shù)據(jù),如果接收數(shù)據(jù)數(shù)量超過發(fā)送數(shù)據(jù)數(shù)量就會(huì)出現(xiàn)錯(cuò)誤,那有沒有像isClose這類的方法可以在接收前判斷是否被關(guān)閉呢?有的,在Channel中還有兩個(gè)變量:
//如果該通道已通過調(diào)用[close]關(guān)閉,則返回' true '。這意味著調(diào)用[send]將導(dǎo)致異常。 public val isClosedForSend: Boolean //如果通過在SendChannel端調(diào)用close關(guān)閉了此通道, //并且已經(jīng)接收到以前發(fā)送的所有項(xiàng)目,則返回true。 public val isClosedForReceive: Boolean
那么安全的調(diào)用channel.receive()接收就可以這么寫
fun produceTest() = runBlocking {
val channel: ReceiveChannel<Int> = produce(capacity = 3) {
(1..4).forEach {
send(it)
println("Send $it")
}
}
while (!channel.isClosedForReceive) {
println("receive: ${channel.receive()}")
}
println("end")
}
//輸出結(jié)果:
//Send 1
//Send 2
//Send 3
//Send 4
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//end
但是這里會(huì)有一個(gè)問題,不定義capacity的數(shù)量
fun produceTest() = runBlocking {
// 變化在這里
// ↓
val channel: ReceiveChannel<Int> = produce {
(1..4).forEach {
send(it)
println("Send $it")
}
}
while (!channel.isClosedForReceive) {
println("receive: ${channel.receive()}")
}
println("end")
}
//輸出結(jié)果:
//Send 1
//receive: 1
//receive: 2
//Send 2
//Send 3
//receive: 3
//receive: 4
//Send 4
//
//ClosedReceiveChannelException: Channel was closed
可以看到send發(fā)送的數(shù)據(jù)全部都被接收了,但是還是報(bào)出channel被關(guān)閉的錯(cuò)誤,原因在注釋中已經(jīng)寫明:如果通過在SendChannel端調(diào)用close關(guān)閉了此通道,并且已經(jīng)接收到以前發(fā)送的所有項(xiàng)目,則返回true。
這意味著調(diào)用receive將導(dǎo)致closereceivechannelexception。 所以channel.receive()要慎用??梢杂?code>channel.consumeEach代替
fun produceTest() = runBlocking {
val channel: ReceiveChannel<Int> = produce {
(1..4).forEach {
send(it)
println("Send $it")
}
}
//變化在這里
channel.consumeEach {
println("receive: $it")
}
println("end")
}
//輸出結(jié)果:
//Send 1
//receive: 1
//receive: 2
//Send 2
//Send 3
//receive: 3
//receive: 4
//Send 4
//end
3.Channe的特點(diǎn)
Channel主要你用來傳遞數(shù)據(jù)流的,這個(gè)數(shù)據(jù)流指的是多個(gè)數(shù)據(jù)組合形成別的流,與它形成鮮明對(duì)比的是async、掛起函數(shù)。
數(shù)據(jù)流的傳輸,有發(fā)送就有接收,而Channel是完全符合這一點(diǎn)的。發(fā)送與接收存在兩種情況:
- 數(shù)據(jù)流的發(fā)送了但是還沒有被接收,沒有接收則不再進(jìn)行發(fā)送消息,例如文件的傳輸;
- 數(shù)據(jù)流的發(fā)送了不管有沒有被接收,都要繼續(xù)發(fā)送消息,例如微信聊天。
Channel符合第二個(gè)結(jié)論,無論發(fā)送的數(shù)據(jù)是否被消費(fèi)或者說被接收,Channel都會(huì)進(jìn)行工作。我們來證明一下這個(gè)結(jié)論。
/**
* 消息容量為10,發(fā)送4條數(shù)據(jù)
* 無論消息是否被接收都會(huì)吧消息發(fā)送完畢
*/
fun produceTest() = runBlocking {
val channel: ReceiveChannel<Int> = produce(capacity = 10) {
(1..4).forEach {
send(it)
println("Send $it")
}
}
println("end")
}
//輸出結(jié)果:
//end
//Send 1
//Send 2
//Send 3
//Send 4
/**
* 消息容量改為默認(rèn),默認(rèn)值時(shí)0,發(fā)送4條數(shù)據(jù)
* Channel依舊是在工作的,只是說在調(diào)用send方法的時(shí)候
* 接收方還沒有準(zhǔn)備完畢且容量為0,所以會(huì)被掛起,程序一直無法退出
*/
fun produceTest() = runBlocking {
val channel: ReceiveChannel<Int> = produce {
(1..4).forEach {
send(it)
println("Send $it")
}
}
println("end")
}
//輸出結(jié)果:
//end
//程序沒有結(jié)束
通過上面的代碼引出一個(gè)結(jié)論:Channel是“熱” 的。不管接收方是否存在,Channel是一定會(huì)工作的。類似于自來水廠向像居民提供水源,發(fā)電廠向居民提供電能。

以上就是Kotlin協(xié)程Channel特點(diǎn)及使用細(xì)節(jié)詳解的詳細(xì)內(nèi)容,更多關(guān)于Kotlin協(xié)程Channel特點(diǎn)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
android控件實(shí)現(xiàn)單擊拖動(dòng)效果
這篇文章主要為大家詳細(xì)介紹了android控件實(shí)現(xiàn)單擊拖動(dòng)效果,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-01-01
Android開發(fā)中使用Volley庫發(fā)送HTTP請(qǐng)求的實(shí)例教程
這篇文章主要介紹了Android開發(fā)中使用Volley庫發(fā)送HTTP請(qǐng)求的實(shí)例教程,包括創(chuàng)建Volley單例的基本知識(shí)與取消Request請(qǐng)求的技巧等,需要的朋友可以參考下2016-05-05
Android星級(jí)評(píng)分條實(shí)現(xiàn)評(píng)分界面
這篇文章主要為大家詳細(xì)介紹了Android星級(jí)評(píng)分條實(shí)現(xiàn)評(píng)分界面,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-05-05
android中NFC讀寫功能的實(shí)現(xiàn)方法
這篇文章主要為大家詳細(xì)介紹了android中NFC讀寫功能的實(shí)現(xiàn)方法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09
Android開發(fā)實(shí)現(xiàn)查詢遠(yuǎn)程服務(wù)器的工具類QueryUtils完整實(shí)例
這篇文章主要介紹了Android開發(fā)實(shí)現(xiàn)查詢遠(yuǎn)程服務(wù)器的工具類QueryUtils,涉及Android服務(wù)器請(qǐng)求發(fā)送、接收、數(shù)據(jù)交互等相關(guān)操作技巧,需要的朋友可以參考下2017-11-11
Android初學(xué)者必須知道的10個(gè)技術(shù)
本篇內(nèi)容給大家整理10個(gè)作為Android初學(xué)者必須要了解和會(huì)用的技術(shù)以及詳細(xì)代碼分析,需要的朋友收藏下慢慢學(xué)習(xí)吧。2017-12-12
Android簡(jiǎn)單實(shí)用的可拖拽GridView組件分享
在我們?nèi)粘i_發(fā)中,使用?GridView?這種網(wǎng)格視圖的場(chǎng)合還是不少的,本篇我們來介紹一個(gè)支持拖拽的?GridView?組件,可以輕松搞定網(wǎng)格視圖的拖拽排序,需要的可以參考一下2023-06-06
Android5.0以上版本錄屏實(shí)現(xiàn)代碼(完整代碼)
這篇文章主要介紹了Android5.0以上版本錄屏實(shí)現(xiàn)代碼,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2018-01-01
Android Studio如何為Activity添加自定義注解信息
好久沒用寫文章了,今天給大家分享Android Studio如何為Activity添加自定義注解信息,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2021-06-06

