go語言K8S?的?informer機制淺析
正文
Kubernetes的控制器模式是其非常重要的一個設計模式,整個Kubernetes定義的資源對象以及其狀態(tài)都保存在etcd數(shù)據(jù)庫中,通過apiserver對其進行增刪查改,而各種各樣的控制器需要從apiserver及時獲取這些對象,然后將其應用到實際中,即將這些對象的實際狀態(tài)調(diào)整為期望狀態(tài),讓他們保持匹配。
不過這因為這樣,各種控制器需要和apiserver進行頻繁交互,需要能夠及時獲取對象狀態(tài)的變化,而如果簡單的通過暴力輪詢的話,會給apiserver造成很大的壓力,且效率很低,因此,Kubernetes設計了Informer這個機制,用來作為控制器跟apiserver交互的橋梁,它主要有兩方面的作用:

依賴Etcd的List&Watch機制,在本地維護了一份目標對象的緩存。
Etcd的Watch機制能夠使客戶端及時獲知這些對象的狀態(tài)變化,然后通過List機制,更新本地緩存,這樣就在客戶端為這些API對象維護了一份和Etcd數(shù)據(jù)庫中幾乎一致的數(shù)據(jù),然后控制器等客戶端就可以直接訪問緩存獲取對象的信息,而不用去直接訪問apiserver,這一方面顯著提高了性能,另一方面則大大降低了對apiserver的訪問壓力;
依賴Etcd的Watch機制,觸發(fā)控制器等客戶端注冊到Informer中的事件方法。
客戶端可能會對某些對象的某些事件感興趣,當這些事件發(fā)生時,希望能夠執(zhí)行某些操作,比如通過apiserver新建了一個pod,那么kube-scheduler中的控制器收到了這個事件,然后將這個pod加入到其隊列中,等待進行調(diào)度。
Kubernetes的各個組件本身就內(nèi)置了非常多的控制器,而自定義的控制器也需要通過Informer跟apiserver進行交互,因此,Informer在Kubernetes中應用非常廣泛,本篇文章就重點分析下Informer的機制原理,以加深對其的理解。
使用方法
先來看看Informer是怎么用的,以Endpoint為例,來看下其使用Informer的相關代碼:
創(chuàng)建Informer工廠
# client-go/informers/factory.go sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
首先創(chuàng)建了一個SharedInformerFactory,這個結構主要有兩個作用:
- 一個是用來作為創(chuàng)建Informer的工廠,典型的工廠模式,在Kubernetes中這種設計模式也很常用;
- 一個是共享Informer,所謂共享,就是多個Controller可以共用同一個Informer,因為不同的Controller可能對同一種API對象感興趣,這樣相同的API對象,緩存就只有一份,通知機制也只有一套,大大提高了效率,減少了資源浪費。
創(chuàng)建對象Informer結構體
# client-go/informers/core/v1/endpoints.go
type EndpointsInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.EndpointsLister
}
endpointsInformer := kubeInformerFactory.Core().V1().Endpoints()
使用InformerFactory創(chuàng)建出對應版本的對象的Informer結構體,如Endpoints對象對應的就是EndpointsInformer結構體,該結構體實現(xiàn)了兩個方法:Informer()和Lister()
- 前者用來構建出最終的Informer,即我們本篇文章的重點:SharedIndexInformer,
- 后者用來獲取創(chuàng)建出來的Informer的緩存接口:Indexer,該接口可以用來查詢緩存的數(shù)據(jù),我準備下一篇文章單獨介紹其底層如何實現(xiàn)緩存的。
注冊事件方法
# Client-go/tools/cache/shared_informer.go
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此處省略 workqueue 的使用
DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
})
func onAdd(obj interface{}) {
node := obj.(*corev1.Endpoint)
fmt.Println("add a endpoint:", endpoint.Name)
}
這里,首先調(diào)用Infomer()創(chuàng)建出來SharedIndexInformer,然后向其中注冊事件方法,這樣當有對應的事件發(fā)生時,就會觸發(fā)這里注冊的方法去做相應的事情。其次調(diào)用Lister()獲取到緩存接口,就可以通過它來查詢Informer中緩存的數(shù)據(jù)了,而且Informer中緩存的數(shù)據(jù),是可以有索引的,這樣可以加快查詢的速度。
啟動Informer
# kubernetes/cmd/kube-controller-manager/app/controllermanager.go controllerContext.InformerFactory.Start(controllerContext.Stop)
這里InformerFactory的啟動,會遍歷Factory中創(chuàng)建的所有Informer,依次將其啟動。
機制解析
Informer的實現(xiàn)都是在client-go這個庫中,通過上述的工廠方法,其實最終創(chuàng)建出來的是一個叫做SharedIndexInformer的結構體:
# k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
indexer Indexer
controller Controller
processor *sharedProcessor
cacheMutationDetector MutationDetector
listerWatcher ListerWatcher
......
}
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}
可以看到,在創(chuàng)建SharedIndexInformer時,就創(chuàng)建出了processor, indexer等結構,而在Informer啟動時,還創(chuàng)建出了controller, fifo queue, reflector等結構。
Reflector
Reflector的作用,就是通過List&Watch的方式,從apiserver獲取到感興趣的對象以及其狀態(tài),然后將其放到一個稱為”Delta”的先進先出隊列中。
所謂的Delta FIFO Queue,就是隊列中的元素除了對象本身外,還有針對該對象的事件類型:
type Delta struct {
Type DeltaType
Object interface{}
}
目前有5種Type: Added, Updated, Deleted, Replaced, Resync,所以,針對同一個對象,可能有多個Delta元素在隊列中,表示對該對象做了不同的操作,比如短時間內(nèi),多次對某一個對象進行了更新操作,那么就會有多個Updated類型的Delta放入到隊列中。后續(xù)隊列的消費者,可以根據(jù)這些Delta的類型,來回調(diào)注冊到Informer中的事件方法。
而所謂的List&Watch,就是
- 先調(diào)用該API對象的List接口,獲取到對象列表,將它們添加到隊列中,Delta元素類型為Replaced,
- 然后再調(diào)用Watch接口,持續(xù)監(jiān)聽該API對象的狀態(tài)變化事件,將這些事件按照不同的事件類型,組成對應的Delta類型,添加到隊列中,Delta元素類型有Added, Updated, Deleted三種。
此外,Informer還會周期性的發(fā)送Resync類型的Delta元素到隊列中,目的是為了周期性的觸發(fā)注冊到Informer中的事件方法UpdateFunc,保證對象的期望狀態(tài)和實際狀態(tài)一致,該周期是由一個叫做resyncPeriod的參數(shù)決定的,在向Informer中添加EventHandler時,可以指定該參數(shù),若為0的話,則關閉該功能。需要注意的是,Resync類型的Delta元素中的對象,是通過Indexer從緩存中獲取到的,而不是直接從apiserver中拿的,即這里resync的,其實是”緩存”的對象的期望狀態(tài)和實際狀態(tài)的一致性。
根據(jù)以上Reflector的機制,依賴Etcd的Watch機制,通過事件來獲知對象變化狀態(tài),建立本地緩存。即使在Informer中,也沒有周期性的調(diào)用對象的List接口,正常情況下,List&Watch只會執(zhí)行一次,即先執(zhí)行List把數(shù)據(jù)拉過來,放入隊列中,后續(xù)就進入Watch階段。
那什么時候才會再執(zhí)行List呢?其實就是異常的時候,在List或者Watch的過程中,如果有異常,比如apiserver重啟了,那么Reflector就開始周期性的執(zhí)行List&Watch,直到再次正常進入Watch階段。為了在異常時段,不給apiserver造成壓力,這個周期是一個稱為backoff的可變的時間間隔,默認是一個指數(shù)型的間隔,即越往后重試的間隔越長,到一定時間又會重置回一開始的頻率。而且,為了讓不同的apiserver能夠均勻負載這些Watch請求,客戶端會主動斷開跟apiserver的連接,這個超時時間為60秒,然后重新發(fā)起Watch請求。此外,在控制器重啟過程中,也會再次執(zhí)行List,所以會觀察到之前已經(jīng)創(chuàng)建好的API對象,又重新觸發(fā)了一遍AddFunc方法。
從以上這些點,可以看出來,Kubernetes在性能和穩(wěn)定性的提升上,還是下了很多功夫的。
Controller
這里Controller的作用是通過輪詢不斷從隊列中取出Delta元素,根據(jù)元素的類型,一方面通過Indexer更新本地的緩存,一方面調(diào)用Processor來觸發(fā)注冊到Informer的事件方法:
# k8s.io/client-go/tools/cache/controller.go
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
}
}
這里的c.config.Process是定義在shared_informer.go中的HandleDeltas()方法:
# k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
Processer & Listener
Processer和Listener則是觸發(fā)事件方法的機制,在創(chuàng)建Informer時,會創(chuàng)建一個Processer,而在向Informer中通過調(diào)用AddEventHandler()注冊事件方法時,會為每一個Handler生成一個Listener,然后將該Lisener中添加到Processer中,每一個Listener中有兩個channel:addCh和nextCh。Listener通過select監(jiān)聽在這兩個channel上,當Controller從隊列中取出新的元素時,會調(diào)用processer來給它的listener發(fā)送“通知”,這個“通知”就是向addCh中添加一個元素,即add(),然后一個goroutine就會將這個元素從addCh轉移到nextCh,即pop(),從而觸發(fā)另一個goroutine執(zhí)行注冊的事件方法,即run()。
# k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}
Indexer
Indexer是對緩存進行增刪查改的接口,緩存本質上就是用map構建的key:value鍵值對,都存在items這個map中,key為/:
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
而為了加速查詢,還可以選擇性的給這些緩存添加索引,索引存儲在indecies中,所謂索引,就是在向緩存中添加記錄時,就將其key添加到索引結構中,在查找時,可以根據(jù)索引條件,快速查找到指定的key記錄,比如默認有個索引是按照namespace進行索引,可以根據(jù)快速找出屬于某個namespace的某種對象,而不用去遍歷所有的緩存。
Indexer對外提供了Replace(), Resync(), Add(), Update(), Delete(), List(), Get(), GetByKey(), ByIndex()等接口。
總結
本篇對Kubernetes Informer的使用方法和實現(xiàn)原理,進行了深入分析,整體上看,Informer的設計是相當不錯的,基于事件機制,一方面構建本地緩存,一方面觸發(fā)事件方法,使得控制器能夠快速響應和快速獲取數(shù)據(jù),此外,還有諸如共享Informer, resync, index, watch timeout等機制,使得Informer更加高效和穩(wěn)定,有了Informer,控制器模式可以說是如虎添翼。
以上就是go語言K8S 的 informer機制淺析的詳細內(nèi)容,更多關于go K8S informer機制淺析的資料請關注腳本之家其它相關文章!
相關文章
使用?gomonkey?Mock?函數(shù)及方法示例詳解
在 Golang 語言中,寫單元測試的時候,不可避免的會涉及到對其他函數(shù)及方法的 Mock,即在假設其他函數(shù)及方法響應預期結果的同時,校驗被測函數(shù)的響應是否符合預期,這篇文章主要介紹了使用?gomonkey?Mock?函數(shù)及方法,需要的朋友可以參考下2022-06-06
Go語言defer與return執(zhí)行的先后順序詳解
這篇文章主要為大家介紹了Go語言defer與return執(zhí)行的先后順序詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-12-12
GO語言中創(chuàng)建切片的三種實現(xiàn)方式
這篇文章主要介紹了GO語言中創(chuàng)建切片的三種實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-09-09

