淺談Go連接池的設(shè)計(jì)與實(shí)現(xiàn)
為什么需要連接池
如果不用連接池,而是每次請(qǐng)求都創(chuàng)建一個(gè)連接是比較昂貴的,因此需要完成3次tcp握手
同時(shí)在高并發(fā)場景下,由于沒有連接池的最大連接數(shù)限制,可以創(chuàng)建無數(shù)個(gè)連接,耗盡文件描述符
連接池就是為了復(fù)用這些創(chuàng)建好的連接
連接池設(shè)計(jì)
基本上連接池都會(huì)設(shè)計(jì)以下幾個(gè)參數(shù):
初始連接數(shù):在初始化連接池時(shí)就會(huì)預(yù)先創(chuàng)建好的連接數(shù)量,如果設(shè)置得:
- 過大:可能造成浪費(fèi)
- 過?。赫?qǐng)求到來時(shí)需要新建連接
最大空閑連接數(shù)maxIdle:池中最大緩存的連接個(gè)數(shù),如果設(shè)置得:
- 過大:造成浪費(fèi),自己不用還把持著連接。因?yàn)閿?shù)據(jù)庫整體的連接數(shù)是有限的,當(dāng)前進(jìn)程占用多了,其他進(jìn)程能獲取的就少了
- 過?。簾o法應(yīng)對(duì)突發(fā)流量
最大連接數(shù)maxCap:
- 如果已經(jīng)用了maxCap個(gè)連接,要申請(qǐng)第maxCap+1個(gè)連接時(shí),一般會(huì)阻塞在那里,直到超時(shí)或者別人歸還一個(gè)連接
最大空閑時(shí)間idleTimeout:當(dāng)發(fā)現(xiàn)某連接空閑超過這個(gè)時(shí)間時(shí),會(huì)將其關(guān)閉,重新去獲取連接
避免連接長時(shí)間沒用,自動(dòng)失效的問題
連接池對(duì)外提供兩個(gè)方法,Get:獲取一個(gè)連接,Put:歸還一個(gè)連接
大部分連接池的實(shí)現(xiàn)大同小異,基本流程如下:
Get

需要注意:
- 當(dāng)有空閑連接時(shí),需要進(jìn)一步判斷連接是否有過期(超過最大空閑時(shí)間idleTimeout)
- 這些連接有可能很久沒用過了,在數(shù)據(jù)庫層面已經(jīng)過期。如果貿(mào)然使用可能出現(xiàn)錯(cuò)誤,因此最好檢查下是否超時(shí)
- 當(dāng)陷入阻塞時(shí),最好設(shè)置超時(shí)時(shí)間,避免一直沒等到有人歸還連接而一直阻塞
Put

歸還連接時(shí):
- 先看有沒有阻塞的獲取連接的請(qǐng)求,如果有轉(zhuǎn)交連接,并喚醒阻塞請(qǐng)求
- 否則看能否放回去空閑隊(duì)列,如果不能直接關(guān)閉請(qǐng)求
總結(jié)
根據(jù)上面總結(jié)的流程,連接池還需要維護(hù)另外兩個(gè)結(jié)構(gòu):
- 空閑隊(duì)列
- 阻塞請(qǐng)求的隊(duì)列

