源碼閱讀之storm操作zookeeper-cluster.clj
storm操作zookeeper的主要函數(shù)都定義在命名空間backtype.storm.cluster中(即cluster.clj文件中)。 backtype.storm.cluster定義了兩個(gè)重要protocol:ClusterState和StormClusterState。
clojure中的protocol可以看成java中的接口,封裝了一組方法。ClusterState協(xié)議中封裝了一組與zookeeper進(jìn)行交互的基礎(chǔ)函數(shù),如獲取子節(jié)點(diǎn)函數(shù),獲取子節(jié)點(diǎn)數(shù)據(jù)函數(shù)等,ClusterState協(xié)議定義如下:
ClusterState協(xié)議
(defprotocol ClusterState (set-ephemeral-node [this path data]) (delete-node [this path]) (create-sequential [this path data]) ;; if node does not exist, create persistent with this data (set-data [this path data]) (get-data [this path watch?]) (get-version [this path watch?]) (get-data-with-version [this path watch?]) (get-children [this path watch?]) (mkdirs [this path]) (close [this]) (register [this callback]) (unregister [this id]))
StormClusterState協(xié)議封裝了一組storm與zookeeper進(jìn)行交互的函數(shù),可以將StormClusterState協(xié)議中的函數(shù)看成ClusterState協(xié)議中函數(shù)的"組合"。StormClusterState協(xié)議定義如下:
StormClusterState協(xié)議
(defprotocol StormClusterState (assignments [this callback]) (assignment-info [this storm-id callback]) (assignment-info-with-version [this storm-id callback]) (assignment-version [this storm-id callback]) (active-storms [this]) (storm-base [this storm-id callback]) (get-worker-heartbeat [this storm-id node port]) (executor-beats [this storm-id executor->node+port]) (supervisors [this callback]) (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist (setup-heartbeats! [this storm-id]) (teardown-heartbeats! [this storm-id]) (teardown-topology-errors! [this storm-id]) (heartbeat-storms [this]) (error-topologies [this]) (worker-heartbeat! [this storm-id node port info]) (remove-worker-heartbeat! [this storm-id node port]) (supervisor-heartbeat! [this supervisor-id info]) (activate-storm! [this storm-id storm-base]) (update-storm! [this storm-id new-elems]) (remove-storm-base! [this storm-id]) (set-assignment! [this storm-id info]) (remove-storm! [this storm-id]) (report-error [this storm-id task-id node port error]) (errors [this storm-id task-id]) (disconnect [this]))
命名空間backtype.storm.cluster除了定義ClusterState和StormClusterState這兩個(gè)重要協(xié)議外,還定義了兩個(gè)重要函數(shù):mk-distributed-cluster-state和mk-storm-cluster-state。
mk-distributed-cluster-state函數(shù)如下:
該函數(shù)返回一個(gè)實(shí)現(xiàn)了ClusterState協(xié)議的對(duì)象,通過這個(gè)對(duì)象就可以與zookeeper進(jìn)行交互了。
mk-distributed-cluster-state函數(shù)
(defn mk-distributed-cluster-state
;; conf綁定了storm.yaml中的配置信息,是一個(gè)map對(duì)象
[conf]
;; zk綁定一個(gè)zk client,Storm使用CuratorFramework與Zookeeper進(jìn)行交互
(let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
;; 創(chuàng)建storm集群在zookeeper上的根目錄,默認(rèn)值為/storm
(zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
(.close zk))
;; callbacks綁定回調(diào)函數(shù)集合,是一個(gè)map對(duì)象
(let [callbacks (atom {})
;; active標(biāo)示zookeeper集群狀態(tài)
active (atom true)
;; zk重新綁定新的zk client,該zk client設(shè)置了watcher,這樣當(dāng)zookeeper集群的狀態(tài)發(fā)生變化時(shí),zk server會(huì)給zk client發(fā)送相應(yīng)的event,zk client設(shè)置的watcher會(huì)調(diào)用callbacks中相應(yīng)回調(diào)函數(shù)來處理event
;; 啟動(dòng)nimbus時(shí),callbacks是一個(gè)空集合,所以nimbus端收到event后不會(huì)調(diào)用任何回調(diào)函數(shù);但是啟動(dòng)supervisor時(shí),callbacks中注冊(cè)了回調(diào)函數(shù),所以當(dāng)supervisor收到zk server發(fā)送的event后,會(huì)調(diào)用相應(yīng)的回調(diào)函數(shù)
;; mk-client函數(shù)定義在zookeeper.clj文件中,請(qǐng)參見其定義部分
zk (zk/mk-client conf
(conf STORM-ZOOKEEPER-SERVERS)
(conf STORM-ZOOKEEPER-PORT)
:auth-conf conf
:root (conf STORM-ZOOKEEPER-ROOT)
;; :watcher綁定一個(gè)函數(shù),指定zk client的默認(rèn)watcher函數(shù),state標(biāo)示當(dāng)前zk client的狀態(tài);type標(biāo)示事件類型;path標(biāo)示zookeeper上產(chǎn)生該事件的znode
;; 該watcher函數(shù)主要功能就是執(zhí)行callbacks集合中的函數(shù),callbacks集合中的函數(shù)是在mk-storm-cluster-state函數(shù)中通過調(diào)用ClusterState的register函數(shù)添加的
:watcher (fn [state type path]
(when @active
(when-not (= :connected state)
(log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
(when-not (= :none type)
(doseq [callback (vals @callbacks)]
(callback type path))))))]
;; reify相當(dāng)于java中的implements,這里表示實(shí)現(xiàn)一個(gè)協(xié)議
(reify
ClusterState
;; register函數(shù)用于將回調(diào)函數(shù)加入callbacks中,key是一個(gè)32位的標(biāo)識(shí)
(register
[this callback]
(let [id (uuid)]
(swap! callbacks assoc id callback)
id))
;; unregister函數(shù)用于將指定key的回調(diào)函數(shù)從callbacks中刪除
(unregister
[this id]
(swap! callbacks dissoc id))
;; 在zookeeper上添加一個(gè)臨時(shí)節(jié)點(diǎn)
(set-ephemeral-node
[this path data]
(zk/mkdirs zk (parent-path path))
(if (zk/exists zk path false)
(try-cause
(zk/set-data zk path data) ; should verify that it's ephemeral
(catch KeeperException$NoNodeException e
(log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
(zk/create-node zk path data :ephemeral)
))
(zk/create-node zk path data :ephemeral)))
;; 在zookeeper上添加一個(gè)順序節(jié)點(diǎn)
(create-sequential
[this path data]
(zk/create-node zk path data :sequential))
;; 修改某個(gè)節(jié)點(diǎn)數(shù)據(jù)
(set-data
[this path data]
;; note: this does not turn off any existing watches
(if (zk/exists zk path false)
(zk/set-data zk path data)
(do
(zk/mkdirs zk (parent-path path))
(zk/create-node zk path data :persistent))))
;; 刪除指定節(jié)點(diǎn)
(delete-node
[this path]
(zk/delete-recursive zk path))
;; 獲取指定節(jié)點(diǎn)數(shù)據(jù)。path標(biāo)示節(jié)點(diǎn)路徑;watch?是一個(gè)布爾類型值,表示是否需要對(duì)該節(jié)點(diǎn)進(jìn)行"觀察",如果watch?=true,當(dāng)調(diào)用set-data函數(shù)修改該節(jié)點(diǎn)數(shù)據(jù)后,
;; 會(huì)給zk client發(fā)送一個(gè)事件,zk client接收事件后,會(huì)調(diào)用創(chuàng)建zk client時(shí)指定的默認(rèn)watcher函數(shù)(即:watcher綁定的函數(shù))
(get-data
[this path watch?]
(zk/get-data zk path watch?))
;; 與get-data函數(shù)的區(qū)別就是獲取指定節(jié)點(diǎn)數(shù)據(jù)的同時(shí),獲取節(jié)點(diǎn)數(shù)據(jù)的version,version表示節(jié)點(diǎn)數(shù)據(jù)修改的次數(shù)
(get-data-with-version
[this path watch?]
(zk/get-data-with-version zk path watch?))
;; 獲取指定節(jié)點(diǎn)的version,watch?的含義與get-data函數(shù)中的watch?相同
(get-version
[this path watch?]
(zk/get-version zk path watch?))
;; 獲取指定節(jié)點(diǎn)的子節(jié)點(diǎn)列表,watch?的含義與get-data函數(shù)中的watch?相同
(get-children
[this path watch?]
(zk/get-children zk path watch?))
;; 在zookeeper上創(chuàng)建一個(gè)節(jié)點(diǎn)
(mkdirs
[this path]
(zk/mkdirs zk path))
;; 關(guān)閉zk client
(close
[this]
(reset! active false)
(.close zk)))))
mk-storm-cluster-state函數(shù)定義如下:
mk-storm-cluster-state函數(shù)非常重要,該函數(shù)返回一個(gè)實(shí)現(xiàn)了StormClusterState協(xié)議的實(shí)例,通過該實(shí)例storm就可以更加方便與zookeeper進(jìn)行交互。
在啟動(dòng)nimbus和supervisor的函數(shù)中均調(diào)用了mk-storm-cluster-state函數(shù)。關(guān)于nimbus和supervisor的啟動(dòng)將在之后的文章中介紹。
mk-storm-cluster-state函數(shù)
(defn mk-storm-cluster-state
[cluster-state-spec]
;; satisfies?謂詞相當(dāng)于java中的instanceof,判斷cluster-state-spec是不是ClusterState實(shí)例
(let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
[false cluster-state-spec]
[true (mk-distributed-cluster-state cluster-state-spec)])
;; 綁定topology id->回調(diào)函數(shù)的map,當(dāng)/assignments/{topology id}數(shù)據(jù)發(fā)生變化時(shí),zk client執(zhí)行assignment-info-callback中topology id所對(duì)應(yīng)的回調(diào)函數(shù)
assignment-info-callback (atom {})
;; assignment-info-with-version-callback與assignment-info-callback類似
assignment-info-with-version-callback (atom {})
;; assignment-version-callback與assignments-callback類似
assignment-version-callback (atom {})
;; 當(dāng)/supervisors標(biāo)示的znode的子節(jié)點(diǎn)發(fā)生變化時(shí),zk client執(zhí)行supervisors-callback指向的函數(shù)
supervisors-callback (atom nil)
;; 當(dāng)/assignments標(biāo)示的znode的子節(jié)點(diǎn)發(fā)生變化時(shí),zk client執(zhí)行assignments-callback指向的函數(shù)
assignments-callback (atom nil)
;; 當(dāng)/storms/{topology id}標(biāo)示的znode的數(shù)據(jù)發(fā)生變化時(shí),zk client執(zhí)行storm-base-callback中topology id所對(duì)應(yīng)的回調(diào)函數(shù)
storm-base-callback (atom {})
;; register函數(shù)將"回調(diào)函數(shù)(fn ...)"添加到cluster-state的callbacks集合中,并返回標(biāo)示該回調(diào)函數(shù)的uuid
state-id (register
cluster-state
;; 定義"回調(diào)函數(shù)",type標(biāo)示事件類型,path標(biāo)示znode
(fn [type path]
;; subtree綁定路徑前綴如"assignments"、"storms"、"supervisors"等,args存放topology id
(let [[subtree & args] (tokenize-path path)]
;; condp相當(dāng)于java中的switch
(condp = subtree
;; 當(dāng)subtree="assignments"時(shí),如果args為空,說明是/assignments的子節(jié)點(diǎn)發(fā)生變化,執(zhí)行assignments-callback指向的回調(diào)函數(shù),否則
;; 說明/assignments/{topology id}標(biāo)示的節(jié)點(diǎn)數(shù)據(jù)發(fā)生變化,執(zhí)行assignment-info-callback指向的回調(diào)函數(shù)
ASSIGNMENTS-ROOT (if (empty? args)
(issue-callback! assignments-callback)
(issue-map-callback! assignment-info-callback (first args)))
;; 當(dāng)subtree="supervisors"時(shí),說明是/supervisors的子節(jié)點(diǎn)發(fā)生變化,執(zhí)行supervisors-callback指向的回調(diào)函數(shù)
SUPERVISORS-ROOT (issue-callback! supervisors-callback)
;; 當(dāng)subtree="storms"時(shí),說明是/storms/{topology id}標(biāo)示的節(jié)點(diǎn)數(shù)據(jù)發(fā)生變化,執(zhí)行storm-base-callback指向的回調(diào)函數(shù)
STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
;; this should never happen
(exit-process! 30 "Unknown callback for subtree " subtree args)))))]
;; 在zookeeper上創(chuàng)建storm運(yùn)行topology所必需的znode
(doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
(mkdirs cluster-state p))
;; 返回一個(gè)實(shí)現(xiàn)StormClusterState協(xié)議的實(shí)例
(reify
StormClusterState
;; 獲取/assignments的子節(jié)點(diǎn)列表,如果callback不為空,將其賦值給assignments-callback,并對(duì)/assignments添加"節(jié)點(diǎn)觀察"
(assignments
[this callback]
(when callback
(reset! assignments-callback callback))
(get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
;; 獲取/assignments/{storm-id}節(jié)點(diǎn)數(shù)據(jù),即storm-id的分配信息,如果callback不為空,將其添加到assignment-info-callback中,并對(duì)/assignments/{storm-id}添加"數(shù)據(jù)觀察"
(assignment-info
[this storm-id callback]
(when callback
(swap! assignment-info-callback assoc storm-id callback))
(maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
;; 獲取/assignments/{storm-id}節(jié)點(diǎn)數(shù)據(jù)包括version信息,如果callback不為空,將其添加到assignment-info-with-version-callback中,并對(duì)/assignments/{storm-id}添加"數(shù)據(jù)觀察"
(assignment-info-with-version
[this storm-id callback]
(when callback
(swap! assignment-info-with-version-callback assoc storm-id callback))
(let [{data :data version :version}
(get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
{:data (maybe-deserialize data)
:version version}))
;; 獲取/assignments/{storm-id}節(jié)點(diǎn)數(shù)據(jù)的version信息,如果callback不為空,將其添加到assignment-version-callback中,并對(duì)/assignments/{storm-id}添加"數(shù)據(jù)觀察"
(assignment-version
[this storm-id callback]
(when callback
(swap! assignment-version-callback assoc storm-id callback))
(get-version cluster-state (assignment-path storm-id) (not-nil? callback)))
;; 獲取storm集群中正在運(yùn)行的topology id即/storms的子節(jié)點(diǎn)列表
(active-storms
[this]
(get-children cluster-state STORMS-SUBTREE false))
;; 獲取storm集群中所有有心跳的topology id即/workerbeats的子節(jié)點(diǎn)列表
(heartbeat-storms
[this]
(get-children cluster-state WORKERBEATS-SUBTREE false))
;; 獲取所有有錯(cuò)誤的topology id即/errors的子節(jié)點(diǎn)列表
(error-topologies
[this]
(get-children cluster-state ERRORS-SUBTREE false))
;; 獲取指定storm-id進(jìn)程的心跳信息,即/workerbeats/{storm-id}/{node-port}節(jié)點(diǎn)數(shù)據(jù)
(get-worker-heartbeat
[this storm-id node port]
(-> cluster-state
(get-data (workerbeat-path storm-id node port) false)
maybe-deserialize))
;; 獲取指定進(jìn)程中所有線程的心跳信息
(executor-beats
[this storm-id executor->node+port]
;; need to take executor->node+port in explicitly so that we don't run into a situation where a
;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
;; we avoid situations like that
(let [node+port->executors (reverse-map executor->node+port)
all-heartbeats (for [[[node port] executors] node+port->executors]
(->> (get-worker-heartbeat this storm-id node port)
(convert-executor-beats executors)
))]
(apply merge all-heartbeats)))
;; 獲取/supervisors的子節(jié)點(diǎn)列表,如果callback不為空,將其賦值給supervisors-callback,并對(duì)/supervisors添加"節(jié)點(diǎn)觀察"
(supervisors
[this callback]
(when callback
(reset! supervisors-callback callback))
(get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
;; 獲取/supervisors/{supervisor-id}節(jié)點(diǎn)數(shù)據(jù),即supervisor的心跳信息
(supervisor-info
[this supervisor-id]
(maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
;; 設(shè)置進(jìn)程心跳信息
(worker-heartbeat!
[this storm-id node port info]
(set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
;; 刪除進(jìn)程心跳信息
(remove-worker-heartbeat!
[this storm-id node port]
(delete-node cluster-state (workerbeat-path storm-id node port)))
;; 創(chuàng)建指定storm-id的topology的用于存放心跳信息的節(jié)點(diǎn)
(setup-heartbeats!
[this storm-id]
(mkdirs cluster-state (workerbeat-storm-root storm-id)))
;; 刪除指定storm-id的topology的心跳信息節(jié)點(diǎn)
(teardown-heartbeats!
[this storm-id]
(try-cause
(delete-node cluster-state (workerbeat-storm-root storm-id))
(catch KeeperException e
(log-warn-error e "Could not teardown heartbeats for " storm-id))))
;; 刪除指定storm-id的topology的錯(cuò)誤信息節(jié)點(diǎn)
(teardown-topology-errors!
[this storm-id]
(try-cause
(delete-node cluster-state (error-storm-root storm-id))
(catch KeeperException e
(log-warn-error e "Could not teardown errors for " storm-id))))
;; 創(chuàng)建臨時(shí)節(jié)點(diǎn)存放supervisor的心跳信息
(supervisor-heartbeat!
[this supervisor-id info]
(set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
;; 創(chuàng)建/storms/{storm-id}節(jié)點(diǎn)
(activate-storm!
[this storm-id storm-base]
(set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
;; 更新topology對(duì)應(yīng)的StormBase對(duì)象,即更新/storm/{storm-id}節(jié)點(diǎn)
(update-storm!
[this storm-id new-elems]
;; base綁定storm-id在zookeeper上的StormBase對(duì)象
(let [base (storm-base this storm-id nil)
;; executors綁定component名稱->組件并行度的map
executors (:component->executors base)
;; new-elems綁定合并后的組件并行度map,update函數(shù)將組件新并行度map合并到舊map中
new-elems (update new-elems :component->executors (partial merge executors))]
;; 更新StormBase對(duì)象中的組件并行度map,并寫入zookeeper的/storms/{storm-id}節(jié)點(diǎn)
(set-data cluster-state (storm-path storm-id)
(-> base
(merge new-elems)
Utils/serialize))))
;; 獲取storm-id的StormBase對(duì)象,即讀取/storms/{storm-id}節(jié)點(diǎn)數(shù)據(jù),如果callback不為空,將其賦值給storm-base-callback,并為/storms/{storm-id}節(jié)點(diǎn)添加"數(shù)據(jù)觀察"
(storm-base
[this storm-id callback]
(when callback
(swap! storm-base-callback assoc storm-id callback))
(maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
;; 刪除storm-id的StormBase對(duì)象,即刪除/storms/{storm-id}節(jié)點(diǎn)
(remove-storm-base!
[this storm-id]
(delete-node cluster-state (storm-path storm-id)))
;; 更新storm-id的分配信息,即更新/assignments/{storm-id}節(jié)點(diǎn)數(shù)據(jù)
(set-assignment!
[this storm-id info]
(set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
;; 刪除storm-id的分配信息,同時(shí)刪除其StormBase信息,即刪除/assignments/{storm-id}節(jié)點(diǎn)和/storms/{storm-id}節(jié)點(diǎn)
(remove-storm!
[this storm-id]
(delete-node cluster-state (assignment-path storm-id))
(remove-storm-base! this storm-id))
;; 將組件異常信息寫入zookeeper
(report-error
[this storm-id component-id node port error]
;; path綁定"/errors/{storm-id}/{component-id}"
(let [path (error-path storm-id component-id)
;; data綁定異常信息,包括異常時(shí)間、異常堆棧信息、主機(jī)和端口
data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
;; 創(chuàng)建/errors/{storm-id}/{component-id}節(jié)點(diǎn)
_ (mkdirs cluster-state path)
;; 創(chuàng)建/errors/{storm-id}/{component-id}的子順序節(jié)點(diǎn),并寫入異常信息
_ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
;; to-kill綁定除去順序節(jié)點(diǎn)編號(hào)最大的前10個(gè)節(jié)點(diǎn)的剩余節(jié)點(diǎn)的集合
to-kill (->> (get-children cluster-state path false)
(sort-by parse-error-path)
reverse
(drop 10))]
;; 刪除to-kill中包含的節(jié)點(diǎn)
(doseq [k to-kill]
(delete-node cluster-state (str path "/" k)))))
;; 得到給定的storm-id component-id下的異常信息
(errors
[this storm-id component-id]
(let [path (error-path storm-id component-id)
_ (mkdirs cluster-state path)
children (get-children cluster-state path false)
errors (dofor [c children]
(let [data (-> (get-data cluster-state (str path "/" c) false)
maybe-deserialize)]
(when data
(struct TaskError (:error data) (:time-secs data) (:host data) (:port data))
)))
]
(->> (filter not-nil? errors)
(sort-by (comp - :time-secs)))))
;; 關(guān)閉連接,在關(guān)閉連接前,將回調(diào)函數(shù)從cluster-state的callbacks中刪除
(disconnect
[this]
(unregister cluster-state state-id)
(when solo?
(close cluster-state))))))
zookeeper.clj中mk-client函數(shù)
mk-client函數(shù)創(chuàng)建一個(gè)CuratorFramework實(shí)例,為該實(shí)例注冊(cè)了CuratorListener,當(dāng)一個(gè)后臺(tái)操作完成或者指定的watch被觸發(fā)時(shí)將會(huì)執(zhí)行CuratorListener中的eventReceived()。eventReceived中調(diào)用的wacher函數(shù)就是mk-distributed-cluster-state中:watcher綁定的函數(shù)。
(defnk mk-client
[conf servers port
:root ""
:watcher default-watcher
:auth-conf nil]
(let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
(.. fk
(getCuratorListenable)
(addListener
(reify CuratorListener
(^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
(when (= (.getType e) CuratorEventType/WATCHED)
(let [^WatchedEvent event (.getWatchedEvent e)]
(watcher (zk-keeper-states (.getState event))
(zk-event-types (.getType event))
(.getPath event))))))))
(.start fk)
fk))
以上就是storm與zookeeper進(jìn)行交互的源碼分析,我覺得最重要的部分就是如何給zk client添加"wacher",storm的很多功能都是通過zookeeper的wacher機(jī)制實(shí)現(xiàn)的,如"分配信息領(lǐng)取"。添加"wacher"大概分為以下幾個(gè)步驟:
mk-distributed-cluster-state函數(shù)創(chuàng)建了一個(gè)zk client,并通過:watcher給該zk client指定了"wacher"函數(shù),這個(gè)"wacher"函數(shù)只是簡單調(diào)用ClusterState的callbacks集合中的函數(shù),這樣這個(gè)"wacher"函數(shù)執(zhí)行 哪些函數(shù)將由ClusterState實(shí)例決定
ClusterState實(shí)例提供register函數(shù)來更新callbacks集合,ClusterState實(shí)例被傳遞給了mk-storm-cluster-state函數(shù),在mk-storm-cluster-state中調(diào)用register添加了一個(gè)函數(shù)(fn [type path] ... ),這個(gè)函數(shù)實(shí)現(xiàn)了"watcher"函數(shù)的全部邏輯
mk-storm-cluster-state中注冊(cè)的函數(shù)執(zhí)行的具體內(nèi)容由StormClusterState實(shí)例決定,對(duì)zookeeper節(jié)點(diǎn)添加"觀察"也是通過StormClusterState實(shí)例實(shí)現(xiàn)的,這樣我們就可以通過StormClusterState實(shí)例對(duì)我們感興趣的節(jié)點(diǎn)添加"觀察"和"回調(diào)函數(shù)",當(dāng)節(jié)點(diǎn)或節(jié)點(diǎn)數(shù)據(jù)發(fā)生變化后,zk server就會(huì)給zk client發(fā)送"通知",zk client中的"wather"函數(shù)將被調(diào)用,進(jìn)而我們注冊(cè)的"回到函數(shù)"將被執(zhí)行。
總結(jié)
這部分源碼與zookeeper聯(lián)系十分緊密,涉及了很多zookeeper中的概念和特性,如"數(shù)據(jù)觀察"和"節(jié)點(diǎn)觀察"等,有關(guān)zookeeper的wacher機(jī)制請(qǐng)參考:http://www.dhdzp.com/article/124295.htm,storm并沒有直接使用zookeeper的api,而是使用Curator框架,Curator框架簡化了訪問zookeeper的操作。關(guān)于Curator框架請(qǐng)參考:http://www.dhdzp.com/article/125785.htm。
以上就是本文關(guān)于源碼閱讀之storm操作zookeeper-cluster.clj的全部內(nèi)容了,感興趣的朋友可以參閱:zookeeper watch機(jī)制的理解、apache zookeeper使用方法實(shí)例詳解、為zookeeper配置相應(yīng)的acl權(quán)限等,希望對(duì)大家有所幫助。感謝各位的閱讀!
相關(guān)文章
springboot中如何實(shí)現(xiàn)kafa指定offset消費(fèi)
這篇文章主要介紹了springboot中如何實(shí)現(xiàn)kafa指定offset消費(fèi),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
Java算法之?dāng)?shù)組冒泡排序代碼實(shí)例講解
這篇文章主要介紹了Java算法之?dāng)?shù)組冒泡排序代碼實(shí)例講解,文中用代碼舉例講解的很清晰,有感興趣的同學(xué)可以研究下2021-03-03
SpringBoot實(shí)現(xiàn)掃碼登錄的項(xiàng)目實(shí)踐
本文主要介紹了SpringBoot實(shí)現(xiàn)掃碼登錄的項(xiàng)目實(shí)踐,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
java中xml和對(duì)象之間的互相轉(zhuǎn)換方法
在java開發(fā)中我們經(jīng)常會(huì)遇到Xml與對(duì)象互相轉(zhuǎn)換的情況,這篇文章主要給大家介紹了關(guān)于java中xml和對(duì)象之間的互相轉(zhuǎn)換方法,文中給出了兩種解決方法,需要的朋友可以參考下2023-06-06
Java虛擬機(jī)運(yùn)行時(shí)數(shù)據(jù)區(qū)域匯總
這篇文章主要給大家介紹了關(guān)于Java虛擬機(jī)運(yùn)行時(shí)數(shù)據(jù)區(qū)域的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-08-08
基于Retrofit+Rxjava實(shí)現(xiàn)帶進(jìn)度顯示的下載文件
這篇文章主要為大家詳細(xì)介紹了基于Retrofit+Rxjava實(shí)現(xiàn)帶進(jìn)度顯示的下載文件,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-05-05
Spring Data JPA調(diào)用存儲(chǔ)過程實(shí)例代碼
本篇文章主要介紹了Spring Data JPA調(diào)用存儲(chǔ)過程實(shí)例代碼,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-04-04

