go實(shí)現(xiàn)grpc四種數(shù)據(jù)流模式
1. 什么是數(shù)據(jù)流
grpc中的stream,srteam顧名思義就是一種流,可以源源不斷的推送數(shù)據(jù),很適合傳輸一些大數(shù)據(jù),或者服務(wù)端和客戶(hù)端長(zhǎng)時(shí)間數(shù)據(jù)交互,比如客戶(hù)端可以向服務(wù)端訂閱一個(gè)數(shù)據(jù),服務(wù)端就可以利用stream,源源不斷地推送數(shù)據(jù)。
底層還原成socket編程
2. grpc的四種數(shù)據(jù)流
1.簡(jiǎn)單模式
2.服務(wù)端數(shù)據(jù)流模式(Server-side streaming RPC)
3.客戶(hù)端數(shù)據(jù)流模式(Client-side streaming RPC)
4.雙向數(shù)據(jù)流模式(Bidirectional streaming RPC)
2.1 簡(jiǎn)單模式
這種模式最為傳統(tǒng),即客戶(hù)端發(fā)起一次請(qǐng)求,服務(wù)端響應(yīng)一個(gè)數(shù)據(jù),這和大家平時(shí)熟悉的RPC沒(méi)有什么大的區(qū)別,上兩篇中介紹此模式。
2.2 服務(wù)端數(shù)據(jù)流模式
這種模式是客戶(hù)端發(fā)起一次請(qǐng)求,服務(wù)端返回一段連續(xù)的數(shù)據(jù)流。典型的例子是客戶(hù)端向服務(wù)端發(fā)送一個(gè)股票代碼,服務(wù)端就把該股票的實(shí)時(shí)數(shù)據(jù)源源不斷的返回給客戶(hù)端
2.3 客戶(hù)端數(shù)據(jù)流模式
與服務(wù)端數(shù)據(jù)流模式相反,這次是客戶(hù)端源源不斷的向服務(wù)端發(fā)送數(shù)據(jù)流,而在發(fā)送結(jié)束后,由服務(wù)端返回一個(gè)響應(yīng)。典型的例子是物聯(lián)網(wǎng)終端向服務(wù)器報(bào)送數(shù)據(jù)。
2.4 雙向數(shù)據(jù)流
顧名思義,這是客戶(hù)端和服務(wù)端都可以向?qū)Ψ桨l(fā)送數(shù)據(jù)流,這個(gè)時(shí)候雙方的數(shù)據(jù)可以同時(shí)互相發(fā)送,也就是可以實(shí)現(xiàn)實(shí)時(shí)交互。典型的例子是聊天機(jī)器人。
3. 上代碼
3.1 代碼目錄

