Go語言高效I/O并發(fā)處理雙緩沖和Exchanger模式實(shí)例探索
雙緩沖(double buffering)
雙緩沖(double buffering)是高效處理 I/O 操作的一種并發(fā)技術(shù),它使用兩個(gè) buffer,一個(gè) goroutine 使用其中一個(gè) buffer 進(jìn)行寫,而另一個(gè) goroutine 使用另一個(gè) buffer 進(jìn)行讀,然后進(jìn)行交換。這樣兩個(gè) goroutine 可能并發(fā)的執(zhí)行,減少它們之間的等待和阻塞。
本文還提供了一個(gè)類似 Java 的java.util.concurrent.Exchanger[1]的 Go 并發(fā)原語,它可以用來在兩個(gè) goroutine 之間交換數(shù)據(jù),快速實(shí)現(xiàn)雙緩沖的模式。 這個(gè)并發(fā)原語可以在github.com/smallnest/exp/sync/Exchanger[2]找到。
double buffering 并發(fā)模式
雙緩沖(double buffering)設(shè)計(jì)方式雖然在一些領(lǐng)域中被廣泛的應(yīng)用,但是我還沒有看到它在并發(fā)模式中專門列出了,或者專門列為一種模式。這里我們不妨把它稱之為雙緩存模式。
這是一種在 I/O 處理領(lǐng)域廣泛使用的用來提速的編程技術(shù),它使用兩個(gè)緩沖區(qū)來加速計(jì)算機(jī),該計(jì)算機(jī)可以重疊 I/O 和處理。一個(gè)緩沖區(qū)中的數(shù)據(jù)正在處理,而下一組數(shù)據(jù)被讀入另一個(gè)緩沖區(qū)。 在流媒體應(yīng)用程序中,一個(gè)緩沖區(qū)中的數(shù)據(jù)被發(fā)送到聲音或圖形卡,而另一個(gè)緩沖區(qū)則被來自源(Internet、本地服務(wù)器等)的更多數(shù)據(jù)填充。 當(dāng)視頻顯示在屏幕上時(shí),一個(gè)緩沖區(qū)中的數(shù)據(jù)被填充,而另一個(gè)緩沖區(qū)中的數(shù)據(jù)正在顯示。當(dāng)在緩沖區(qū)之間移動(dòng)數(shù)據(jù)的功能是在硬件電路中實(shí)現(xiàn)的,而不是由軟件執(zhí)行時(shí),全動(dòng)態(tài)視頻的速度會(huì)加快,不但速度被加快,而且可以減少黑屏閃爍的可能。

在這個(gè)模式中,兩個(gè) goroutine 并發(fā)的執(zhí)行,一個(gè) goroutine 使用一個(gè) buffer 進(jìn)行寫(不妨稱為 buffer1),而另一個(gè) goroutine 使用另一個(gè) buffer 進(jìn)行讀(不妨稱為 buffer2)。如圖所示。 當(dāng)左邊的 writer 寫滿它當(dāng)前使用的 buffer1 后,它申請(qǐng)和右邊的 goroutine 的 buffer2 進(jìn)行交換,這會(huì)出現(xiàn)兩種情況:
右邊的 reader 已經(jīng)讀完了它當(dāng)前使用的 buffer2,那么它會(huì)立即交換,這樣左邊的 writer 可以繼續(xù)寫 buffer2,而右邊的 reader 可以繼續(xù)讀 buffer1。
右邊的 reader 還沒有讀完 buffer2,那么左邊的 writer 就會(huì)阻塞,直到右邊的 reader 讀完 buffer2,然后交換。 周而復(fù)始。
同樣右邊的 goroutine 也是同樣的處理,當(dāng)它讀完 buffer2 后,它會(huì)申請(qǐng)和左邊的 goroutine 的 buffer1 進(jìn)行交換,這會(huì)出現(xiàn)兩種情況:
左邊的 writer 已經(jīng)寫完了它當(dāng)前使用的 buffer1,那么它會(huì)立即交換,這樣右邊的 reader 可以繼續(xù)讀 buffer1,而左邊的 writer 可以繼續(xù)寫 buffer2。
左邊的 writer 還沒有寫完 buffer1,那么右邊的 reader 就會(huì)阻塞,直到左邊的 writer 寫完 buffer1,然后交換。 周而復(fù)始。
這樣兩個(gè) goroutine 就可以并發(fā)的執(zhí)行,而不用等待對(duì)方的讀寫操作。這樣可以提高并發(fā)處理的效率。
不僅僅如此, double buffering 其實(shí)可以應(yīng)用于更多的場(chǎng)景, 不僅僅是 buffer 的場(chǎng)景,如 Java 的垃圾回收機(jī)制中,HotSpot JVM 把年輕代分為了三部分:1 個(gè) Eden 區(qū)和 2 個(gè) Survivor 區(qū)(分別叫 from 和 to,或者 s0 和 s1),在 GC 開始的時(shí)候,對(duì)象只會(huì)存在于 Eden 區(qū)和名為“From”的 Survivor 區(qū),Survivor 區(qū)“To”是空的。緊接著進(jìn)行 GC,Eden 區(qū)中所有存活的對(duì)象都會(huì)被復(fù)制到“To”,而在“From”區(qū)中,仍存活的對(duì)象會(huì)根據(jù)他們的年齡值來決定去向。年齡達(dá)到一定值的對(duì)象會(huì)被移動(dòng)到年老代中,沒有達(dá)到閾值的對(duì)象會(huì)被復(fù)制到“To”區(qū)域。經(jīng)過這次 GC 后,Eden 區(qū)和 From 區(qū)已經(jīng)被清空。這個(gè)時(shí)候,“From”和“To”會(huì)交換(exchange)他們的角色,也就是新的“To”就是上次 GC 前的“From”,新的“From”就是上次 GC 前的“To”。不管怎樣,都會(huì)保證名為 To 的 Survivor 區(qū)域是空的。Minor GC 會(huì)一直重復(fù)這樣的過程,直到“To”區(qū)被填滿,“To”區(qū)被填滿之后,會(huì)將所有對(duì)象移動(dòng)到年老代中。

