Golang中tinyrpc框架的源碼解讀詳解
tinyrpc是一個高性能的基于protocol buffer的rpc框架。項目代碼非常少,很適合初學(xué)者進行g(shù)olang的學(xué)習(xí)。
tinyrpc功能
tinyrpc基于TCP協(xié)議,支持各種壓縮格式,基于protocol buffer的序列化協(xié)議。其rpc是基于golang原生的net/rpc開發(fā)而成。
tinyrpc項目結(jié)構(gòu)
tinyrpc基于net/rpc開發(fā)而成,在此基礎(chǔ)上集成了額外的能力。項目結(jié)構(gòu)如圖:

功能目錄如下:
- codec 編碼模塊
- compressor 壓縮模塊
- header 請求/響應(yīng)頭模塊
- protoc-gen-tinyrpc 代碼生成插件
- serializer 序列化模塊
tinyrpc源碼解讀
客戶端和服務(wù)端構(gòu)建
客戶端是以net/rpc的rpc.Client為基礎(chǔ)構(gòu)建,在此基礎(chǔ)上定義了Option以配置壓縮方式和序列化方式:
type Option func(o *options)
type options struct {
compressType compressor.CompressType
serializer serializer.Serializer
}在創(chuàng)建客戶端的時候?qū)⑴渲煤玫膲嚎s算法和序列化方式作為創(chuàng)建客戶端的參數(shù):
func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client {
options := options{
compressType: compressor.Raw,
serializer: serializer.Proto,
}
for _, option := range opts {
option(&options)
}
return &Client{rpc.NewClientWithCodec(
codec.NewClientCodec(conn, options.compressType, options.serializer))}
}服務(wù)端是以net/rpc的rpc.Server為基礎(chǔ)構(gòu)建,在此基礎(chǔ)上擴展了Server的定義:
type Server struct {
*rpc.Server
serializer.Serializer
}在創(chuàng)建客戶端和開啟服務(wù)時傳入序列化方式:
func NewServer(opts ...Option) *Server {
options := options{
serializer: serializer.Proto,
}
for _, option := range opts {
option(&options)
}
return &Server{&rpc.Server{}, options.serializer}
}
func (s *Server) Serve(lis net.Listener) {
log.Printf("tinyrpc started on: %s", lis.Addr().String())
for {
conn, err := lis.Accept()
if err != nil {
continue
}
go s.Server.ServeCodec(codec.NewServerCodec(conn, s.Serializer))
}
}壓縮算法compressor
壓縮算法的實現(xiàn)中首先是定義了壓縮的接口:
type Compressor interface {
Zip([]byte) ([]byte, error)
Unzip([]byte) ([]byte, error)
}壓縮的接口包含壓縮和解壓方法。
壓縮算法使用的是uint類型,使用iota來初始化,并且使用map來進行所有壓縮算法實現(xiàn)的管理:
type CompressType uint16
const (
Raw CompressType = iota
Gzip
Snappy
Zlib
)
// Compressors which supported by rpc
var Compressors = map[CompressType]Compressor{
Raw: RawCompressor{},
Gzip: GzipCompressor{},
Snappy: SnappyCompressor{},
Zlib: ZlibCompressor{},
}序列化 serializer
序列化部分代碼非常簡單,提供了一個接口:
type Serializer interface {
Marshal(message interface{}) ([]byte, error)
Unmarshal(data []byte, message interface{}) error
}目前只有ProtoSerializer一個實現(xiàn),ProtoSerializer內(nèi)部的實現(xiàn)是基于"google.golang.org/protobuf/proto"來實現(xiàn)的,并沒有什么特殊的處理,因此就不花費筆墨詳述了。
請求/響應(yīng)頭 header
tinyrpc定義了自己的請求頭和響應(yīng)頭:
// RequestHeader request header structure looks like:
// +--------------+----------------+----------+------------+----------+
// | CompressType | Method | ID | RequestLen | Checksum |
// +--------------+----------------+----------+------------+----------+
// | uint16 | uvarint+string | uvarint | uvarint | uint32 |
// +--------------+----------------+----------+------------+----------+
type RequestHeader struct {
sync.RWMutex
CompressType compressor.CompressType
Method string
ID uint64
RequestLen uint32
Checksum uint32
}請求頭由壓縮類型,方法,id,請求長度和校驗碼組成。
// ResponseHeader request header structure looks like:
// +--------------+---------+----------------+-------------+----------+
// | CompressType | ID | Error | ResponseLen | Checksum |
// +--------------+---------+----------------+-------------+----------+
// | uint16 | uvarint | uvarint+string | uvarint | uint32 |
// +--------------+---------+----------------+-------------+----------+
type ResponseHeader struct {
sync.RWMutex
CompressType compressor.CompressType
ID uint64
Error string
ResponseLen uint32
Checksum uint32
}響應(yīng)頭由壓縮類型,id,錯誤信息,返回長度和校驗碼組成。
為了實現(xiàn)頭的重用,tinyrpc為頭構(gòu)建了緩存池:
var (
RequestPool sync.Pool
ResponsePool sync.Pool
)
func init() {
RequestPool = sync.Pool{New: func() interface{} {
return &RequestHeader{}
}}
ResponsePool = sync.Pool{New: func() interface{} {
return &ResponseHeader{}
}}
}在使用時get出來,生命周期結(jié)束后放回池子,并且在put之前需要進行重置:
h := header.RequestPool.Get().(*header.RequestHeader)
defer func() {
h.ResetHeader()
header.RequestPool.Put(h)
}()// ResetHeader reset request header
func (r *RequestHeader) ResetHeader() {
r.Lock()
defer r.Unlock()
r.ID = 0
r.Checksum = 0
r.Method = ""
r.CompressType = 0
r.RequestLen = 0
}
// ResetHeader reset response header
func (r *ResponseHeader) ResetHeader() {
r.Lock()
defer r.Unlock()
r.Error = ""
r.ID = 0
r.CompressType = 0
r.Checksum = 0
r.ResponseLen = 0
}搞清楚了頭的結(jié)構(gòu)以及對象池的復(fù)用邏輯,那么具體的頭的編碼與解碼就是很簡單的拆裝工作,就不在此一行一行解析了,大家有興趣可以自行去閱讀。
編碼 codec
由于tinyrpc是基于net/rpc開發(fā),那么其codec模塊自然也是依賴于net/rpc的ClientCodec和ServerCodec接口來實現(xiàn)的。
客戶端實現(xiàn)
客戶端是基于ClientCodec實現(xiàn)的能力:
type ClientCodec interface {
WriteRequest(*Request, any) error
ReadResponseHeader(*Response) error
ReadResponseBody(any) error
Close() error
}client定義了一個clientCodec類型,并且實現(xiàn)了ClientCodec的接口方法:
type clientCodec struct {
r io.Reader
w io.Writer
c io.Closer
compressor compressor.CompressType // rpc compress type(raw,gzip,snappy,zlib)
serializer serializer.Serializer
response header.ResponseHeader // rpc response header
mutex sync.Mutex // protect pending map
pending map[uint64]string
}WriteRequest實現(xiàn):
// WriteRequest Write the rpc request header and body to the io stream
func (c *clientCodec) WriteRequest(r *rpc.Request, param interface{}) error {
c.mutex.Lock()
c.pending[r.Seq] = r.ServiceMethod
c.mutex.Unlock()
if _, ok := compressor.Compressors[c.compressor]; !ok {
return NotFoundCompressorError
}
reqBody, err := c.serializer.Marshal(param)
if err != nil {
return err
}
compressedReqBody, err := compressor.Compressors[c.compressor].Zip(reqBody)
if err != nil {
return err
}
h := header.RequestPool.Get().(*header.RequestHeader)
defer func() {
h.ResetHeader()
header.RequestPool.Put(h)
}()
h.ID = r.Seq
h.Method = r.ServiceMethod
h.RequestLen = uint32(len(compressedReqBody))
h.CompressType = compressor.CompressType(c.compressor)
h.Checksum = crc32.ChecksumIEEE(compressedReqBody)
if err := sendFrame(c.w, h.Marshal()); err != nil {
return err
}
if err := write(c.w, compressedReqBody); err != nil {
return err
}
c.w.(*bufio.Writer).Flush()
return nil
}可以看到代碼的實現(xiàn)還是比較清晰的,主要分為幾個步驟:
- 將數(shù)據(jù)進行序列化構(gòu)成請求體
- 選擇相應(yīng)的壓縮算法進行壓縮
- 從Pool中獲取請求頭實例將數(shù)據(jù)全部填入其中構(gòu)成最后的請求頭
- 分別通過io操作發(fā)送處理過的請求頭和請求體
ReadResponseHeader實現(xiàn):
// ReadResponseHeader read the rpc response header from the io stream
func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {
c.response.ResetHeader()
data, err := recvFrame(c.r)
if err != nil {
return err
}
err = c.response.Unmarshal(data)
if err != nil {
return err
}
c.mutex.Lock()
r.Seq = c.response.ID
r.Error = c.response.Error
r.ServiceMethod = c.pending[r.Seq]
delete(c.pending, r.Seq)
c.mutex.Unlock()
return nil
}此方法作用是讀取返回的響應(yīng)頭,并解析成具體的結(jié)構(gòu)體
ReadResponseBody實現(xiàn):
func (c *clientCodec) ReadResponseBody(param interface{}) error {
if param == nil {
if c.response.ResponseLen != 0 {
if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil {
return err
}
}
return nil
}
respBody := make([]byte, c.response.ResponseLen)
err := read(c.r, respBody)
if err != nil {
return err
}
if c.response.Checksum != 0 {
if crc32.ChecksumIEEE(respBody) != c.response.Checksum {
return UnexpectedChecksumError
}
}
if c.response.GetCompressType() != c.compressor {
return CompressorTypeMismatchError
}
resp, err := compressor.Compressors[c.response.GetCompressType()].Unzip(respBody)
if err != nil {
return err
}
return c.serializer.Unmarshal(resp, param)
}此方法是用于讀取返回的響應(yīng)結(jié)構(gòu)體,流程如下:
- 讀取流獲取響應(yīng)體
- 根據(jù)響應(yīng)頭中的校驗碼來比對響應(yīng)體是否完整
- 根據(jù)壓縮算法來解壓具體的結(jié)構(gòu)體
- 進行反序列化
服務(wù)端實現(xiàn)
服務(wù)端是基于ServerCodec實現(xiàn)的能力:
type ServerCodec interface {
ReadRequestHeader(*Request) error
ReadRequestBody(any) error
WriteResponse(*Response, any) error
// Close can be called multiple times and must be idempotent.
Close() error
}和客戶端類似,server定義了一個serverCodec類型,并且實現(xiàn)了ServerCodec的接口方法:
type serverCodec struct {
r io.Reader
w io.Writer
c io.Closer
request header.RequestHeader
serializer serializer.Serializer
mutex sync.Mutex // protects seq, pending
seq uint64
pending map[uint64]*reqCtx
}ReadRequestHeader實現(xiàn):
// ReadRequestHeader read the rpc request header from the io stream
func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error {
s.request.ResetHeader()
data, err := recvFrame(s.r)
if err != nil {
return err
}
err = s.request.Unmarshal(data)
if err != nil {
return err
}
s.mutex.Lock()
s.seq++
s.pending[s.seq] = &reqCtx{s.request.ID, s.request.GetCompressType()}
r.ServiceMethod = s.request.Method
r.Seq = s.seq
s.mutex.Unlock()
return nil
}此方法用于讀取請求頭并解析成結(jié)構(gòu)體
ReadRequestBody實現(xiàn):
// ReadRequestBody read the rpc request body from the io stream
func (s *serverCodec) ReadRequestBody(param interface{}) error {
if param == nil {
if s.request.RequestLen != 0 {
if err := read(s.r, make([]byte, s.request.RequestLen)); err != nil {
return err
}
}
return nil
}
reqBody := make([]byte, s.request.RequestLen)
err := read(s.r, reqBody)
if err != nil {
return err
}
if s.request.Checksum != 0 {
if crc32.ChecksumIEEE(reqBody) != s.request.Checksum {
return UnexpectedChecksumError
}
}
if _, ok := compressor.
Compressors[s.request.GetCompressType()]; !ok {
return NotFoundCompressorError
}
req, err := compressor.
Compressors[s.request.GetCompressType()].Unzip(reqBody)
if err != nil {
return err
}
return s.serializer.Unmarshal(req, param)
}此方法用于讀取請求體,流程和讀取響應(yīng)體差不多,大致如下:
- 讀取流并解析成請求體
- 根據(jù)請求頭中的校驗碼進行校驗
- 根據(jù)壓縮算法進行解壓
- 反序列化
WriteResponse實現(xiàn):
// WriteResponse Write the rpc response header and body to the io stream
func (s *serverCodec) WriteResponse(r *rpc.Response, param interface{}) error {
s.mutex.Lock()
reqCtx, ok := s.pending[r.Seq]
if !ok {
s.mutex.Unlock()
return InvalidSequenceError
}
delete(s.pending, r.Seq)
s.mutex.Unlock()
if r.Error != "" {
param = nil
}
if _, ok := compressor.
Compressors[reqCtx.compareType]; !ok {
return NotFoundCompressorError
}
var respBody []byte
var err error
if param != nil {
respBody, err = s.serializer.Marshal(param)
if err != nil {
return err
}
}
compressedRespBody, err := compressor.
Compressors[reqCtx.compareType].Zip(respBody)
if err != nil {
return err
}
h := header.ResponsePool.Get().(*header.ResponseHeader)
defer func() {
h.ResetHeader()
header.ResponsePool.Put(h)
}()
h.ID = reqCtx.requestID
h.Error = r.Error
h.ResponseLen = uint32(len(compressedRespBody))
h.Checksum = crc32.ChecksumIEEE(compressedRespBody)
h.CompressType = reqCtx.compareType
if err = sendFrame(s.w, h.Marshal()); err != nil {
return err
}
if err = write(s.w, compressedRespBody); err != nil {
return err
}
s.w.(*bufio.Writer).Flush()
return nil
}此方法用于寫入響應(yīng)體,大致與寫入請求體差不多,流程如下:
- 將響應(yīng)體序列化
- 使用壓縮算法將響應(yīng)體進行壓縮
- 使用Pool管理響應(yīng)頭
- 分別發(fā)送返回頭和返回體
總結(jié)
tinyrpc是基于golang原生的net/rpc包實現(xiàn),在此基礎(chǔ)上實現(xiàn)了壓縮和序列化等能力擴展。整體來看tinyrpc的代碼非常簡單,比較適合剛接觸golang的程序員來進行閱讀學(xué)習(xí),學(xué)習(xí)一些golang的基礎(chǔ)的開發(fā)技巧和一些語言特性。
以上就是Golang中tinyrpc框架的源碼解讀詳解的詳細內(nèi)容,更多關(guān)于Golang tinyrpc框架的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go語言基礎(chǔ)學(xué)習(xí)之?dāng)?shù)組的使用詳解
數(shù)組相必大家都很熟悉,各大語言也都有數(shù)組的身影。Go 語言也提供了數(shù)組類型的數(shù)據(jù)結(jié)構(gòu)。本文就來通過一些簡單的示例帶大家了解一下Go語言中數(shù)組的使用,希望對大家有所幫助2022-12-12
Golang中Kafka的重復(fù)消費和消息丟失問題的解決方案
在Kafka中無論是生產(chǎn)者發(fā)送消息到Kafka集群還是消費者從Kafka集群中拉取消息,都是容易出現(xiàn)問題的,比較典型的就是消費端的重復(fù)消費問題、生產(chǎn)端和消費端產(chǎn)生的消息丟失問題,下面將對這兩個問題出現(xiàn)的場景以及常見的解決方案進行講解2023-08-08
VsCode下開發(fā)Go語言的環(huán)境配置超詳細圖文詳解
vscode是一款跨平臺、輕量級、插件多的開源IDE,在vscode不僅可以配置C/C++、Python、R、Ruby等語言的環(huán)境,還可以配置Go語言的環(huán)境,下面這篇文章主要給大家介紹了關(guān)于VsCode下開發(fā)Go語言的環(huán)境配置,需要的朋友可以參考下2024-03-03
淺談goland導(dǎo)入自定義包時出錯(一招解決問題)
這篇文章主要介紹了淺談goland導(dǎo)入自定義包時出錯(一招解決問題),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12

