一文詳解GO如何實(shí)現(xiàn)Redis的AOF持久化
GO實(shí)現(xiàn)Redis的AOF持久化
將用戶(hù)發(fā)來(lái)的指令以RESP協(xié)議的形式存儲(chǔ)在本地的AOF文件,重啟Redis后執(zhí)行此文件恢復(fù)數(shù)據(jù)
https://github.com/csgopher/go-redis
本文涉及以下文件:redis.conf:配置文件
aof:實(shí)現(xiàn)aof
redis.conf
appendonly yes
appendfilename appendonly.aof
aof/aof.go
type CmdLine = [][]byte
const (
aofQueueSize = 1 << 16
)
type payload struct {
cmdLine CmdLine
dbIndex int
}
type AofHandler struct {
db databaseface.Database
aofChan chan *payload
aofFile *os.File
aofFilename string
currentDB int
}
func NewAOFHandler(db databaseface.Database) (*AofHandler, error) {
handler := &AofHandler{}
handler.aofFilename = config.Properties.AppendFilename
handler.db = db
handler.LoadAof()
aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return nil, err
}
handler.aofFile = aofFile
handler.aofChan = make(chan *payload, aofQueueSize)
go func() {
handler.handleAof()
}()
return handler, nil
}
func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {
if config.Properties.AppendOnly && handler.aofChan != nil {
handler.aofChan <- &payload{
cmdLine: cmdLine,
dbIndex: dbIndex,
}
}
}
func (handler *AofHandler) handleAof() {
handler.currentDB = 0
for p := range handler.aofChan {
if p.dbIndex != handler.currentDB {
// select db
data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
_, err := handler.aofFile.Write(data)
if err != nil {
logger.Warn(err)
continue
}
handler.currentDB = p.dbIndex
}
data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
_, err := handler.aofFile.Write(data)
if err != nil {
logger.Warn(err)
}
}
}
func (handler *AofHandler) LoadAof() {
file, err := os.Open(handler.aofFilename)
if err != nil {
logger.Warn(err)
return
}
defer file.Close()
ch := parser.ParseStream(file)
fakeConn := &connection.Connection{}
for p := range ch {
if p.Err != nil {
if p.Err == io.EOF {
break
}
logger.Error("parse error: " + p.Err.Error())
continue
}
if p.Data == nil {
logger.Error("empty payload")
continue
}
r, ok := p.Data.(*reply.MultiBulkReply)
if !ok {
logger.Error("require multi bulk reply")
continue
}
ret := handler.db.Exec(fakeConn, r.Args)
if reply.IsErrorReply(ret) {
logger.Error("exec err", err)
}
}
}
- AofHandler:1.從管道中接收數(shù)據(jù) 2.寫(xiě)入AOF文件
- AddAof:用戶(hù)的指令包裝成payload放入管道
- handleAof:將管道中的payload寫(xiě)入磁盤(pán)
- LoadAof:重啟Redis后加載aof文件
database/database.go
type Database struct {
dbSet []*DB
aofHandler *aof.AofHandler
}
func NewDatabase() *Database {
mdb := &Database{}
if config.Properties.Databases == 0 {
config.Properties.Databases = 16
}
mdb.dbSet = make([]*DB, config.Properties.Databases)
for i := range mdb.dbSet {
singleDB := makeDB()
singleDB.index = i
mdb.dbSet[i] = singleDB
}
if config.Properties.AppendOnly {
aofHandler, err := aof.NewAOFHandler(mdb)
if err != nil {
panic(err)
}
mdb.aofHandler = aofHandler
for _, db := range mdb.dbSet {
singleDB := db
singleDB.addAof = func(line CmdLine) {
mdb.aofHandler.AddAof(singleDB.index, line)
}
}
}
return mdb
}
將AOF加入到database里
使用singleDB的原因:因?yàn)樵谘h(huán)中獲取返回變量的地址都完全相同,因此當(dāng)我們想要訪(fǎng)問(wèn)數(shù)組中元素所在的地址時(shí),不應(yīng)該直接獲取 range 返回的變量地址 db,而應(yīng)該使用 singleDB := db
database/db.go
type DB struct {
index int
data dict.Dict
addAof func(CmdLine)
}
func makeDB() *DB {
db := &DB{
data: dict.MakeSyncDict(),
addAof: func(line CmdLine) {},
}
return db
}
由于分?jǐn)?shù)據(jù)庫(kù)db引用不到aof,所以添加一個(gè)addAof匿名函數(shù),在NewDatabase中用這個(gè)匿名函數(shù)調(diào)用AddAof
database/keys.go
func execDel(db *DB, args [][]byte) resp.Reply {
......
if deleted > 0 {
db.addAof(utils.ToCmdLine2("del", args...))
}
return reply.MakeIntReply(int64(deleted))
}
func execFlushDB(db *DB, args [][]byte) resp.Reply {
db.Flush()
db.addAof(utils.ToCmdLine2("flushdb", args...))
return &reply.OkReply{}
}
func execRename(db *DB, args [][]byte) resp.Reply {
......
db.addAof(utils.ToCmdLine2("rename", args...))
return &reply.OkReply{}
}
func execRenameNx(db *DB, args [][]byte) resp.Reply {
......
db.addAof(utils.ToCmdLine2("renamenx", args...))
return reply.MakeIntReply(1)
}
database/string.go
func execSet(db *DB, args [][]byte) resp.Reply {
......
db.addAof(utils.ToCmdLine2("set", args...))
return &reply.OkReply{}
}
func execSetNX(db *DB, args [][]byte) resp.Reply {
......
db.addAof(utils.ToCmdLine2("setnx", args...))
return reply.MakeIntReply(int64(result))
}
func execGetSet(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
value := args[1]
entity, exists := db.GetEntity(key)
db.PutEntity(key, &database.DataEntity{Data: value})
db.addAof(utils.ToCmdLine2("getset", args...))
......
}
添加addAof方法
測(cè)試命令
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n
到此這篇關(guān)于一文詳解GO如何實(shí)現(xiàn)Redis的AOF持久化的文章就介紹到這了,更多相關(guān)GO實(shí)現(xiàn)Redis AOF持久化內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go string轉(zhuǎn)int,int64,int32及注意事項(xiàng)說(shuō)明
這篇文章主要介紹了Go string轉(zhuǎn)int,int64,int32及注意事項(xiàng)說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07
Go中阻塞以及非阻塞操作實(shí)現(xiàn)(Goroutine和main Goroutine)
本文主要介紹了Go中阻塞以及非阻塞操作實(shí)現(xiàn)(Goroutine和main Goroutine),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-05-05
Golang實(shí)現(xiàn)將視頻按照時(shí)間維度剪切的工具
這篇文章主要為大家詳細(xì)介紹了如何利用Golang實(shí)現(xiàn)將視頻按照時(shí)間維度進(jìn)行剪切,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起了解一下2022-12-12
Golang實(shí)現(xiàn)常見(jiàn)的限流算法的示例代碼
限流是項(xiàng)目中經(jīng)常需要使用到的一種工具,一般用于限制用戶(hù)的請(qǐng)求的頻率,也可以避免瞬間流量過(guò)大導(dǎo)致系統(tǒng)崩潰,或者穩(wěn)定消息處理速率,本文主要介紹了使用Go實(shí)現(xiàn)常見(jiàn)的限流算法,希望對(duì)大家有所幫助2023-04-04
golang?四則運(yùn)算計(jì)算器yacc歸約手寫(xiě)實(shí)現(xiàn)
這篇文章主要為大家介紹了golang?四則運(yùn)算?計(jì)算器?yacc?歸約的手寫(xiě)實(shí)現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07
GO語(yǔ)言并發(fā)編程之互斥鎖、讀寫(xiě)鎖詳解
這篇文章主要介紹了GO語(yǔ)言并發(fā)編程之互斥鎖、讀寫(xiě)鎖詳解,本文是GO并發(fā)編程實(shí)戰(zhàn)一書(shū)的樣章,詳細(xì)講解了互斥鎖、讀寫(xiě)鎖,然后給出了一個(gè)完整示例,需要的朋友可以參考下2014-11-11

