RocketMQ中的NameServer詳細(xì)解析
前言
NameServer是一個(gè)非常簡單的Topic路由注冊(cè)中心,支持Broker的動(dòng)態(tài)注冊(cè)與發(fā)現(xiàn)。
Producer和Conumser通過NameServer可以知道整個(gè)Broker集群的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。
NameServer各實(shí)例間相互不進(jìn)行信息通訊,因此不能保證NameServer的一致性(Consistency),可以保證可用性(Availability)。
即選擇了CAP中的AP。NameServer只能保證最終一致性,關(guān)于怎么保證最終一致性后文再講。
現(xiàn)在先從NameServer的啟動(dòng)開始。
NameServer為namesrv模塊

NamesrvStartup
public static NamesrvController main0(String[] args) {
//構(gòu)造NamesrvController
NamesrvController controller = createNamesrvController(args);
start(controller);
return controller;
}
public static NamesrvController start(final NamesrvController controller) throws Exception {
//初始化
boolean initResult = controller.initialize();
//啟動(dòng)
controller.start();
return controller;
}
NamesrvStartup作為NameServer的啟動(dòng)類,主要做了三件事:
- 構(gòu)造NamesrvController(NameServer控制器)
- 加載初始化,由NamesrvController負(fù)責(zé)
- 啟動(dòng)remotingServer,開啟Netty服務(wù)
NamesrvController
public class NamesrvController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//nameSrv的配置
private final NamesrvConfig namesrvConfig;
//netty的配置
private final NettyServerConfig nettyServerConfig;
//執(zhí)行單線程的任務(wù)調(diào)度,自定義編程名稱
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
//kv的配置管理
private final KVConfigManager kvConfigManager;
//路由管理器,有broker的ip和隊(duì)列信息,producer發(fā)送的queue信息,consumer的pull的queue信息
private final RouteInfoManager routeInfoManager;
//namesrv的netty的服務(wù)端實(shí)現(xiàn)
private RemotingServer remotingServer;
//處理接受到請(qǐng)求事件的回調(diào)監(jiān)聽服務(wù),主要處理netty的事件
private BrokerHousekeepingService brokerHousekeepingService;
private ExecutorService remotingExecutor;
private Configuration configuration;
private FileWatchService fileWatchService;
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
//構(gòu)造kv配置的管理
this.kvConfigManager = new KVConfigManager(this);
//構(gòu)造路由信息管理
this.routeInfoManager = new RouteInfoManager();
//構(gòu)造網(wǎng)絡(luò)連接事件管理
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
//配置
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}NamesrvController作為一個(gè)控制器主要負(fù)責(zé)NettyServer的創(chuàng)建,注冊(cè)requestProcessor,啟動(dòng)NettyServer及各種task
public boolean initialize() {
//加載kv配置
this.kvConfigManager.load();
//初始化NettyServer
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//執(zhí)行器,用于接受請(qǐng)求并進(jìn)行處理
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//注冊(cè)Netty的處理器,即remotingExecutor
this.registerProcessor();
//進(jìn)行掃描未活躍的Broker的任務(wù)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
//...省略其他代碼
return true;
}
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
//注冊(cè)request處理器
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
public void start() throws Exception {
//啟動(dòng)netty server
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}NettyRequestProcessor
NettyRequestProcessor為請(qǐng)求處理器,上一步注冊(cè)的處理器,一般使用默認(rèn)的處理器DefaultRequestProcessor,processRequest方法處理請(qǐng)求。
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
//...
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
//...省略其他代碼
}
return null;
}到此這篇關(guān)于RocketMQ中的NameServer詳細(xì)解析的文章就介紹到這了,更多相關(guān)RocketMQ中的NameServer內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java多線程批量處理百萬級(jí)的數(shù)據(jù)方法示例
這篇文章主要介紹了java多線程批量處理百萬級(jí)的數(shù)據(jù)的相關(guān)資料,文中通過代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用java多線程具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2025-02-02
淺談java中Math.random()與java.util.random()的區(qū)別
下面小編就為大家?guī)硪黄獪\談java中Math.random()與java.util.random()的區(qū)別。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-09-09
POI XSSFSheet shiftRows bug問題解決
這篇文章主要介紹了POI XSSFSheet shiftRows bug問題解決,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-07-07
通過代理類實(shí)現(xiàn)java連接數(shù)據(jù)庫(使用dao層操作數(shù)據(jù))實(shí)例分享
java通過代理類實(shí)現(xiàn)數(shù)據(jù)庫DAO操作代碼分享,大家參考使用吧2013-12-12
spring5 SAXParseException:cvc-elt.1: 找不到元素“beans 的聲明詳解
這篇文章主要給大家介紹了關(guān)于spring5 SAXParseException:cvc-elt.1: 找不到元素“beans 聲明的相關(guān)資料,需要的朋友可以參考下2020-08-08
Spring Boot mybatis-config 和 log4j 輸出sql 日志的方式
這篇文章主要介紹了Spring Boot mybatis-config 和 log4j 輸出sql 日志的方式,本文通過實(shí)例圖文相結(jié)合給大家介紹的非常詳細(xì),需要的朋友可以參考下2021-07-07
RabbitMQ的Direct Exchange模式實(shí)現(xiàn)的消息發(fā)布案例(示例代碼)
本文介紹了RabbitMQ的DirectExchange模式下的消息發(fā)布和消費(fèi)的實(shí)現(xiàn),詳細(xì)說明了如何在DirectExchange模式中進(jìn)行消息的發(fā)送和接收,以及消息處理的基本方法,感興趣的朋友跟隨小編一起看看吧2024-09-09

