Nacos配置中心集群原理及源碼分析
Nacos作為配置中心,必然需要保證服務(wù)節(jié)點(diǎn)的高可用性,那么Nacos是如何實(shí)現(xiàn)集群的呢?
下面這個(gè)圖,表示Nacos集群的部署圖。

Nacos集群工作原理
Nacos作為配置中心的集群結(jié)構(gòu)中,是一種無中心化節(jié)點(diǎn)的設(shè)計(jì),由于沒有主從節(jié)點(diǎn),也沒有選舉機(jī)制,所以為了能夠?qū)崿F(xiàn)熱備,就需要增加虛擬IP(VIP)。
Nacos的數(shù)據(jù)存儲(chǔ)分為兩部分
- Mysql數(shù)據(jù)庫存儲(chǔ),所有Nacos節(jié)點(diǎn)共享同一份數(shù)據(jù),數(shù)據(jù)的副本機(jī)制由Mysql本身的主從方案來解決,從而保證數(shù)據(jù)的可靠性。
- 每個(gè)節(jié)點(diǎn)的本地磁盤,會(huì)保存一份全量數(shù)據(jù),具體路徑:/data/program/nacos-1/data/config-data/${GROUP}.
在Nacos的設(shè)計(jì)中,Mysql是一個(gè)中心數(shù)據(jù)倉庫,且認(rèn)為在Mysql中的數(shù)據(jù)是絕對(duì)正確的。 除此之外,Nacos在啟動(dòng)時(shí)會(huì)把Mysql中的數(shù)據(jù)寫一份到本地磁盤。
這么設(shè)計(jì)的好處是可以提高性能,當(dāng)客戶端需要請(qǐng)求某個(gè)配置項(xiàng)時(shí),服務(wù)端會(huì)想Ian從磁盤中讀取對(duì)應(yīng)文件返回,而磁盤的讀取效率要比數(shù)據(jù)庫效率高。
當(dāng)配置發(fā)生變更時(shí):
- Nacos會(huì)把變更的配置保存到數(shù)據(jù)庫,然后再寫入本地文件。
- 接著發(fā)送一個(gè)HTTP請(qǐng)求,給到集群中的其他節(jié)點(diǎn),其他節(jié)點(diǎn)收到事件后,從Mysql中dump剛剛寫入的數(shù)據(jù)到本地文件中。
另外,NacosServer啟動(dòng)后,會(huì)同步啟動(dòng)一個(gè)定時(shí)任務(wù),每隔6小時(shí),會(huì)dump一次全量數(shù)據(jù)到本地文件
配置變更同步入口
當(dāng)配置發(fā)生修改、刪除、新增操作時(shí),通過發(fā)布一個(gè)notifyConfigChange事件。
@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema) throws NacosException {
//省略..
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
}
}//省略
return true;
}
AsyncNotifyService
配置數(shù)據(jù)變更事件,專門有一個(gè)監(jiān)聽器AsyncNotifyService,它會(huì)處理數(shù)據(jù)變更后的同步事件。
@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
this.memberManager = memberManager;
// Register ConfigDataChangeEvent to NotifyCenter.
NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
// Register A Subscriber to subscribe ConfigDataChangeEvent.
NotifyCenter.registerSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
// Generate ConfigDataChangeEvent concurrently
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
Collection<Member> ipList = memberManager.allMembers(); //得到集群中的ip列表
// 構(gòu)建NotifySingleTask,并添加到隊(duì)列中。
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
for (Member member : ipList) { //遍歷集群中的每個(gè)節(jié)點(diǎn)
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
evt.isBeta));
}
//異步執(zhí)行任務(wù) AsyncTask
ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
}
}
@Override
public Class<? extends Event> subscribeType() {
return ConfigDataChangeEvent.class;
}
});
}
AsyncTask
@Override
public void run() {
executeAsyncInvoke();
}
private void executeAsyncInvoke() {
while (!queue.isEmpty()) {//遍歷隊(duì)列中的數(shù)據(jù),直到數(shù)據(jù)為空
NotifySingleTask task = queue.poll(); //獲取task
String targetIp = task.getTargetIP(); //獲取目標(biāo)ip
if (memberManager.hasMember(targetIp)) { //如果集群中的ip列表包含目標(biāo)ip
// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
//判斷目標(biāo)ip的健康狀態(tài)
boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); //
if (unHealthNeedDelay) { //如果目標(biāo)服務(wù)是非健康,則繼續(xù)添加到隊(duì)列中,延后再執(zhí)行。
// target ip is unhealthy, then put it in the notification list
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
0, task.target);
// get delay time and set fail count to the task
asyncTaskExecute(task);
} else {
//構(gòu)建header
Header header = Header.newInstance();
header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
if (task.isBeta) {
header.addParam("isBeta", "true");
}
AuthHeaderUtil.addIdentityToHeader(header);
//通過restTemplate發(fā)起遠(yuǎn)程調(diào)用,如果調(diào)用成功,則執(zhí)行AsyncNotifyCallBack的回調(diào)方法
restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
}
}
}
}
目標(biāo)節(jié)點(diǎn)接收請(qǐng)求
數(shù)據(jù)同步的請(qǐng)求地址為,task.url=http://192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
@RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "tag", required = false) String tag) {
dataId = dataId.trim();
group = group.trim();
String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
String isBetaStr = request.getHeader("isBeta");
if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
} else {
//
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}
return true;
}
dumpService.dump用來實(shí)現(xiàn)配置的更新,代碼如下
當(dāng)前任務(wù)會(huì)被添加到DumpTaskMgr中管理。
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
boolean isBeta) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}
TaskManager.addTask, 先調(diào)用父類去完成任務(wù)添加。
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
super.addTask(key, newTask);
MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}
在這種場(chǎng)景設(shè)計(jì)中,一般都會(huì)采用生產(chǎn)者消費(fèi)者模式來完成,因此這里不難猜測(cè)到,任務(wù)會(huì)被保存到一個(gè)隊(duì)列中,然后有另外一個(gè)線程來執(zhí)行。
NacosDelayTaskExecuteEngine
TaskManager的父類是NacosDelayTaskExecuteEngine,
這個(gè)類中有一個(gè)成員屬性protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;,專門來保存延期執(zhí)行的任務(wù)類型AbstractDelayTask.
在這個(gè)類的構(gòu)造方法中,初始化了一個(gè)延期執(zhí)行的任務(wù),其中具體的任務(wù)是ProcessRunnable.
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(logger);
tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
processingExecutor
.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
ProcessRunnable
private class ProcessRunnable implements Runnable {
@Override
public void run() {
try {
processTasks();
} catch (Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
processTasks
protected void processTasks() {
//獲取所有的任務(wù)
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
//獲取任務(wù)處理器,這里返回的是DumpProcessor
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// ReAdd task if process failed
//執(zhí)行具體任務(wù)
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
retryFailedTask(taskKey, task);
}
}
}
DumpProcessor.process
讀取數(shù)據(jù)庫的最新數(shù)據(jù),然后更新本地緩存和磁盤
以上就是Nacos配置中心集群原理及源碼分析的詳細(xì)內(nèi)容,更多關(guān)于Nacos配置中心集群原理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
MyBatis-Plus中通用枚舉的實(shí)現(xiàn)
表中的有些字段值是固定的此時(shí)我們可以使用MyBatis-Plus的通用枚舉來實(shí)現(xiàn),本文主要介紹了MyBatis-Plus中通用枚舉的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2024-05-05
Springboot使用POI實(shí)現(xiàn)導(dǎo)出Excel文件示例
本篇文章主要介紹了Springboot使用POI實(shí)現(xiàn)導(dǎo)出Excel文件示例,非常具有實(shí)用價(jià)值,需要的朋友可以參考下。2017-02-02
如何解決報(bào)錯(cuò):java.net.BindException:無法指定被請(qǐng)求的地址問題
在Linux虛擬機(jī)上安裝并啟動(dòng)Tomcat時(shí)遇到啟動(dòng)失敗的問題,通過檢查端口及配置文件未發(fā)現(xiàn)異常,后發(fā)現(xiàn)/etc/hosts文件中缺少localhost的映射,添加后重啟Tomcat成功,Tomcat啟動(dòng)時(shí)會(huì)檢查localhost的IP映射,缺失或錯(cuò)誤都可能導(dǎo)致啟動(dòng)失敗2024-10-10
SpringCloud Edgware.SR3版本中Ribbon的timeout設(shè)置方法
今天小編就為大家分享一篇關(guān)于SpringCloud Edgware.SR3版本中Ribbon的timeout設(shè)置方法,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2018-12-12
Java構(gòu)建JDBC應(yīng)用程序的實(shí)例操作
在本篇文章里小編給大家整理了一篇關(guān)于Java構(gòu)建JDBC應(yīng)用程序的實(shí)例操作,有興趣的朋友們可以學(xué)習(xí)參考下。2021-03-03
關(guān)于@Autowired的使用及注意事項(xiàng)
這篇文章主要介紹了關(guān)于@Autowired的使用及注意事項(xiàng),具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05
Java中數(shù)組如何轉(zhuǎn)為字符串的幾種方法
數(shù)組是java中一個(gè)重要的類型,小伙伴們知道如何將數(shù)組轉(zhuǎn)為字符串嗎,這篇文章主要給大家介紹了關(guān)于Java中數(shù)組如何轉(zhuǎn)為字符串的幾種方法,需要的朋友可以參考下2024-03-03
java中unicode和中文相互轉(zhuǎn)換的簡(jiǎn)單實(shí)現(xiàn)
下面小編就為大家?guī)硪黄猨ava中unicode和中文相互轉(zhuǎn)換的簡(jiǎn)單實(shí)現(xiàn)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-08-08
java并發(fā)編程JUC CountDownLatch線程同步
這篇文章主要介紹CountDownLatch是什么、CountDownLatch 如何工作、CountDownLatch 的代碼例子來展開對(duì)java并發(fā)編程JUC CountDownLatch線程同步,需要的朋友可以參考下面文章內(nèi)容2021-09-09

