golang sql語句超時控制方案及原理
一般應(yīng)用程序在執(zhí)行一條sql語句時,都會給這條sql設(shè)置一個超時時間,如果到超時時間還未執(zhí)行完,則直接終止sql,釋放資源,返回錯誤。這里主要討論一下在golang+mysql的場景下,對sql語句進行超時控制的具體做法、實現(xiàn)原理以及對連接池連接數(shù)產(chǎn)生的影響。
基于context實現(xiàn)sql語句的超時控制:
使用context進行超時控制是golang的標準做法,可以說當一個函數(shù)第一個參數(shù)是ctx context.Context時,這個函數(shù)就應(yīng)該做出承諾,在收到ctx的取消信號時應(yīng)該提前終止該函數(shù)的執(zhí)行,并釋放資源。目前后端應(yīng)用程序操作數(shù)據(jù)庫時比較常用的做法是使用gorm框架,這個框架主要是起到sql拼接和屏蔽底層數(shù)據(jù)庫差異的作用,本身并沒有提供連接池以及mysql的client端驅(qū)動程序,連接池默認使用的是database/sql標準庫提供的連接池,驅(qū)動程序使用的是go-sql-driver/mysql。故要分析如何基于context進行超時控制,需要從這三層進行分析。
對于gorm,想要對一個sql進行超時控制,可以直接使用WithContext()方法,具體如下:
func main() {
ctx := context.TODO()
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
err := db.WithContext(ctx).Exec("select sleep(10)").Error
if err != nil {
log.Fatal(err)
}
}
?
// output
// [3001.379ms] [rows:0] select sleep(10)
// 2023/12/17 13:31:54 context deadline exceeded這里將ctx的超時時間設(shè)置為3s,同時sql語句為sleep 10s,最終在執(zhí)行時間到3s時,返回了context deadline exceeded錯誤。
gorm調(diào)用WithContext之后,最終會將這個ctx給到database/sql連接池的ExecContext函數(shù)之中,
func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error) {
var res Result
var err error
?
err = db.retry(func(strategy connReuseStrategy) error {
res, err = db.exec(ctx, query, args, strategy)
return err
})
?
return res, err
}該函數(shù)會從連接池中取出一個連接,然后由數(shù)據(jù)庫驅(qū)動層實際執(zhí)行sql,
func (mc *mysqlConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
dargs, err := namedValueToValue(args)
if err != nil {
return nil, err
}
// 這里是核心代碼,將ctx放入監(jiān)聽隊列
if err := mc.watchCancel(ctx); err != nil {
return nil, err
}
defer mc.finish()
?
return mc.Exec(query, dargs)
}go-sql-drive/mysql在實際和mysql server端通信之前,會調(diào)用watchCancel,監(jiān)聽當前ctx的取消信號,保證sql執(zhí)行過程中,能夠立刻收到取消信號,并做出sql取消的操作。watchCancel函數(shù)具體實現(xiàn)如下:
func (mc *mysqlConn) watchCancel(ctx context.Context) error {
if mc.watching {
// Reach here if canceled,
// so the connection is already invalid
mc.cleanup()
return nil
}
// When ctx is already cancelled, don't watch it.
if err := ctx.Err(); err != nil {
return err
}
// When ctx is not cancellable, don't watch it.
if ctx.Done() == nil {
return nil
}
// When watcher is not alive, can't watch it.
if mc.watcher == nil {
return nil
}
?
// 在正常情況下會走到這里,將ctx放入到一個watcher管道
mc.watching = true
mc.watcher <- ctx
return nil
}可以看到核心代碼是將這個ctx放入到一個管道,那么必定有一段程序是監(jiān)聽這個管道的,實際上是如下代碼:
func (mc *mysqlConn) startWatcher() {
watcher := make(chan context.Context, 1)
mc.watcher = watcher
finished := make(chan struct{})
mc.finished = finished
go func() {
for {
var ctx context.Context
select {
case ctx = <-watcher:
case <-mc.closech:
return
}
?
select {
// 在這里監(jiān)聽了ctx取消信號,并實際執(zhí)行cancel操作
case <-ctx.Done():
mc.cancel(ctx.Err())
case <-finished:
case <-mc.closech:
return
}
}
}()
}這個startWatcher函數(shù)內(nèi)部會單獨啟動一個協(xié)程,監(jiān)聽本連接的watcher管道,針對于每一個從管道中取出的ctx,監(jiān)聽其取消信號是否結(jié)束,同一個連接上的sql語句肯定是依次執(zhí)行的,這樣依次監(jiān)聽每一個ctx是不會有什么問題的,而這個startWatcher會在連接創(chuàng)建的時候調(diào)用,保證后續(xù)這個連接上的每個語句添加的ctx都會被監(jiān)聽。
如果真的監(jiān)聽到取消信號,就會調(diào)用cancel函數(shù)進行取消,
// finish is called when the query has canceled.
func (mc *mysqlConn) cancel(err error) {
mc.canceled.Set(err)
mc.cleanup()
}
?
func (mc *mysqlConn) cleanup() {
if !mc.closed.TrySet(true) {
return
}
?
// Makes cleanup idempotent
close(mc.closech)
if mc.netConn == nil {
return
}
// 核心代碼如下,關(guān)閉了通信所使用的TCP連接
if err := mc.netConn.Close(); err != nil {
errLog.Print(err)
}
}最終在收到取消信號時,會關(guān)閉和mysql server進行通信的TCP連接。
基于DSN中的readTimeout和writeTimeout實現(xiàn)sql語句的超時控制:
還有一種方法是在打開一個db對象的dsn中指定,具體做法如下:
func init() {
dsn := "root:12345678@tcp(localhost:3306)/test?charset=utf8mb4&parseTime=True&loc=Local&timeout=1500ms&readTimeout=3s&writeTimeout=3s"
var err error
db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
log.Fatalln(err)
}
db.Logger.LogMode(logger.Info)
}
?
func main() {
ctx := context.TODO()
err := db.WithContext(ctx).Exec("select sleep(10)").Error
if err != nil {
log.Fatal(err)
}
}
?
// output
// [3002.597ms] [rows:0] select sleep(10)
// 2023/12/17 14:21:11 invalid connection在dsn中指定readTimeout=3s&writeTimeout=3s,同時執(zhí)行一個sleep(10),同樣可以在第三秒時報錯,但報錯會有一點點奇怪,invalid connection,看起來好像和超時沒有啥關(guān)系,這是因為這兩個超時時間的含義其實是針對于mysql底層使用的TCP連接而言的,即readTimeout是從TCP連接中讀取一個數(shù)據(jù)包的超時時間,writeTimeout是向一個TCP連接寫入一個數(shù)據(jù)包的超時時間,并且這個超時是基于連接的deadline實現(xiàn)的,所以一旦超時就會認為這個連接是異常的,最終返回這樣一個連接異常的報錯。
具體的實現(xiàn)原理仍然是在go-sql-driver/mysql中,在創(chuàng)建連接時,會處理這兩個timeout,
...
?
// 這就是上面說的啟動監(jiān)聽ctx的邏輯
mc.startWatcher()
if err := mc.watchCancel(ctx); err != nil {
mc.cleanup()
return nil, err
}
defer mc.finish()
?
mc.buf = newBuffer(mc.netConn)
?
// 在解析了dsn之后,就會將兩個timeout賦值給核心連接對象的兩個屬性
mc.buf.timeout = mc.cfg.ReadTimeout
mc.writeTimeout = mc.cfg.WriteTimeout
?
...在實際和mysql server進行通信時,會用到這兩個屬性,
func (b *buffer) readNext(need int) ([]byte, error) {
if b.length < need {
// refill
if err := b.fill(need); err != nil {
return nil, err
}
}
?
offset := b.idx
b.idx += need
b.length -= need
return b.buf[offset:b.idx], nil
}
?
// fill reads into the buffer until at least _need_ bytes are in it
func (b *buffer) fill(need int) error {
...go
for {
// 若timeout>0,則基于這個超時時間,給連接設(shè)置一個新的deadline
if b.timeout > 0 {
if err := b.nc.SetReadDeadline(time.Now().Add(b.timeout)); err != nil {
return err
}
}
?
nn, err := b.nc.Read(b.buf[n:])
n += nn
?
switch err {
case nil:
if n < need {
continue
}
b.length = n
return nil
?
case io.EOF:
if n >= need {
b.length = n
return nil
}
return io.ErrUnexpectedEOF
?
default:
return err
}
}
}
在每次需要從mysql server獲取數(shù)據(jù)的時候,都會給這次讀操作設(shè)置一個deadline,具體的時間就是當前時間+timeout值,這樣每次從server端讀取數(shù)據(jù)的時候,一旦超出這個時間,就會報一個io timeout錯誤,而上游再收到這個錯誤之后,則會進行如下處理:
data, err = mc.buf.readNext(pktLen)
if err != nil {
if cerr := mc.canceled.Value(); cerr != nil {
return nil, cerr
}
errLog.Print(err)
// 關(guān)閉當前連接
mc.Close()
// 返回invalid connection錯誤
return nil, ErrInvalidConn
}首先將這條連接關(guān)閉,之后返回了invalid connection錯誤,這也就是為什么上面例子中超時的報錯是invalid connection。核心需要關(guān)注的還是Close方法,這里是超時后續(xù)的處理:
func (mc *mysqlConn) Close() (err error) {
// Makes Close idempotent
if !mc.closed.IsSet() {
// 向mysql server發(fā)送quit命令,表明自己要退出了
err = mc.writeCommandPacket(comQuit)
}
?
// 調(diào)用cleanup,和上面監(jiān)聽ctx取消信號后的操作是一致的
mc.cleanup()
?
return
}首先發(fā)送一個quit命令,告知自己需要退出,之后也使用cleanup方法,關(guān)閉tcp連接??梢钥吹竭@里的超時控制邏輯和基于ctx的對比,基本是一致的,就是目前這種方案還給server端發(fā)送了一個quit指令,從外部使用上看似乎加不加這個指令效果都是一樣的,只要連接關(guān)閉了mysql server端就可以回收自己的資源(不一定能立刻回收,但最終會回收)。我查找了一些資料和mysql的官方文檔,并沒有找到如果不發(fā)送quit指令,直接關(guān)閉tcp連接會有什么影響,我也沒有研究過mysql的源碼,如果有人知道的話,還請不吝賜教。但我想應(yīng)該沒有太大問題,要不然基于ctx的超時控制早就出問題了。
sql語句超時時連接池如何處理:
以上兩種sql超時的方案我個人覺得都沒有什么問題,底層最終面對超時時所做的操作也基本一致(關(guān)閉TCP連接),我個人更喜歡基于ctx的方案,畢竟ctx設(shè)計之初就是用來做這件事的,也可以和其他場景下的超時控制保持一致,報錯信息也更友好一些。接下來需要考慮的就是一旦底層出現(xiàn)報錯,連接被關(guān)閉,上層的連接池是如何處理的,
func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []any) (res Result, err error) {
defer func() {
// 核心代碼在這里,執(zhí)行完sql之后需要釋放當前連接,釋放時會基于err是否為nil做出處理
release(err)
}()
execerCtx, ok := dc.ci.(driver.ExecerContext)
var execer driver.Execer
if !ok {
execer, ok = dc.ci.(driver.Execer)
}
if ok {
var nvdargs []driver.NamedValue
var resi driver.Result
withLock(dc, func() {
nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
if err != nil {
return
}
// 驅(qū)動層實際進行查詢
resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs)
})
if err != driver.ErrSkip {
if err != nil {
return nil, err
}
return driverResult{dc, resi}, nil
}
}
...
}exec執(zhí)行完之后,會釋放該連接,將其放回連接池,以供其他查詢使用,放回連接池時會依據(jù)本條sql是否有錯誤進行處理,release函數(shù)時注入進來的,實際上是releaseConn函數(shù),該函數(shù)內(nèi)部調(diào)用了putConn函數(shù),
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
if !errors.Is(err, driver.ErrBadConn) {
// 這里判斷了一下連接是不是已經(jīng)不可用了,若已經(jīng)不可用則將err賦值為ErrBadConn
if !dc.validateConnection(resetSession) {
err = driver.ErrBadConn
}
}
db.mu.Lock()
if !dc.inUse {
db.mu.Unlock()
if debugGetPut {
fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
}
panic("sql: connection returned that was never out")
}
// 若連接已到最大生存時間,也要標記連接已經(jīng)不可用
if !errors.Is(err, driver.ErrBadConn) && dc.expired(db.maxLifetime) {
db.maxLifetimeClosed++
err = driver.ErrBadConn
}
if debugGetPut {
db.lastPut[dc] = stack()
}
dc.inUse = false
dc.returnedAt = nowFunc()
?
for _, fn := range dc.onPut {
fn()
}
dc.onPut = nil
// 若連接不可用,進行如下處理
if errors.Is(err, driver.ErrBadConn) {
// 有一個連接被關(guān)閉,考慮打開一個新的連接
db.maybeOpenNewConnections()
db.mu.Unlock()
// 關(guān)閉該連接
dc.Close()
return
}
if putConnHook != nil {
putConnHook(db, dc)
}
// sql執(zhí)行正常,或者有一些錯誤但連接是正常的,會正常的歸還連接
added := db.putConnDBLocked(dc, nil)
db.mu.Unlock()
?
if !added {
dc.Close()
return
}
}整體而言,在sql執(zhí)行出現(xiàn)異常時,會判斷一下連接是否可用,這個判斷也是于驅(qū)動層完成,驅(qū)動層實現(xiàn)了如下方法:
// IsValid implements driver.Validator interface
// (From Go 1.15)
func (mc *mysqlConn) IsValid() bool {
return !mc.closed.IsSet()
}用于告知連接池這個連接是否還正常,而在sql執(zhí)行超時最后調(diào)用的cleanup方法里,首先就是標記這個連接已經(jīng)不可用了
func (mc *mysqlConn) cleanup() {
// 標記連接不可用
if !mc.closed.TrySet(true) {
return
}
}在判斷連接異常,或者超出最大生存時間之后,就是調(diào)用連接池的Close方法,注意是連接池的Close,不是驅(qū)動層的Close,這個Close最終會調(diào)用到finalClose。
func (dc *driverConn) finalClose() error {
var err error
var openStmt []*driverStmt
withLock(dc, func() {
openStmt = make([]*driverStmt, 0, len(dc.openStmt))
for ds := range dc.openStmt {
openStmt = append(openStmt, ds)
}
dc.openStmt = nil
})
for _, ds := range openStmt {
ds.Close()
}
withLock(dc, func() {
// 這里調(diào)用驅(qū)動層進行連接的關(guān)閉
dc.finalClosed = true
err = dc.ci.Close()
dc.ci = nil
})
?
dc.db.mu.Lock()
// 當前打開連接數(shù)減一
dc.db.numOpen--
dc.db.maybeOpenNewConnections()
dc.db.mu.Unlock()
?
dc.db.numClosed.Add(1)
return err
}這個方法主要是做兩件事,一是在驅(qū)動層實際關(guān)閉連接,這主要是針對達到最大生存時間的連接,對于sql執(zhí)行超時這種本身就已經(jīng)關(guān)閉了的連接是不會再關(guān)閉一次的,TrySet會執(zhí)行不成功,后面TCP鏈接關(guān)閉的操作是不會繼續(xù)執(zhí)行的。關(guān)閉連接之后,讓當前打開的連接數(shù)減一,從而保證可以正常打開新的連接。
有可能帶來的問題:
綜上所述,這兩種超時控制的方法實現(xiàn)原理雖然有所區(qū)別,但在發(fā)現(xiàn)超時后做的事情是一致的,都是關(guān)閉該連接,并且讓連接池打開連接數(shù)量減一,這其實存在著一個問題,因為client端雖然正常調(diào)了Close,認為連接已經(jīng)關(guān)閉了,但其實mysql server端在非sleep狀態(tài)下是感知不到連接關(guān)閉的消息的,一種具體的情況就是比如mysql的某個連接正在執(zhí)行一個耗時的查詢,但是這時到了超時時間,client主動關(guān)閉了連接,但是mysql server端是不會立刻終止查詢并關(guān)閉連接的,show processlist時,仍然能看到連接中的sql還在正常執(zhí)行。其實觀察TCP連接的狀態(tài)也能看到這一現(xiàn)象,在雙方正常通信時,狀態(tài)為
tcp6 0 0 ::1.3306 ::1.61351 ESTABLISHED
tcp6 0 0 ::1.61351 ::1.3306 ESTABLISHED
在client端主動調(diào)用Close之后,server端由于要執(zhí)行當前sql會一直保持在CLOSE_WAIT狀態(tài),client端進入FIN_WAIT_2狀態(tài),直到server端在sql執(zhí)行完成后才會進行后續(xù)的揮手過程,才能真正關(guān)閉連接。
tcp6 5 0 ::1.3306 ::1.61351 CLOSE_WAIT
tcp6 0 0 ::1.61351 ::1.3306 FIN_WAIT_2
問題就在于server端連接還未關(guān)閉,但連接池那邊連接數(shù)已經(jīng)減一了,后續(xù)可以創(chuàng)建新的連接了,這就導(dǎo)致mysql server端的連接數(shù)是會高于連接池的最大連接數(shù)的,如果超時的sql很多,很有可能導(dǎo)致連接數(shù)超出連接池最大連接數(shù)限制達到mysql server端的最大連接數(shù),后續(xù)新的連接將無法建立,直接返回too many connectios錯誤,如果這些連接執(zhí)行的sql又真的很慢,或者發(fā)生死鎖,可能會出現(xiàn)mysql較長時間直接拒絕服務(wù)的情況。這表明在生產(chǎn)環(huán)境下盡量不要將mysql的max connectios參數(shù)設(shè)置的和數(shù)據(jù)庫最大連接數(shù)比較接近,還是要留出一定的余量,避免在出現(xiàn)很多sql超時時這部分泄露的連接直接將mysql連接數(shù)打滿,導(dǎo)致數(shù)據(jù)庫出現(xiàn)不可用。
到此這篇關(guān)于golang sql語句超時控制方案及原理的文章就介紹到這了,更多相關(guān)golang sql超時控制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
十個Golang開發(fā)中應(yīng)該避免的錯誤總結(jié)
Go是一種靜態(tài)類型的、并發(fā)的、垃圾收集的編程語言,由谷歌開發(fā)。開發(fā)人員在編寫Go代碼時總會有一些常見的錯誤,下面是Go語言中需要避免的十大壞錯誤,希望對大家有所幫助2023-03-03
GoFrame框架garray并發(fā)安全數(shù)組使用開箱體驗
這篇文章主要介紹了GoFrame框架garray并發(fā)安全數(shù)組使用開箱體驗,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-06-06
go?doudou開發(fā)單體RESTful服務(wù)快速上手教程
這篇文章主要為大家介紹了go?doudou開發(fā)單體RESTful服務(wù)快速上手教程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-12-12
使用Golang的channel交叉打印兩個數(shù)組的操作
這篇文章主要介紹了使用Golang的channel交叉打印兩個數(shù)組的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-04-04