Exchanger 的實(shí)現(xiàn)
既然有這樣的場(chǎng)景,有這樣的需求,所以我們需要針對(duì)這樣場(chǎng)景的一個(gè)同步原語。Java 給我們做了一個(gè)很好的師范,接下來我們使用實(shí)現(xiàn)相應(yīng)的 Go,但是我們的實(shí)現(xiàn)和 Java 的實(shí)現(xiàn)完全不同,我們要基于 Go 既有的同步原語來實(shí)現(xiàn)。
基于 Java 實(shí)現(xiàn)的 Exchanger 的功能,我們也實(shí)現(xiàn)一個(gè)Exchanger, 我們期望它的功能如下:
只用作兩個(gè) goroutine 之間的數(shù)據(jù)交換,不支持多個(gè) goroutine 之間的數(shù)據(jù)交換。
可以重用。交換完之后還可以繼續(xù)交換
支持泛型,可以交換任意類型的數(shù)據(jù)
如果對(duì)端還沒有準(zhǔn)備交換,就阻塞等待
在交換完之前,阻塞的 goroutine 不可能調(diào)用
Exchange方法兩次Go 內(nèi)存模型補(bǔ)充: 同一次交換, 一個(gè) goroutine 在調(diào)用
Exchange方法的完成,一定happens after另一個(gè) goroutine 調(diào)用Exchange方法的開始。
如果你非常熟悉 Go 的各種同步原語,你可以很快的組合出這樣一個(gè)同步原語。如果你還不是那么熟悉,建議你閱讀《深入理解 Go 并發(fā)編程》這本書,京東有售。 下面是一個(gè)簡(jiǎn)單的實(shí)現(xiàn),代碼在Exchanger[3]。 我們只用left、right指代這兩個(gè) goroutine, goroutine 是 Go 語言中的并發(fā)單元,我們期望的就是這兩個(gè) goroutine 發(fā)生關(guān)系。
為了跟蹤這兩個(gè) goroutine,我們需要使用 goroutine id 來標(biāo)記這兩個(gè) goroutine,這樣避免了第三者插入。
type Exchanger[T any] struct {
leftGoID, rightGoID int64
left, right chan T
}
你必須使用 NewExchanger 創(chuàng)建一個(gè)Exchanger,它會(huì)返回一個(gè)Exchanger的指針。 初始化的時(shí)候我們把 left 和 right 的 id 都設(shè)置為-1,表示還沒有 goroutine 使用它們,并且不會(huì)和所有的 goroutine 的 id 沖突。 同時(shí)我們創(chuàng)建兩個(gè) channel,一個(gè)用來左邊的 goroutine 寫,右邊的 goroutine 讀,另一個(gè)用來右邊的 goroutine 寫,左邊的 goroutine 讀。channel 的 buffer 設(shè)置為 1,這樣可以避免死鎖。
func NewExchanger[T any]( "T any") *Exchanger[T] {
return &Exchanger[T]{
leftGoID: -1,
rightGoID: -1,
left: make(chan T, 1),
right: make(chan T, 1),
}
}
Exchange方法是核心方法,它用來交換數(shù)據(jù),它的實(shí)現(xiàn)如下:
func (e *Exchanger[T]) Exchange(value T) T {
goid := goroutine.ID()
// left goroutine
isLeft := atomic.CompareAndSwapInt64(&e.leftGoID, -1, goid)
if !isLeft {
isLeft = atomic.LoadInt64(&e.leftGoID) == goid
}
if isLeft {
e.right <- value // send value to right
return <-e.left // wait for value from right
}
// right goroutine
isRight := atomic.CompareAndSwapInt64(&e.rightGoID, -1, goid)
if !isRight {
isRight = atomic.LoadInt64(&e.rightGoID) == goid
}
if isRight {
e.left <- value // send value to left
return <-e.right // wait for value from left
}
// other goroutine
panic("sync: exchange called from neither left nor right goroutine")
}當(dāng)一個(gè) goroutine 調(diào)用的時(shí)候,首先我們嘗試把它設(shè)置為left,如果成功,那么它就是left。 如果不成功,我們就判斷它是不是先前已經(jīng)是left,如果是,那么它就是left。 如果先前,或者此時(shí)left已經(jīng)被另一個(gè) goroutine 占用了,它還有機(jī)會(huì)成為right,同樣的邏輯檢查和設(shè)置right。
如果既不是left也不是right,那么就是第三者插入了,我們需要 panic,因?yàn)槲覀儾幌M谌卟遄恪?/p>
如果它是left,那么它就會(huì)把數(shù)據(jù)發(fā)送到right,然后等待right發(fā)送數(shù)據(jù)過來。 如果它是right,那么它就會(huì)把數(shù)據(jù)發(fā)送到left,然后等待left發(fā)送數(shù)據(jù)過來。
這樣就實(shí)現(xiàn)了數(shù)據(jù)的交換。
Exchanger 的使用
我們使用一個(gè)簡(jiǎn)單的雙緩沖例子來說明如何使用Exchanger,我們創(chuàng)建兩個(gè) goroutine,一個(gè) goroutine 負(fù)責(zé)寫,另一個(gè) goroutine 負(fù)責(zé)讀,它們之間通過Exchanger來交換數(shù)據(jù)。
buf1 := bytes.NewBuffer(make([]byte, 1024))
buf2 := bytes.NewBuffer(make([]byte, 1024))
exchanger := syncx.NewExchanger[*bytes.Buffer]( "*bytes.Buffer")
var wg sync.WaitGroup
wg.Add(2)
expect := 0
go func() { // g1
defer wg.Done()
buf := buf1
for i := 0; i < 10; i++ {
for j := 0; j < 1024; j++ {
buf.WriteByte(byte(j / 256))
expect += j / 256
}
buf = exchanger.Exchange(buf)
}
}()
var got int
go func() { // g2
defer wg.Done()
buf := buf2
for i := 0; i < 10; i++ {
buf = exchanger.Exchange(buf)
for _, b := range buf.Bytes() {
got += int(b)
}
buf.Reset()
}
}()
wg.Wait()
fmt.Println(got)
fmt.Println(expect == got)在這個(gè)例子中 g1負(fù)責(zé)寫,每個(gè) buffer 的容量是 1024,寫滿就交給另外一個(gè)讀 g2,并從讀 g2 中交換過來一個(gè)空的 buffer 繼續(xù)寫。 交換 10 次之后,兩個(gè) goroutine 都退出了,我們檢查寫入的數(shù)據(jù)和讀取的數(shù)據(jù)是否一致,如果一致,那么就說明我們的Exchanger實(shí)現(xiàn)是正確的。
總結(jié)
文本介紹了一種類似 Java 的Exchanger的同步原語的實(shí)現(xiàn),這個(gè)同步原語可以在雙緩沖的場(chǎng)景中使用,提高并發(fā)處理的性能。
參考資料
[1]java.util.concurrent.Exchanger: https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/Exchanger.html
[2]github.com/smallnest/exp/sync/Exchanger: https://pkg.go.dev/github.com/smallnest/exp@v0.2.2/sync#Exchanger
[3]Exchanger: https://pkg.go.dev/github.com/smallnest/exp@v0.2.2/sync#Exchanger
以上就是Go語言高效I/O并發(fā)處理雙緩沖和Exchanger模式實(shí)例探索的詳細(xì)內(nèi)容,更多關(guān)于Go I/O并發(fā)處理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go語言實(shí)現(xiàn)IP段范圍校驗(yàn)示例
這篇文章主要介紹了Go語言實(shí)現(xiàn)IP段范圍校驗(yàn)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09
利用Go語言實(shí)現(xiàn)流量回放工具的示例代碼
今天給大家推薦一款使用Go語言編寫的流量回放工具?--?goreplay;工作中你一定遇到過需要在服務(wù)器上抓包的場(chǎng)景,有了這個(gè)工具就可以助你一臂之力,廢話不多,我們接下來來看一看這個(gè)工具2022-09-09
Go在GoLand中引用github.com中的第三方包具體步驟
這篇文章主要給大家介紹了關(guān)于Go在GoLand中引用github.com中第三方包的具體步驟,文中通過圖文介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Go具有一定的參考價(jià)值,需要的朋友可以參考下2024-01-01
k8s容器互聯(lián)-flannel?host-gw原理篇
這篇文章主要為大家介紹了k8s容器互聯(lián)-flannel?host-gw原理篇,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-04-04
Golang性能提升利器之SectionReader的用法詳解
本文將介紹 Go 語言中的 SectionReader,包括 SectionReader的基本使用方法、實(shí)現(xiàn)原理、使用注意事項(xiàng),感興趣的小伙伴可以了解一下2023-07-07
Golang編程實(shí)現(xiàn)生成n個(gè)從a到b不重復(fù)隨機(jī)數(shù)的方法
這篇文章主要介紹了Golang編程實(shí)現(xiàn)生成n個(gè)從a到b不重復(fù)隨機(jī)數(shù)的方法,結(jié)合實(shí)例形式分析了Go語言字符串操作及隨機(jī)數(shù)生成的相關(guān)操作技巧,需要的朋友可以參考下2017-01-01

