源碼分析Nacos如何動態(tài)刷新配置
一 Nacos 刷新配置的源碼閱讀
在 ClientWorker 中配置了 定義了一個 的內(nèi)部類 LongPollingRunnable 并實現(xiàn)了Runnable 接口 直接到 cacheData.checkListenerMd5() 這個方法
public void run() {
// 獲取定義的Group
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// check failover config
for (CacheData cacheData : cacheMap.values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// check server config
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
if (!CollectionUtils.isEmpty(changedGroupKeys)) {
LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);
if (null != ct[1]) {
cache.setType(ct[1]);
}
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(ct[0]), ct[1]);
} catch (NacosException ioe) {
String message = String
.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
// 檢查當(dāng)前 配置文件的md5值是否改變
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
executorService.execute(this);
} catch (Throwable e) {
// If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
}
檢查當(dāng)前的md5值是否更改
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
// 如果md5值變了,就發(fā)送對應(yīng)事件通知
safeNotifyListener(dataId, group, content, type, md5, wrap);
}
}
}
安全的通知監(jiān)聽器配置改變:
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
final String md5, final ManagerListenerWrap listenerWrap) {
// 從包裝類中取出監(jiān)聽器
final Listener listener = listenerWrap.listener;
// 創(chuàng)建一個通知任務(wù)(異步或同步執(zhí)行)
Runnable job = new Runnable() {
@Override
public void run() {
// 當(dāng)前線程的原始類加載器
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
// 獲取監(jiān)聽器所屬類的類加載器(用于類加載隔離)
ClassLoader appClassLoader = listener.getClass().getClassLoader();
try {
// 如果監(jiān)聽器是共享監(jiān)聽器的子類,設(shè)置上下文信息
if (listener instanceof AbstractSharedListener) {
AbstractSharedListener adapter = (AbstractSharedListener) listener;
adapter.fillContext(dataId, group);
LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
}
// 設(shè)置線程上下文類加載器為應(yīng)用加載器(避免多應(yīng)用部署時,SPI等加載錯類)
Thread.currentThread().setContextClassLoader(appClassLoader);
// 構(gòu)造配置響應(yīng)對象
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
// 通過過濾鏈處理配置(比如解密、轉(zhuǎn)換等)
configFilterChainManager.doFilter(null, cr);
// 獲取處理后的配置內(nèi)容
String contentTmp = cr.getContent();
// 調(diào)用監(jiān)聽器的 receiveConfigInfo 方法通知變更
listener.receiveConfigInfo(contentTmp);
// 如果是支持配置變更事件的監(jiān)聽器,觸發(fā)對應(yīng)事件
if (listener instanceof AbstractConfigChangeListener) {
// 解析變更內(nèi)容(對比老配置和新配置)
Map data = ConfigChangeHandler.getInstance()
.parseChangeData(listenerWrap.lastContent, content, type);
// 構(gòu)造事件對象并通知監(jiān)聽器
ConfigChangeEvent event = new ConfigChangeEvent(data);
((AbstractConfigChangeListener) listener).receiveConfigChange(event);
// 記錄這次通知的內(nèi)容
listenerWrap.lastContent = content;
}
// 更新上一次調(diào)用的 MD5 值
listenerWrap.lastCallMd5 = md5;
// 打印通知成功日志
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
listener);
} catch (NacosException ex) {
// 特定 Nacos 異常處理
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
} catch (Throwable t) {
// 捕獲所有其他異常,避免通知失敗影響主線程
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
group, md5, listener, t.getCause());
} finally {
// 恢復(fù)原始線程類加載器,避免線程池復(fù)用帶來問題
Thread.currentThread().setContextClassLoader(myClassLoader);
}
}
};
// 記錄通知開始時間
final long startNotify = System.currentTimeMillis();
try {
// 如果監(jiān)聽器提供了自定義線程池,則用線程池異步執(zhí)行
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
// 否則直接當(dāng)前線程執(zhí)行
job.run();
}
} catch (Throwable t) {
// 執(zhí)行過程出錯日志打印
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
group, md5, listener, t.getCause());
}
// 記錄通知完成時間
final long finishNotify = System.currentTimeMillis();
LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
name, (finishNotify - startNotify), dataId, group, md5, listener);
}
NacosContextRefresher 中 registerNacosListenersForApplications的方法
/**
* 為指定的 dataId + group 注冊一個 Nacos 配置監(jiān)聽器
* @param groupKey 配置分組(group)
* @param dataKey 配置標(biāo)識(dataId)
*/
private void registerNacosListener(final String groupKey, final String dataKey) {
// 生成一個唯一 key,用于標(biāo)識監(jiān)聽器(key = group + "++" + dataId)
String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
// 從 listenerMap 中獲取對應(yīng) key 的監(jiān)聽器,如果不存在則創(chuàng)建一個 AbstractSharedListener
Listener listener = listenerMap.computeIfAbsent(key,
lst -> new AbstractSharedListener() {
/**
* 當(dāng)配置變更時,會觸發(fā)該方法
*/
@Override
public void innerReceive(String dataId, String group, String configInfo) {
// 刷新次數(shù) +1(用于監(jiān)控/統(tǒng)計)
refreshCountIncrement();
// 記錄刷新歷史
nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
// 發(fā)布 Spring 的 RefreshEvent,通知上下文環(huán)境配置已變更
// 注意:這里是全量刷新,
applicationContext.publishEvent(
new RefreshEvent(this, null, "Refresh Nacos config"));
// 如果開啟了 debug 日志,打印變更信息
if (log.isDebugEnabled()) {
log.debug(String.format(
"Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
group, dataId, configInfo));
}
}
});
try {
// 調(diào)用 Nacos 客戶端 API,注冊監(jiān)聽器
configService.addListener(dataKey, groupKey, listener);
}
catch (NacosException e) {
// 注冊失敗,記錄警告日志
log.warn(String.format(
"register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
groupKey), e);
}
}
在SpringClould 中的 RefreshEventListener
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ApplicationReadyEvent) {
handle((ApplicationReadyEvent) event);
}
else if (event instanceof RefreshEvent) {
handle((RefreshEvent) event);
}
}
public void handle(RefreshEvent event) {
if (this.ready.get()) { // don't handle events before app is ready
log.debug("Event received " + event.getEventDesc());
Set<String> keys = this.refresh.refresh();
log.info("Refresh keys changed: " + keys);
}
}
ContextRefresher 中的 refresh 方法刷新所有作用域為 refresh 的bean
public synchronized Set<String> refresh() {
Set<String> keys = refreshEnvironment();
// 刷新所有的
this.scope.refreshAll();
return keys;
}
二 @refreshScope注解
定義
@RefreshScope 是 Spring Cloud 提供的注解,主要用于 ?支持配置的動態(tài)刷新?,特別是在結(jié)合像 Nacos、Consul、Spring Cloud Config 等配置中心時使用。@RefreshScope 使得標(biāo)注的 Bean 在配置變更并發(fā)布刷新事件時,能夠被重新實例化,從而實現(xiàn)“?配置熱更新?”。
使用背景
Spring Boot 默認(rèn)的 Bean 是單例的(@Singleton),一旦初始化完成,其屬性就不會再變化。如果你想在運行時通過配置中心動態(tài)刷新某個 Bean 中的屬性,就必須加上 @RefreshScope
與Nacos配合使用demo
1、依賴引入
確保你引入了以下依賴(以 Spring Boot 2.x / Spring Cloud Alibaba 為例):
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
2、application.yml配置
server:
port: 8080
spring:
application:
name: nacos-refresh-demo
cloud:
nacos:
config:
server-addr: 127.0.0.1:8848
file-extension: yaml
group: DEFAULT_GROUP
namespace: public
refresh-enabled: true
3、編寫配置類(使用 @RefreshScope)
package com.example.nacosdemo.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
@Data
@Component
@RefreshScope // 開啟動態(tài)刷新
@ConfigurationProperties(prefix = "custom")
public class CustomConfig {
private String name;
private Integer age;
}
4、測試 Controller
package com.example.nacosdemo.controller;
import com.example.nacosdemo.config.CustomConfig;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequiredArgsConstructor
public class TestController {
private final CustomConfig customConfig;
@GetMapping("/config")
public String getConfig() {
return "name: " + customConfig.getName() + ", age: " + customConfig.getAge();
}
}
然后更改你的Nacos中的配置,查看是否被更新呢
總結(jié)
觸發(fā)流程:
NacosConfigService內(nèi)部有ClientWorker線程定時輪詢配置變化;- 當(dāng)檢測到配置變更后,會回調(diào)配置監(jiān)聽器;
NacosContextRefresher是 Spring Cloud Alibaba 提供的監(jiān)聽器;- 它觸發(fā)
RefreshEvent事件; - Spring Cloud Context 的
RefreshScope監(jiān)聽RefreshEvent; - 清除舊 Bean 實例,下次注入重新構(gòu)建。
到此這篇關(guān)于源碼分析Nacos如何動態(tài)刷新配置的文章就介紹到這了,更多相關(guān)Nacos動態(tài)刷新配置內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Dom4j解析XML_動力節(jié)點Java學(xué)院整理
這篇文章主要介紹了Dom4j解析XML,dom4j是一個Java的XML API,類似于jdom,用來讀寫XML文件的,有興趣的可以了解一下2017-07-07
淺析SpringBoot中如何啟用MongoDB事務(wù)
這篇文章主要為大家詳細(xì)介紹了SpringBoot中如何啟用MongoDB事務(wù),文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-05-05
Spring Boot與Spark、Cassandra系統(tǒng)集成開發(fā)示例
本文演示以Spark作為分析引擎,Cassandra作為數(shù)據(jù)存儲,而使用Spring Boot來開發(fā)驅(qū)動程序的示例。對spring boot 與spark cassandra集成開發(fā)示例代碼感興趣的朋友跟著腳本之家小編一起學(xué)習(xí)吧2018-02-02
Java基于Spire Cloud Excel把Excel轉(zhuǎn)換成PDF
這篇文章主要介紹了Java基于Spire Cloud Excel把Excel轉(zhuǎn)換成PDF,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-05-05
詳談Spring是否支持對靜態(tài)方法進(jìn)行Aop增強(qiáng)
這篇文章主要介紹了Spring是否支持對靜態(tài)方法進(jìn)行Aop增強(qiáng),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
繼承JpaRepository后,找不到findOne()方法的解決
這篇文章主要介紹了繼承JpaRepository后,找不到findOne()方法的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08
SpringBoot中實現(xiàn)數(shù)據(jù)字典的示例代碼
這篇文章主要介紹了SpringBoot中實現(xiàn)數(shù)據(jù)字典的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09
java Iterator接口和LIstIterator接口分析
這篇文章主要介紹了java Iterator接口和LIstIterator接口分析的相關(guān)資料,需要的朋友可以參考下2017-05-05

