源碼分析Go語(yǔ)言使用cgo導(dǎo)致線程增長(zhǎng)的原因
TDengine Go 連接器 https://github.com/taosdata/driver-go 使用 cgo 調(diào)用 taos.so 中的 API,使用過(guò)程中發(fā)現(xiàn)線程數(shù)不斷增長(zhǎng),本文從一個(gè) cgo 調(diào)用開始解析 Go 源碼,分析造成線程增長(zhǎng)的原因。
轉(zhuǎn)換 cgo 代碼
對(duì) driver-go/wrapper/taosc.go 進(jìn)行轉(zhuǎn)換
go tool cgo taosc.go
執(zhí)行后生成 _obj 文件夾
go 代碼分析
以 taosc.cgo1.go 中 TaosResetCurrentDB 為例來(lái)分析。
// TaosResetCurrentDB void taos_reset_current_db(TAOS *taos);
func TaosResetCurrentDB(taosConnect unsafe.Pointer) {
func() { _cgo0 := /*line :161:26*/taosConnect; _cgoCheckPointer(_cgo0, nil); _Cfunc_taos_reset_current_db(_cgo0); }()
}
//go:linkname _cgoCheckPointer runtime.cgoCheckPointer
func _cgoCheckPointer(interface{}, interface{})
//go:cgo_unsafe_args
func _Cfunc_taos_reset_current_db(p0 unsafe.Pointer) (r1 _Ctype_void) {
_cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))
if _Cgo_always_false {
_Cgo_use(p0)
}
return
}
//go:linkname _cgo_runtime_cgocall runtime.cgocall
func _cgo_runtime_cgocall(unsafe.Pointer, uintptr) int32
//go:cgo_import_static _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
//go:linkname __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db _cgo_453a0cad50ef_Cfunc_taos_reset_current_db
var __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db byte
var _cgo_453a0cad50ef_Cfunc_taos_reset_current_db = unsafe.Pointer(&__cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db)TaosResetCurrentDB 首先調(diào)用 _cgoCheckPointer 檢查傳入?yún)?shù)是否為 nil。
//go:linkname _cgoCheckPointer runtime.cgoCheckPointer 表示 cgoCheckPointer 方法實(shí)現(xiàn)是 runtime.cgoCheckPointer,如果傳入?yún)?shù)是 nil 程序?qū)?huì) panic。
接著調(diào)用 _Cfunc_taos_reset_current_db。
Cfunc_taos_reset_current_db 方法中 _Cgo_always_false 在運(yùn)行時(shí)會(huì)是 false,所以只分析第一句 _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))。
_cgo_runtime_cgocall實(shí)現(xiàn)是runtime.cgocall這個(gè)會(huì)重點(diǎn)分析。_cgo_453a0cad50ef_Cfunc_taos_reset_current_db由上方最后代碼塊可以看出是taos_reset_current_db方法指針。uintptr(unsafe.Pointer(&p0))表示 p0 的指針地址。- 由上面可以看出這句意思是調(diào)用
runtime.cgocall,參數(shù)為方法指針和參數(shù)的指針地址。
分析 runtime.cgocall
基于 golang 1.20.4 分析該方法
func cgocall(fn, arg unsafe.Pointer) int32 {
if !iscgo && GOOS != "solaris" && GOOS != "illumos" && GOOS != "windows" {
throw("cgocall unavailable")
}
if fn == nil {
throw("cgocall nil")
}
if raceenabled {
racereleasemerge(unsafe.Pointer(&racecgosync))
}
mp := getg().m // 獲取當(dāng)前 goroutine 的 M
mp.ncgocall++ // 總 cgo 計(jì)數(shù) +1
mp.ncgo++ // 當(dāng)前 cgo 計(jì)數(shù) +1
mp.cgoCallers[0] = 0 // 重置追蹤
entersyscall() // 進(jìn)入系統(tǒng)調(diào)用,保存上下文, 標(biāo)記當(dāng)前 goroutine 獨(dú)占 m, 跳過(guò)垃圾回收
osPreemptExtEnter(mp) // 標(biāo)記異步搶占, 使異步搶占邏輯失效
mp.incgo = true // 修改狀態(tài)
errno := asmcgocall(fn, arg) // 真正進(jìn)行方法調(diào)用的地方
mp.incgo = false // 修改狀態(tài)
mp.ncgo-- // 當(dāng)前 cgo 調(diào)用-1
osPreemptExtExit(mp) // 恢復(fù)異步搶占
exitsyscall() // 退出系統(tǒng)調(diào)用,恢復(fù)調(diào)度器控制
if raceenabled {
raceacquire(unsafe.Pointer(&racecgosync))
}
// 避免 GC 過(guò)早回收
KeepAlive(fn)
KeepAlive(arg)
KeepAlive(mp)
return errno
}其中兩個(gè)主要的方法 entersyscall 和 asmcgocall,接下來(lái)對(duì)這兩個(gè)方法進(jìn)行著重分析。
分析 entersyscall
func entersyscall() {
reentersyscall(getcallerpc(), getcallersp())
}entersyscall 直接調(diào)用的 reentersyscall,關(guān)注下 reentersyscall 注釋中的一段:
// If the syscall does not block, that is it, we do not emit any other events. // If the syscall blocks (that is, P is retaken), retaker emits traceGoSysBlock;
如果 syscall 調(diào)用沒(méi)有阻塞則不會(huì)觸發(fā)任何事件,如果被阻塞 retaker 會(huì)觸發(fā) traceGoSysBlock,那需要了解一下多長(zhǎng)時(shí)間被認(rèn)為是阻塞,先跟到 retaker 方法。
func retake(now int64) uint32 {
n := 0
lock(&allpLock)
for i := 0; i < len(allp); i++ {
pp := allp[i]
if pp == nil {
continue
}
pd := &pp.sysmontick
s := pp.status
sysretake := false
if s == _Prunning || s == _Psyscall {
t := int64(pp.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
preemptone(pp)
sysretake = true
}
}
// 從系統(tǒng)調(diào)用中搶占P
if s == _Psyscall {
// 如果已經(jīng)超過(guò)了一個(gè)系統(tǒng)監(jiān)控的 tick(20us),則從系統(tǒng)調(diào)用中搶占 P
t := int64(pp.syscalltick)
if !sysretake && int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
if runqempty(pp) && sched.nmspinning.Load()+sched.npidle.Load() > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
unlock(&allpLock)
incidlelocked(-1)
if atomic.Cas(&pp.status, s, _Pidle) {
if trace.enabled {
traceGoSysBlock(pp)
traceProcStop(pp)
}
n++
pp.syscalltick++
handoffp(pp)
}
incidlelocked(1)
lock(&allpLock)
}
}
unlock(&allpLock)
return uint32(n)
}從上面可以看到系統(tǒng)調(diào)用阻塞 20 多微秒會(huì)被搶占 P,cgo 被迫 handoffp,接下來(lái)分析 handoffp 方法
func handoffp(pp *p) {
// ...
// 沒(méi)有任務(wù)且沒(méi)有自旋和空閑的 M 則需要啟動(dòng)一個(gè)新的 M
if sched.nmspinning.Load()+sched.npidle.Load() == 0 && sched.nmspinning.CompareAndSwap(0, 1) {
sched.needspinning.Store(0)
startm(pp, true)
return
}
// ...
}handoffp 方法會(huì)調(diào)用 startm 來(lái)啟動(dòng)一個(gè)新的 M,跟到 startm 方法。
func startm(pp *p, spinning bool) {
// ...
nmp := mget()
if nmp == nil {
// 沒(méi)有M可用,調(diào)用newm
id := mReserveID()
unlock(&sched.lock)
var fn func()
if spinning {
fn = mspinning
}
newm(fn, pp, id)
releasem(mp)
return
}
// ...
}此時(shí)如果沒(méi)有 M startm 會(huì)調(diào)用 newm 創(chuàng)建一個(gè)新的 M,接下來(lái)分析 newm 方法。
func newm(fn func(), pp *p, id int64) {
acquirem()
mp := allocm(pp, fn, id)
mp.nextp.set(pp)
mp.sigmask = initSigmask
if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {
lock(&newmHandoff.lock)
if newmHandoff.haveTemplateThread == 0 {
throw("on a locked thread with no template thread")
}
mp.schedlink = newmHandoff.newm
newmHandoff.newm.set(mp)
if newmHandoff.waiting {
newmHandoff.waiting = false
notewakeup(&newmHandoff.wake)
}
unlock(&newmHandoff.lock)
releasem(getg().m)
return
}
newm1(mp)
releasem(getg().m)
}
func newm1(mp *m) {
if iscgo {
var ts cgothreadstart
if _cgo_thread_start == nil {
throw("_cgo_thread_start missing")
}
ts.g.set(mp.g0)
ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
ts.fn = unsafe.Pointer(abi.FuncPCABI0(mstart))
if msanenabled {
msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
}
if asanenabled {
asanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
}
execLock.rlock()
// 創(chuàng)建新線程
asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
execLock.runlock()
return
}
execLock.rlock()
newosproc(mp)
execLock.runlock()
}從 newm 看出如果線程都在阻塞中則調(diào)用 newm1,newm1 調(diào)用 _cgo_thread_start 創(chuàng)建新線程。
由以上分析得出當(dāng)高并發(fā)調(diào)用 cgo 且執(zhí)行時(shí)間超過(guò) 20 微秒時(shí)會(huì)創(chuàng)建新線程。
分析 asmcgocall
只分析 amd64
asm_amd64.s
TEXT ·asmcgocall(SB),NOSPLIT,$0-20
MOVQ fn+0(FP), AX
MOVQ arg+8(FP), BX
MOVQ SP, DX
// 考慮是否需要切換到 m.g0 棧
// 也用來(lái)調(diào)用創(chuàng)建新的 OS 線程,這些線程已經(jīng)在 m.g0 棧中了
get_tls(CX)
MOVQ g(CX), DI
CMPQ DI, $0
JEQ nosave
MOVQ g_m(DI), R8
MOVQ m_gsignal(R8), SI
CMPQ DI, SI
JEQ nosave
MOVQ m_g0(R8), SI
CMPQ DI, SI
JEQ nosave
// 切換到系統(tǒng)棧
CALL gosave_systemstack_switch<>(SB)
MOVQ SI, g(CX)
MOVQ (g_sched+gobuf_sp)(SI), SP
// 于調(diào)度棧中(pthread 新創(chuàng)建的棧)
// 確保有足夠的空間給四個(gè) stack-based fast-call 寄存器
// 為使得 windows amd64 調(diào)用服務(wù)
SUBQ $64, SP
ANDQ $~15, SP // 為 gcc ABI 對(duì)齊
MOVQ DI, 48(SP) // 保存 g
MOVQ (g_stack+stack_hi)(DI), DI
SUBQ DX, DI
MOVQ DI, 40(SP) // 保存棧深 (不能僅保存 SP,因?yàn)闂?赡茉诨卣{(diào)時(shí)被復(fù)制)
MOVQ BX, DI // DI = AMD64 ABI 第一個(gè)參數(shù)
MOVQ BX, CX // CX = Win64 第一個(gè)參數(shù)
CALL AX // 調(diào)用 fn
// 恢復(fù)寄存器、 g、棧指針
get_tls(CX)
MOVQ 48(SP), DI
MOVQ (g_stack+stack_hi)(DI), SI
SUBQ 40(SP), SI
MOVQ DI, g(CX)
MOVQ SI, SP
MOVL AX, ret+16(FP)
RET
nosave:
// 在系統(tǒng)棧上運(yùn)行,可能沒(méi)有 g
// 沒(méi)有 g 的情況發(fā)生在線程創(chuàng)建中或線程結(jié)束中(比如 Solaris 平臺(tái)上的 needm/dropm)
// 這段代碼和上面類似,但沒(méi)有保存和恢復(fù) g,且沒(méi)有考慮棧的移動(dòng)問(wèn)題(因?yàn)槲覀冊(cè)谙到y(tǒng)棧上,而非 goroutine 棧)
// 如果已經(jīng)在系統(tǒng)棧上,則上面的代碼可被直接使用,在 Solaris 上會(huì)進(jìn)入下面這段代碼。
// 使用這段代碼來(lái)為所有 "已經(jīng)在系統(tǒng)棧" 的調(diào)用進(jìn)行服務(wù),從而保持正確性。
SUBQ $64, SP
ANDQ $~15, SP // ABI 對(duì)齊
MOVQ $0, 48(SP) // 上面的代碼保存了 g, 確保 debug 時(shí)可用
MOVQ DX, 40(SP) // 保存原始的棧指針
MOVQ BX, DI // DI = AMD64 ABI 第一個(gè)參數(shù)
MOVQ BX, CX // CX = Win64 第一個(gè)參數(shù)
CALL AX
MOVQ 40(SP), SI // 恢復(fù)原來(lái)的棧指針
MOVQ SI, SP
MOVL AX, ret+16(FP)
RET這段就是將當(dāng)前棧移到系統(tǒng)棧去執(zhí)行,因?yàn)?C 需要無(wú)窮大的棧,在 Go 的棧上執(zhí)行 C 函數(shù)會(huì)導(dǎo)致棧溢出。
產(chǎn)生問(wèn)題
cgo 調(diào)用會(huì)將當(dāng)前棧移到系統(tǒng)棧,并且當(dāng) cgo 高并發(fā)調(diào)用且阻塞超過(guò) 20 微秒時(shí)會(huì)新建線程。而 Go 并不會(huì)銷毀線程,由此造成線程增長(zhǎng)。
解決方案
限制 Go 程序最大線程數(shù),默認(rèn)為 cpu 核數(shù)。
runtime.GOMAXPROCS(runtime.NumCPU())
使用 channel 限制 cgo 最大并發(fā)數(shù)為 cpu 核數(shù)
package thread
import "runtime"
var c chan struct{}
func Lock() {
c <- struct{}{}
}
func Unlock() {
<-c
}
func init() {
c = make(chan struct{}, runtime.NumCPU())
}針對(duì)超過(guò) 20 微秒的 cgo 調(diào)用進(jìn)行限制:
thread.Lock() wrapper.TaosFreeResult(result) thread.Unlock()
以上就是源碼分析Go語(yǔ)言使用cgo導(dǎo)致線程增長(zhǎng)的原因的詳細(xì)內(nèi)容,更多關(guān)于Go語(yǔ)言cgo線程增長(zhǎng)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
從淺入深帶你掌握Golang數(shù)據(jù)結(jié)構(gòu)map
在?Go?語(yǔ)言中,map?是一種非常常見(jiàn)的數(shù)據(jù)類型,它可以用于快速地檢索數(shù)據(jù)。本篇文章將介紹?Go?語(yǔ)言中的?map,包括?map?的定義、初始化、操作和優(yōu)化,需要的可以參考一下2023-04-04
Skywalking-go自動(dòng)監(jiān)控增強(qiáng)使用探究
這篇文章主要介紹了Skywalking-go自動(dòng)監(jiān)控增強(qiáng)使用深入探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01
Go語(yǔ)言使用Request,Response處理web頁(yè)面請(qǐng)求
這篇文章主要介紹了Go語(yǔ)言使用Request,Response處理web頁(yè)面請(qǐng)求,需要的朋友可以參考下2022-04-04

