How to manage a cluster and exchange cluster data over gossip with the memberlist library
Overview
A simple use of the memberlist library is shown below. Note that list.Join is called inside a for loop: at startup none of the nodes is running yet, so a single direct call to Join would fail with a connection-refused error.
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/memberlist"
)

func main() {
	/* Create the initial memberlist from a safe configuration.
	   Please reference the godoc for other default config types.
	   http://godoc.org/github.com/hashicorp/memberlist#Config
	*/
	list, err := memberlist.Create(memberlist.DefaultLocalConfig())
	if err != nil {
		panic("Failed to create memberlist: " + err.Error())
	}
	t := time.NewTicker(time.Second * 5)
	for {
		select {
		case <-t.C:
			// Join an existing cluster by specifying at least one known member.
			n, err := list.Join([]string{"192.168.80.129"})
			if err != nil {
				fmt.Println("Failed to join cluster: " + err.Error())
				continue
			}
			fmt.Println("member number is:", n)
			goto END
		}
	}
END:
	for {
		select {
		case <-t.C:
			// Ask for members of the cluster
			for _, member := range list.Members() {
				fmt.Printf("Member: %s %s\n", member.Name, member.Addr)
			}
		}
	}
	// Continue doing whatever you need, memberlist will maintain membership
	// information in the background. Delegates can be used for receiving
	// events when members join or leave.
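}
The two main memberlist functions are:
- Create: creates a Memberlist from the given configuration. During initialization the Memberlist contains only the local node's state. Note that it does not connect to any other node at this point; once it succeeds, other nodes are allowed to join this memberlist.
- Join: uses an existing Memberlist to try to contact the given hosts and sync state with them, thereby joining a cluster. This lets the other nodes learn that the local node exists. It returns the number of nodes successfully contacted and an error; an error is returned if no node could be contacted. Note that joining a cluster requires at least one known member of that cluster; the membership of the whole cluster is then synced through gossip.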
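The functionality memberlist provides falls into two areas: maintaining member state (gossip) and syncing data (broadcast, SendReliable). The related interfaces are described below.
Interfaces
memberlist.Create takes a configuration as its argument. DefaultLocalConfig() returns a general-purpose configuration, but the relevant interfaces still have to be implemented in order to sync member state and to send and receive user data. Note that some of the interfaces below are required and others are optional: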
type Config struct {
	// ...
	// Delegate and Events are delegates for receiving and providing
	// data to memberlist via callback mechanisms. For Delegate, see
	// the Delegate interface. For Events, see the EventDelegate interface.
	//
	// The DelegateProtocolMin/Max are used to guarantee protocol-compatibility
	// for any custom messages that the delegate might do (broadcasts,
	// local/remote state, etc.). If you don't set these, then the protocol
	// versions will just be zero, and version compliance won't be done.
	Delegate Delegate
	Events   EventDelegate
	Conflict ConflictDelegate
	Merge    MergeDelegate
	Ping     PingDelegate
	Alive    AliveDelegate
	//...
}
memberlist uses the following message types to sync cluster state and to handle user messages:
const (
	pingMsg messageType = iota
	indirectPingMsg
	ackRespMsg
	suspectMsg
	aliveMsg
	deadMsg
	pushPullMsg
	compoundMsg
	userMsg // User mesg, not handled by us
	compressMsg
	encryptMsg
	nackRespMsg
	hasCrcMsg
	errMsg
)
Delegate
This interface must be implemented in order to use memberlist's gossip protocol. All of its methods must be thread-safe.
type Delegate interface {
	// NodeMeta is used to retrieve meta-data about the current node
	// when broadcasting an alive message. It's length is limited to
	// the given byte size. This metadata is available in the Node structure.
	NodeMeta(limit int) []byte

	// NotifyMsg is called when a user-data message is received.
	// Care should be taken that this method does not block, since doing
	// so would block the entire UDP packet receive loop. Additionally, the byte
	// slice may be modified after the call returns, so it should be copied if needed
	NotifyMsg([]byte)

	// GetBroadcasts is called when user data messages can be broadcast.
	// It can return a list of buffers to send. Each buffer should assume an
	// overhead as provided with a limit on the total byte size allowed.
	// The total byte size of the resulting data to send must not exceed
	// the limit. Care should be taken that this method does not block,
	// since doing so would block the entire UDP packet receive loop.
	GetBroadcasts(overhead, limit int) [][]byte

	// LocalState is used for a TCP Push/Pull. This is sent to
	// the remote side in addition to the membership information. Any
	// data can be sent here. See MergeRemoteState as well. The `join`
	// boolean indicates this is for a join instead of a push/pull.
	LocalState(join bool) []byte

	// MergeRemoteState is invoked after a TCP Push/Pull. This is the
	// state received from the remote side and is the result of the
	// remote side's LocalState call. The 'join'
	// boolean indicates this is for a join instead of a push/pull.
	MergeRemoteState(buf []byte, join bool)
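}
The main methods are:
- NotifyMsg: receives user messages (userMsg). This method must not block; otherwise it blocks the entire UDP/TCP packet receive loop. Because the byte slice may be modified after the call returns, copy the data if it needs to be kept. The method receives user messages sent over either UDP or TCP. Note that user messages returned by GetBroadcasts are not sent immediately over UDP: they go out on the periodic gossip schedule or while messages such as pingMsg are being handled. The following methods send a user message directly:
// Sends a user message to the given node over UDP; the message size is limited by memberlist's UDPBufferSize setting. Does not use the gossip mechanism.
func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error
// Same mechanism as SendBestEffort, except that it takes a node address instead of a Node.
func (m *Memberlist) SendToAddress(a Address, msg []byte) error
// Sends a user message to the given node over TCP, with no limit on message size. Does not use the gossip mechanism.
func (m *Memberlist) SendReliable(to *Node, msg []byte) error
- GetBroadcasts: called to piggyback user messages on the periodic gossip schedule or while messages such as pingMsg are being handled, so delivery is not immediate. Messages to send are usually stored with TransmitLimitedQueue.QueueBroadcast and then fetched with TransmitLimitedQueue.GetBroadcasts at send time. See the TransmitLimitedQueue section below.
- LocalState: used for TCP Push/Pull to send information other than membership to the remote side (any data can be sent); it is used for the periodic state sync. The join parameter indicates that the call is made during the join phase rather than a regular push/pull.
- MergeRemoteState: called after a TCP Push/Pull with the state received from the remote side (that is, the result of the remote side's LocalState call). The join parameter indicates that the call is made during the join phase rather than a regular push/pull.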
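pushPull is called periodically (every PushPullInterval) to perform a complete state exchange with a randomly chosen node. Because pushPull syncs all of the local node's state with the other node, it is relatively expensive.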
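A push/pull round built on LocalState and MergeRemoteState might look like the following sketch. The `store` type, the JSON encoding, and the "higher version wins" merge rule are all assumptions made for illustration; only the two method signatures are taken from the Delegate interface.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// entry is a value with a version used to resolve conflicts.
type entry struct {
	Value   string `json:"value"`
	Version uint64 `json:"version"`
}

// store is a hypothetical application state guarded by a mutex, since
// delegate methods must be thread-safe.
type store struct {
	mu   sync.Mutex
	data map[string]entry
}

func newStore() *store { return &store{data: make(map[string]entry)} }

func (s *store) set(key, value string, version uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = entry{Value: value, Version: version}
}

// LocalState serializes the whole state for the remote side.
func (s *store) LocalState(join bool) []byte {
	s.mu.Lock()
	defer s.mu.Unlock()
	b, err := json.Marshal(s.data)
	if err != nil {
		return nil
	}
	return b
}

// MergeRemoteState decodes the remote state and keeps, per key, the
// entry with the higher version (an assumed conflict rule).
func (s *store) MergeRemoteState(buf []byte, join bool) {
	var remote map[string]entry
	if err := json.Unmarshal(buf, &remote); err != nil {
		return // ignore state that cannot be decoded
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	for k, r := range remote {
		if cur, ok := s.data[k]; !ok || r.Version > cur.Version {
			s.data[k] = r
		}
	}
}

func main() {
	a, b := newStore(), newStore()
	a.set("color", "red", 1)
	b.set("color", "blue", 2)
	b.set("size", "large", 1)

	// One simulated push/pull round: both snapshots are taken first,
	// then each side merges the other's state.
	fromA, fromB := a.LocalState(false), b.LocalState(false)
	a.MergeRemoteState(fromB, false)
	b.MergeRemoteState(fromA, false)
	fmt.Println(a.data["color"].Value, a.data["size"].Value, b.data["color"].Value)
}
```

Because the full state is serialized on every round, the cost grows with the size of the state, which matches the remark above that pushPull is comparatively expensive.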
EventDelegate
Used only to receive notifications when members join or leave; it can be used to update locally held member state.
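As an illustration of keeping local member state up to date from these notifications, here is a minimal standard-library sketch. `Node` is a local stand-in with only a Name field, not memberlist.Node, and `members` is a hypothetical type whose method names follow the EventDelegate interface of this section.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Node is a local stand-in for memberlist.Node (assumption: only the
// Name field is needed for this sketch).
type Node struct{ Name string }

// members tracks the names of the nodes currently believed to be alive.
type members struct {
	mu    sync.Mutex
	names map[string]struct{}
}

func newMembers() *members { return &members{names: make(map[string]struct{})} }

// NotifyJoin records a newly joined node.
func (m *members) NotifyJoin(n *Node) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.names[n.Name] = struct{}{}
}

// NotifyLeave forgets a node that has left.
func (m *members) NotifyLeave(n *Node) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.names, n.Name)
}

// NotifyUpdate is a no-op here: metadata changes do not alter the set.
func (m *members) NotifyUpdate(n *Node) {}

// list returns the tracked names in sorted order.
func (m *members) list() []string {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := make([]string, 0, len(m.names))
	for name := range m.names {
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}

func main() {
	m := newMembers()
	m.NotifyJoin(&Node{Name: "node-a"})
	m.NotifyJoin(&Node{Name: "node-b"})
	m.NotifyLeave(&Node{Name: "node-a"})
	fmt.Println(m.list())
}
```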
type EventDelegate interface {
	// NotifyJoin is invoked when a node is detected to have joined.
	// The Node argument must not be modified.
	NotifyJoin(*Node)

	// NotifyLeave is invoked when a node is detected to have left.
	// The Node argument must not be modified.
	NotifyLeave(*Node)

	// NotifyUpdate is invoked when a node is detected to have
	// updated, usually involving the meta data. The Node argument
	// must not be modified.
	NotifyUpdate(*Node)
}
ChannelEventDelegate is a simple implementation of the EventDelegate interface:
type ChannelEventDelegate struct {
	Ch chan<- NodeEvent
}
ConflictDelegate
Notifies that a name conflict occurred while a client was joining. This usually happens when two clients are configured with the same name but use different addresses. It can be used to collect error statistics.
type ConflictDelegate interface {
	// NotifyConflict is invoked when a name conflict is detected
	NotifyConflict(existing, other *Node)
}
MergeDelegate
Called when the cluster performs a merge. The peers argument of NotifyMerge carries the member information known by the remote side. Implementing this interface is optional.
type MergeDelegate interface {
	// NotifyMerge is invoked when a merge could take place.
	// Provides a list of the nodes known by the peer. If
	// the return value is non-nil, the merge is canceled.
	NotifyMerge(peers []*Node) error
}
PingDelegate
Notifies an observer how long a ping message (pingMsg) took to complete. The ping round-trip time can be recorded in NotifyPingComplete, for example in a histogram.
type PingDelegate interface {
	// AckPayload is invoked when an ack is being sent; the returned bytes will be appended to the ack
	AckPayload() []byte

	// NotifyPingComplete is invoked when an ack for a ping is received
	NotifyPingComplete(other *Node, rtt time.Duration, payload []byte)
}
AliveDelegate
The interface invoked when an aliveMsg message is received; it can be used to add logging, metrics, and similar information.
type AliveDelegate interface {
	// NotifyAlive is invoked when a message about a live
	// node is received from the network. Returning a non-nil
	// error prevents the node from being considered a peer.
	NotifyAlive(peer *Node) error
}
Broadcast
Data that can be broadcast to the memberlist cluster along with gossip.
// Broadcast is something that can be broadcasted via gossip to
// the memberlist cluster.
type Broadcast interface {
	// Invalidates checks if enqueuing the current broadcast
	// invalidates a previous broadcast
	Invalidates(b Broadcast) bool

	// Returns a byte form of the message
	Message() []byte

	// Finished is invoked when the message will no longer
	// be broadcast, either due to invalidation or to the
	// transmit limit being reached
	Finished()
}
The Broadcast interface is typically passed as the argument to TransmitLimitedQueue.QueueBroadcast:
func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) {
	q.queueBroadcast(b, 0)
}
The implementation in alertmanager is as follows:
type simpleBroadcast []byte

func (b simpleBroadcast) Message() []byte                       { return []byte(b) }
func (b simpleBroadcast) Invalidates(memberlist.Broadcast) bool { return false }
func (b simpleBroadcast) Finished()                             {}
TransmitLimitedQueue
TransmitLimitedQueue is mainly used to handle broadcast messages. It has two main methods, QueueBroadcast and GetBroadcasts: the former stores a broadcast message, and the latter retrieves the messages to broadcast at send time. GetBroadcasts is called on the periodic gossip schedule or while messages such as pingMsg are being handled.
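To make the roles of QueueBroadcast and GetBroadcasts more concrete, the following is a deliberately simplified, standard-library-only stand-in for such a queue. `miniQueue`, `keyed`, and `item` are hypothetical; the sketch is not thread-safe and does not prioritize messages by transmit count as the real queue does. The `retransmitLimit` formula (multiplier times the ceiling of log10(n+1)) is what memberlist is believed to use, but treat it as an assumption.

```go
package main

import (
	"fmt"
	"math"
)

// Broadcast repeats the three methods of the interface described earlier.
type Broadcast interface {
	Invalidates(b Broadcast) bool
	Message() []byte
	Finished()
}

// keyed is a hypothetical broadcast: a newer message invalidates an
// older one that carries the same key.
type keyed struct{ key, val string }

func (k *keyed) Invalidates(o Broadcast) bool {
	other, ok := o.(*keyed)
	return ok && other.key == k.key
}
func (k *keyed) Message() []byte { return []byte(k.key + "=" + k.val) }
func (k *keyed) Finished()       {}

type item struct {
	b         Broadcast
	transmits int
}

// miniQueue is a simplified stand-in for TransmitLimitedQueue.
type miniQueue struct {
	numNodes       func() int
	retransmitMult int
	items          []*item
}

// retransmitLimit grows with the logarithm of the cluster size.
func retransmitLimit(mult, n int) int {
	return mult * int(math.Ceil(math.Log10(float64(n+1))))
}

// QueueBroadcast drops queued messages invalidated by b, then appends b.
func (q *miniQueue) QueueBroadcast(b Broadcast) {
	kept := q.items[:0]
	for _, it := range q.items {
		if b.Invalidates(it.b) {
			it.b.Finished()
			continue
		}
		kept = append(kept, it)
	}
	q.items = append(kept, &item{b: b})
}

// GetBroadcasts returns the messages that fit into limit bytes (each
// costing len+overhead) and retires those that reach the transmit limit.
func (q *miniQueue) GetBroadcasts(overhead, limit int) [][]byte {
	maxTx := retransmitLimit(q.retransmitMult, q.numNodes())
	var out [][]byte
	kept := q.items[:0]
	used := 0
	for _, it := range q.items {
		msg := it.b.Message()
		if used+overhead+len(msg) > limit {
			kept = append(kept, it) // does not fit in this round
			continue
		}
		used += overhead + len(msg)
		out = append(out, msg)
		it.transmits++
		if it.transmits >= maxTx {
			it.b.Finished()
			continue
		}
		kept = append(kept, it)
	}
	q.items = kept
	return out
}

func main() {
	q := &miniQueue{numNodes: func() int { return 3 }, retransmitMult: 2}
	q.QueueBroadcast(&keyed{"a", "1"})
	q.QueueBroadcast(&keyed{"b", "1"})
	q.QueueBroadcast(&keyed{"a", "2"}) // invalidates the queued a=1
	for round := 1; round <= 3; round++ {
		fmt.Printf("round %d: %q\n", round, q.GetBroadcasts(2, 1400))
	}
}
```

With three nodes and a multiplier of 2 the limit works out to two transmits, so each message is handed out in two rounds and the queue is empty in the third.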
// TransmitLimitedQueue is used to queue messages to broadcast to
// the cluster (via gossip) but limits the number of transmits per
// message. It also prioritizes messages with lower transmit counts
// (hence newer messages).
type TransmitLimitedQueue struct {
	// NumNodes returns the number of nodes in the cluster. This is
	// used to determine the retransmit count, which is calculated
	// based on the log of this.
	NumNodes func() int

	// RetransmitMult is the multiplier used to determine the maximum
	// number of retransmissions attempted.
	RetransmitMult int

	mu    sync.Mutex
	tq    *btree.BTree // stores *limitedBroadcast as btree.Item
	tm    map[string]*limitedBroadcast
	idGen int64
}
Summary
Messages in memberlist fall into two kinds: internal messages used to sync cluster state, and user messages.
Two methods are scheduled periodically according to GossipInterval. The configuration comments describe the related settings:
// GossipInterval and GossipNodes are used to configure the gossip
// behavior of memberlist.
//
// GossipInterval is the interval between sending messages that need
// to be gossiped that haven't been able to piggyback on probing messages.
// If this is set to zero, non-piggyback gossip is disabled. By lowering
// this value (more frequent) gossip messages are propagated across
// the cluster more quickly at the expense of increased bandwidth.
//
// GossipNodes is the number of random nodes to send gossip messages to
// per GossipInterval. Increasing this number causes the gossip messages
// to propagate across the cluster more quickly at the expense of
// increased bandwidth.
//
// GossipToTheDeadTime is the interval after which a node has died that
// we will still try to gossip to it. This gives it a chance to refute.
GossipInterval      time.Duration
GossipNodes         int
GossipToTheDeadTime time.Duration
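User messages in turn fall into two kinds:
- Periodic sync:
  - every PushPullInterval, user data is synced over TCP using Delegate.LocalState and Delegate.MergeRemoteState;
  - user data is sent along with gossip using Delegate.GetBroadcasts.
- Active sending: user messages are sent on demand with methods such as SendReliable.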
How alertmanager handles it
alertmanager sends user messages in two ways, over UDP and over TCP. In alertmanager, data larger than MaxGossipPacketSize/2 is sent over TCP (the SendReliable method); otherwise it is sent over UDP (the Broadcast interface).
func (c *Channel) Broadcast(b []byte) {
	b, err := proto.Marshal(&clusterpb.Part{Key: c.key, Data: b})
	if err != nil {
		return
	}
	if OversizedMessage(b) {
		select {
		case c.msgc <- b: // data received from c.msgc is sent with SendReliable
		default:
			level.Debug(c.logger).Log("msg", "oversized gossip channel full")
			c.oversizeGossipMessageDroppedTotal.Inc()
		}
	} else {
		c.send(b)
	}
}

func OversizedMessage(b []byte) bool {
	return len(b) > MaxGossipPacketSize/2
}
demo
A simple demo was implemented that manages cluster information through gossip and sends messages to cluster members over TCP.