3.2 編寫(xiě)stream.proto文件
stream是常量,寫(xiě)在哪一邊,哪一邊就是數(shù)據(jù)流
syntax = "proto3";
option go_package = "./;proto";
service Greeter {
// 定義方法,stream是常量,流模式
rpc ServerStream (StreamRequestData) returns (stream StreamResponseData); //服務(wù)端流模式,拉消息
rpc ClientStream (stream StreamRequestData) returns (StreamResponseData); //客戶(hù)端流模式,推消息
rpc AllStream (stream StreamRequestData) returns (stream StreamResponseData); //雙向流模式,能推能拉
}
message StreamRequestData {
string data = 1; //編號(hào)
}
message StreamResponseData {
string data = 1; //編號(hào)
}
生成go的protobuf文件命令:
cd到proto目錄下
命令:protoc -I . hello.proto --go_out=plugins=grpc:.
3.3 編寫(xiě)server文件
package main
import (
"file_test/grpc_go_stream/proto"
"fmt"
"net"
"sync"
"time"
"google.golang.org/grpc"
)
const port = 8082
type server struct{}
func (s *server) ServerStream(req *proto.StreamRequestData, res proto.Greeter_ServerStreamServer) error {
i := 0
for {
i++
//業(yè)務(wù)代碼
_ = res.Send(&proto.StreamResponseData{
Data: fmt.Sprintf("這是發(fā)給%s的數(shù)據(jù)流", req.Data),
})
time.Sleep(time.Second * 1)
if i > 10 {
break
}
}
return nil
}
func (s *server) ClientStream(cliStr proto.Greeter_ClientStreamServer) error {
for {
//業(yè)務(wù)代碼
res, err := cliStr.Recv()
if err != nil {
fmt.Println("本次客戶(hù)端流數(shù)據(jù)發(fā)送完了:",err)
break
}
fmt.Println("客戶(hù)端發(fā)來(lái)消息:",res.Data)
}
return nil
}
func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
wg:=sync.WaitGroup{}
wg.Add(2)
//接受客戶(hù)端消息的協(xié)程
go func() {
defer wg.Done()
for {
//業(yè)務(wù)代碼
res, err := allStr.Recv()
if err != nil {
fmt.Println("本次客戶(hù)端流數(shù)據(jù)發(fā)送完了:",err)
break
}
fmt.Println("收到客戶(hù)端發(fā)來(lái)消息:",res.Data)
}
}()
//發(fā)送消息給客戶(hù)端的協(xié)程
go func() {
defer wg.Done()
i := 0
for {
i++
//業(yè)務(wù)代碼
_ = allStr.Send(&proto.StreamResponseData{
Data: fmt.Sprintf("這是發(fā)給客戶(hù)端的數(shù)據(jù)流"),
})
time.Sleep(time.Second * 1)
if i > 10 {
break
}
}
}()
wg.Wait()
return nil
}
// 啟動(dòng)
func start() {
// 1.實(shí)例化server
g := grpc.NewServer()
// 2.注冊(cè)邏輯到server中
proto.RegisterGreeterServer(g, &server{})
// 3.啟動(dòng)server
lis, err := net.Listen("tcp", "127.0.0.1:8082")
if err != nil {
panic("監(jiān)聽(tīng)錯(cuò)誤:" + err.Error())
}
err = g.Serve(lis)
if err != nil {
panic("啟動(dòng)錯(cuò)誤:" + err.Error())
}
}
func main() {
start()
}
3.4 編寫(xiě)client文件
package main
import (
"context"
"file_test/grpc_go_stream/proto"
"fmt"
"sync"
"time"
"google.golang.org/grpc"
)
var rpc proto.GreeterClient
func serverStreamDemo() {
//服務(wù)端流模式
res,err:=rpc.ServerStream(context.Background(),&proto.StreamRequestData{Data: "jeff"})
if err != nil {
panic("rpc請(qǐng)求錯(cuò)誤:"+err.Error())
}
for {
data,err:=res.Recv() //
if err != nil {
fmt.Println("客戶(hù)端發(fā)送完了:",err)
return
}
fmt.Println("客戶(hù)端返回?cái)?shù)據(jù)流值:",data.Data)
}
}
func clientStreamDemo() {
//客戶(hù)端流模式
cliStr, err := rpc.ClientStream(context.Background())
if err != nil {
panic("rpc請(qǐng)求錯(cuò)誤:" + err.Error())
}
i := 0
for {
i++
_ = cliStr.Send(&proto.StreamRequestData{
Data: "jeff",
})
time.Sleep(time.Second * 1)
if i > 10 {
break
}
}
}
func clientAndServerStreamDemo() {
//雙向流模式
allStr, _ := rpc.AllStream(context.Background())
wg := sync.WaitGroup{}
wg.Add(1)
//接受服務(wù)端消息的協(xié)程
go func() {
defer wg.Done()
for {
//業(yè)務(wù)代碼
res, err := allStr.Recv()
if err != nil {
fmt.Println("本次服務(wù)端流數(shù)據(jù)發(fā)送完了:", err)
break
}
fmt.Println("收到服務(wù)端發(fā)來(lái)消息:", res.Data)
}
}()
//發(fā)送消息給服務(wù)端的協(xié)程
go func() {
defer wg.Done()
i := 0
for {
i++
//業(yè)務(wù)代碼
_ = allStr.Send(&proto.StreamRequestData{
Data: fmt.Sprintf("這是發(fā)給服務(wù)端的數(shù)據(jù)流"),
})
time.Sleep(time.Second * 1)
if i > 10 {
break
}
}
}()
wg.Wait()
}
// 啟動(dòng)
func start() {
conn, err := grpc.Dial("127.0.0.1:8082", grpc.WithInsecure())
if err != nil {
panic("rpc連接錯(cuò)誤:" + err.Error())
}
defer conn.Close()
rpc = proto.NewGreeterClient(conn) //初始化
serverStreamDemo() //服務(wù)端流模式
clientStreamDemo() //客戶(hù)端流模式
clientAndServerStreamDemo() // 雙向流模式
}
func main() {
start()
}以上就是go實(shí)現(xiàn)grpc四種數(shù)據(jù)流模式的詳細(xì)內(nèi)容,更多關(guān)于go實(shí)現(xiàn)grpc流模式的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
淺談Golang 嵌套 interface 的賦值問(wèn)題
這篇文章主要介紹了淺談Golang 嵌套 interface 的賦值問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-04-04
go 原生http web 服務(wù)跨域restful api的寫(xiě)法介紹
這篇文章主要介紹了go 原生http web 服務(wù)跨域restful api的寫(xiě)法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-04-04
GoLand 2020.3 正式發(fā)布有不少新功能(支持泛型)
這是 2020 年第 3 個(gè)版本,也是最后一個(gè)版本,你還將發(fā)現(xiàn)許多新的代碼編輯功能,具體內(nèi)容詳情跟隨小編看看有哪些新特性2020-12-12
Golang使用泛型對(duì)數(shù)組進(jìn)行去重的實(shí)現(xiàn)
本文主要介紹了Golang使用泛型對(duì)數(shù)組進(jìn)行去重的實(shí)現(xiàn),通過(guò)使用類(lèi)型參數(shù)T和類(lèi)型約束any,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2025-02-02
golang http 連接超時(shí)和傳輸超時(shí)的例子
今天小編就為大家分享一篇golang http 連接超時(shí)和傳輸超時(shí)的例子,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-07-07