開源實(shí)現(xiàn)
接下來看幾個(gè)開源連接池的實(shí)現(xiàn),都大體符合上面介紹的流程
silenceper/pool
代碼地址:https://github.com/silenceper/pool
數(shù)據(jù)結(jié)構(gòu):
// channelPool 存放連接信息
type channelPool struct {
mu sync.RWMutex
// 空閑連接
conns chan *idleConn
// 產(chǎn)生新連接的方法
factory func() (interface{}, error)
// 關(guān)閉連接的方法
close func(interface{}) error
ping func(interface{}) error
// 最大空閑時(shí)間,最大阻塞等待時(shí)間(實(shí)際沒用到)
idleTimeout, waitTimeOut time.Duration
// 最大連接數(shù)
maxActive int
openingConns int
// 阻塞的請(qǐng)求
connReqs []chan connReq
}
可以看出,silenceper/pool:
- 用channel實(shí)現(xiàn)了空閑連接隊(duì)列
conns - 為每個(gè)阻塞的請(qǐng)求創(chuàng)建一個(gè)channel,加入
connReqs中。這樣請(qǐng)求會(huì)阻塞在自己的channel上
Get:
func (c *channelPool) Get() (interface{}, error) {
conns := c.getConns()
if conns == nil {
return nil, ErrClosed
}
for {
select {
// 如果有空閑連接
case wrapConn := <-conns:
if wrapConn == nil {
return nil, ErrClosed
}
//判斷是否超時(shí),超時(shí)則丟棄
if timeout := c.idleTimeout; timeout > 0 {
if wrapConn.t.Add(timeout).Before(time.Now()) {
//丟棄并關(guān)閉該連接
c.Close(wrapConn.conn)
continue
}
}
//判斷是否失效,失效則丟棄,如果用戶沒有設(shè)定 ping 方法,就不檢查
if c.ping != nil {
if err := c.Ping(wrapConn.conn); err != nil {
c.Close(wrapConn.conn)
continue
}
}
return wrapConn.conn, nil
// 沒有空閑連接
default:
c.mu.Lock()
log.Debugf("openConn %v %v", c.openingConns, c.maxActive)
if c.openingConns >= c.maxActive {
// 連接數(shù)已經(jīng)達(dá)到上線,不能再創(chuàng)建連接
req := make(chan connReq, 1)
c.connReqs = append(c.connReqs, req)
c.mu.Unlock()
// 將自己阻塞在channel上
ret, ok := <-req
if !ok {
return nil, ErrMaxActiveConnReached
}
// 再檢查一次是否超時(shí)
if timeout := c.idleTimeout; timeout > 0 {
if ret.idleConn.t.Add(timeout).Before(time.Now()) {
//丟棄并關(guān)閉該連接
c.Close(ret.idleConn.conn)
continue
}
}
return ret.idleConn.conn, nil
}
// 沒有超過最大連接數(shù),創(chuàng)建一個(gè)新的連接
if c.factory == nil {
c.mu.Unlock()
return nil, ErrClosed
}
conn, err := c.factory()
if err != nil {
c.mu.Unlock()
return nil, err
}
c.openingConns++
c.mu.Unlock()
return conn, nil
}
}
}
這段代碼基本符合上面介紹的Get流程,應(yīng)該很好理解
需要注意:
- 當(dāng)收到別人歸還的連接狗,這里再檢查了一次是否超時(shí)。但我認(rèn)為這次檢查是沒必要的,因?yàn)閯e人剛用完,一般不可能超時(shí)
- 雖然在pool的數(shù)據(jù)結(jié)構(gòu)定義中有
waitTimeOut字段,但實(shí)際沒有使用,即阻塞獲取可能無限期阻塞,這是一個(gè)優(yōu)化點(diǎn)
Put:
// Put 將連接放回pool中
func (c *channelPool) Put(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
c.mu.Lock()
if c.conns == nil {
c.mu.Unlock()
return c.Close(conn)
}
// 如果有請(qǐng)求在阻塞獲取連接
if l := len(c.connReqs); l > 0 {
req := c.connReqs[0]
copy(c.connReqs, c.connReqs[1:])
c.connReqs = c.connReqs[:l-1]
// 將連接轉(zhuǎn)交
req <- connReq{
idleConn: &idleConn{conn: conn, t: time.Now()},
}
c.mu.Unlock()
return nil
} else {
// 否則嘗試是否能放回空閑連接隊(duì)列
select {
case c.conns <- &idleConn{conn: conn, t: time.Now()}:
c.mu.Unlock()
return nil
default:
c.mu.Unlock()
//連接池已滿,直接關(guān)閉該連接
return c.Close(conn)
}
}
}
值得注意的是:
put方法喚醒阻塞請(qǐng)求時(shí),從隊(duì)頭開始喚醒,這樣先阻塞的請(qǐng)求先被喚醒,保證了公平性
sql.DB
Go在官方庫sql中就實(shí)現(xiàn)了連接池,這樣的好處在于:
- 對(duì)于開發(fā):就不用像java一樣,需要自己找第三方的連接池實(shí)現(xiàn)
- 對(duì)于driver的實(shí)現(xiàn):只用關(guān)心怎么和數(shù)據(jù)庫交互,不用考慮連接池的問題
sql.DB中和連接池相關(guān)的字段如下:
type DB struct {
/**
...
*/
// 空閑連接隊(duì)列
freeConn []*driverConn
// 阻塞請(qǐng)求的隊(duì)列
connRequests map[uint64]chan connRequest
// 已經(jīng)打開的連接
numOpen int // number of opened and pending open connections
// 最大空閑連接
maxIdle int // zero means defaultMaxIdleConns; negative means 0
// 最大連接數(shù)
maxOpen int // <= 0 means unlimited
// ...
}
繼續(xù)看獲取連接:
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
// 檢測連接池是否被關(guān)閉
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
select {
default:
// 檢測ctx是否超時(shí)
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
}
lifetime := db.maxLifetime
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
}
接下來檢測是否有空閑連接:
numFree := len(db.freeConn)
// 如果有空閑連接
if strategy == cachedOrNewConn && numFree > 0 {
// 從隊(duì)頭取一個(gè)
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// Reset the session if required.
if err := conn.resetSession(ctx); err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
以上代碼是1.14版本,但是到了1.18以后,獲取空閑連接的方式發(fā)生了變化:
last := len(db.freeConn) - 1
if strategy == cachedOrNewConn && last >= 0 {
// 從最后一個(gè)位置獲取連接
conn := db.freeConn[last]
db.freeConn = db.freeConn[:last]
conn.inUse = true
if conn.expired(lifetime) {
db.maxLifetimeClosed++
db.mu.Unlock()
conn.Close()
return nil, driver.ErrBadConn
}
可以看出,1.14版本從隊(duì)首獲取,1.18改成從隊(duì)尾獲取連接
為啥從隊(duì)尾拿連接?
因?yàn)殛?duì)尾的連接是才放進(jìn)去的,該連接過期的概率比隊(duì)首連接小
繼續(xù)看:
// 如果已經(jīng)達(dá)到最大連接數(shù)
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()
waitStart := time.Now()
// 阻塞當(dāng)前請(qǐng)求,要么ctx超時(shí),要么別人歸還了連接
select {
case <-ctx.Done():
db.mu.Lock()
// 把自己從阻塞隊(duì)列中刪除
delete(db.connRequests, reqKey)
db.mu.Unlock()
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
select {
default:
case ret, ok := <-req:
if ok && ret.conn != nil {
db.putConn(ret.conn, ret.err, false)
}
}
return nil, ctx.Err()
case ret, ok := <-req:
// 別人歸還連接
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
if !ok {
return nil, errDBClosed
}
if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
return ret.conn, ret.err
}
}
這里需要注意,在ctx超時(shí)分支中:
- 首先把自己從阻塞隊(duì)列中刪除
- 再檢查一下req中是否有連接,如果有,將連接放回連接池
奇怪的是為啥把自己刪除后,req還可能收到連接呢?
因?yàn)?code>put連接時(shí),會(huì)先拿出一個(gè)阻塞連接的req,如果這里刪除req在put拿出req:
- 之前:那沒問題,put不可能再放該req發(fā)送連接
- 之后:那有可能put往該req發(fā)送了連接,因此需要再檢查下req中是否有連接,如果有歸還
也解釋了為啥阻塞隊(duì)列要用map:
- 用于快速找到自己的req,并刪除
最后看看put:
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
// 有阻塞的請(qǐng)求,轉(zhuǎn)移連接
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // Remove from pending requests.
if err == nil {
dc.inUse = true
}
req <- connRequest{
conn: dc,
err: err,
}
return true
// 判斷能否放回空閑隊(duì)列
} else if err == nil && !db.closed {
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
db.maxIdleClosed++
}
return false
}
到此這篇關(guān)于淺談Go連接池的設(shè)計(jì)與實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Go連接池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
5個(gè)可以在Golang中優(yōu)化代碼以提高性能的技巧分享
作為一名軟件工程師,確保你的代碼高效且性能良好是非常重要的。本文主要和大家分享5個(gè)可以在Golang中優(yōu)化代碼以提高性能的技巧,希望對(duì)大家有所幫助2023-03-03
詳解Go語言如何利用上下文進(jìn)行并發(fā)計(jì)算
在Go編程中,上下文(context)是一個(gè)非常重要的概念,它包含了與請(qǐng)求相關(guān)的信息,本文主要來和大家討論一下如何在并發(fā)計(jì)算中使用上下文,感興趣的可以了解下2024-02-02
go-micro使用Consul做服務(wù)發(fā)現(xiàn)的方法和原理解析
這篇文章主要介紹了go-micro使用Consul做服務(wù)發(fā)現(xiàn)的方法和原理,這里提供一個(gè)通過docker快速安裝Consul的方式,當(dāng)然前提是你得安裝了docker,需要的朋友可以參考下2022-04-04
golang中defer執(zhí)行時(shí)機(jī)的案例分析
這篇文章主要來通過一些案例和大家一起探討一下golang中defer的執(zhí)行時(shí)機(jī),文中的示例代碼講解詳細(xì),對(duì)我們深入了解golang有一定的幫助,感興趣的可以跟隨小編一起學(xué)習(xí)一下2023-11-11

