java開(kāi)發(fā)Dubbo負(fù)載均衡與集群容錯(cuò)示例詳解
負(fù)載均衡與集群容錯(cuò)
Invoker
在Dubbo中Invoker就是一個(gè)具有調(diào)用功能的對(duì)象,在服務(wù)提供端就是實(shí)際的服務(wù)實(shí)現(xiàn),只是將服務(wù)實(shí)現(xiàn)封裝起來(lái)變成一個(gè)Invoker。
在服務(wù)消費(fèi)端,從注冊(cè)中心得到服務(wù)提供者的信息之后,將一條條信息封裝為Invoker,這個(gè)Invoker就具備了遠(yuǎn)程調(diào)用的能力。
綜上,Dubbo就是創(chuàng)建了一個(gè)統(tǒng)一的模型,將可調(diào)用(可執(zhí)行體)的服務(wù)對(duì)象都統(tǒng)一封裝為Invoker。
而ClusterInvoker就是將多個(gè)服務(wù)引入的Invoker封裝起來(lái),對(duì)外統(tǒng)一暴露一個(gè)Invoker,并且賦予這些Invoker集群容錯(cuò)的功能。
服務(wù)目錄
服務(wù)目錄,即Directory,實(shí)際上它就是多個(gè)Invoker的集合,服務(wù)提供端一般都會(huì)集群分布,同樣的服務(wù)會(huì)有多個(gè)提供者,因此需要一個(gè)服務(wù)目錄來(lái)統(tǒng)一存放它們,需要調(diào)用服務(wù)的時(shí)候便從這個(gè)服務(wù)目錄中進(jìn)行挑選。
同時(shí)服務(wù)目錄還是實(shí)現(xiàn)了NotifyListener接口,當(dāng)集群中新增了一臺(tái)服務(wù)提供者或者下線了一臺(tái)服務(wù)提供者,目錄都會(huì)對(duì)服務(wù)提供者進(jìn)行更新,新增或者刪除對(duì)應(yīng)的Invoker。

從上圖中,可以看到用了一個(gè)抽象類AbstractDirectory來(lái)實(shí)現(xiàn) Directory接口,抽象類中運(yùn)用到了模板方法模式,將一些公共方法和邏輯寫好,作為一個(gè)骨架,然后具體實(shí)現(xiàn)由了兩個(gè)子類來(lái)完成,兩個(gè)子類分別為StaticDirectory和RegistryDirectory。
RegistryDirectory
RegistryDirectory實(shí)現(xiàn)了NotifyListener接口,可以監(jiān)聽(tīng)注冊(cè)中心的變化,當(dāng)注冊(cè)中心配置發(fā)生變化時(shí),服務(wù)目錄也可以收到變更通知,然后根據(jù)更新之后的配置刷新Invoker列表。
由此可知RegistryDirectory共有三個(gè)作用:
獲取Invoker列表監(jiān)聽(tīng)注冊(cè)中心刷新Invoker列表
獲取Invoker列表
RegistryDirectory實(shí)現(xiàn)了父類AbstractDirectory的抽象方法doList(),該方法可以得到Invoker列表
public List<Invoker<T>> doList(Invocation invocation) {
if (this.forbidden) {
throw new RpcException(....);
} else {
List<Invoker<T>> invokers = null;
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; //獲取方法調(diào)用名和Invoker的映射表
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
//以下就是根據(jù)方法名和方法參數(shù)獲取可調(diào)用的Invoker
if (args != null && args.length > 0 && args[0] != null && (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = (List)localMethodInvokerMap.get(methodName + "." + args[0]);
}
if (invokers == null) {
invokers = (List)localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
invokers = (List)localMethodInvokerMap.get("*");
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = (List)iterator.next();
}
}
}
return (List)(invokers == null ? new ArrayList(0) : invokers);
}
}
監(jiān)聽(tīng)注冊(cè)中心
通過(guò)實(shí)現(xiàn)NotifyListener接口可以感知注冊(cè)中心的數(shù)據(jù)變更。
RegistryDirectory定義了三個(gè)集合invokerUrls routerUrls configuratorUrls分別處理對(duì)應(yīng)的配置然后轉(zhuǎn)化成對(duì)象。
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList();
List<URL> routerUrls = new ArrayList();
List<URL> configuratorUrls = new ArrayList();
Iterator i$ = urls.iterator();
while(true) {
while(true) {
while(i$.hasNext()) {
//....根據(jù)urls填充上述三個(gè)列表
}
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls); //根據(jù)urls轉(zhuǎn)化為configurators配置
}
List localConfigurators;
if (routerUrls != null && !routerUrls.isEmpty()) {
localConfigurators = this.toRouters(routerUrls);
if (localConfigurators != null) {
this.setRouters(localConfigurators); //根據(jù)urls轉(zhuǎn)化為routers配置
}
}
localConfigurators = this.configurators;
this.overrideDirectoryUrl = this.directoryUrl;
Configurator configurator;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for(Iterator i$ = localConfigurators.iterator(); i$.hasNext(); this.overrideDirectoryUrl = configurator.configure(this.overrideDirectoryUrl)) {
configurator = (Configurator)i$.next();
}
}
this.refreshInvoker(invokerUrls); //根據(jù)invokerUrls刷新invoker列表
return;
}
}
}
刷新Invoker列表
private void refreshInvoker(List<URL> invokerUrls) {
//如果invokerUrls只有一個(gè)URL并且協(xié)議是empty,那么清除所有invoker
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && "empty".equals(((URL)invokerUrls.get(0)).getProtocol())) {
this.forbidden = true;
this.methodInvokerMap = null;
this.destroyAllInvokers();
} else {
this.forbidden = false;
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; //獲取舊的Invoker列表
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet();
this.cachedInvokerUrls.addAll(invokerUrls);
}
if (invokerUrls.isEmpty()) {
return;
}
//根據(jù)URL生成InvokerMap
Map<String, Invoker<T>> newUrlInvokerMap = this.toInvokers(invokerUrls);
//根據(jù)新的InvokerMap生成方法名和Invoker列表對(duì)應(yīng)的Map
Map<String, List<Invoker<T>>> newMethodInvokerMap = this.toMethodInvokers(newUrlInvokerMap);
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = this.multiGroup ? this.toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
this.destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); //銷毀無(wú)效的Invoker
} catch (Exception var6) {
logger.warn("destroyUnusedInvokers error. ", var6);
}
}
}
上述操作就是根據(jù)invokerUrls數(shù)量以及協(xié)議頭是否為empty來(lái)判斷是否禁用所有invokers,如果不禁用的話將invokerUrls轉(zhuǎn)化為Invoker,并且得到<url,Invoker>的映射關(guān)系。
再進(jìn)一步進(jìn)行轉(zhuǎn)化,得到<methodName,List>的映射關(guān)系,再將同一組的Invoker進(jìn)行合并,將合并結(jié)果賦值給methodInvokerMap,這個(gè)methodInvokerMap就是在doList中使用到的Map。
最后刷新InvokerMap,銷毀無(wú)效的Invoker。
StaticDirectory
StaticDirectory是靜態(tài)目錄,所有Invoker是固定的不會(huì)刪減的,并且所有Invoker由構(gòu)造器來(lái)傳入。
內(nèi)部邏輯也相當(dāng)簡(jiǎn)單,只定義了一個(gè)列表用于存儲(chǔ)Invokers。實(shí)現(xiàn)父類的方法也只是將這些Invokers原封不動(dòng)地返回。
private final List<Invoker<T>> invokers;
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
return this.invokers;
}
服務(wù)路由
服務(wù)路由規(guī)定了服務(wù)消費(fèi)者可以調(diào)用哪些服務(wù)提供者,Dubbo常用的是條件路由ConditionRouter。
條件路由由兩個(gè)條件組成,格式為[服務(wù)消費(fèi)者匹配條件] => [服務(wù)提供者匹配條件],例如172.26.29.15 => 172.27.19.89規(guī)定了只有IP為172.26.29.15的服務(wù)消費(fèi)者才可以訪問(wèn)IP為172.27.19.89的服務(wù)提供者,不可以調(diào)用其他的服務(wù)。
路由一樣是通過(guò)RegistryDirectory中的notify()更新的,在調(diào)用toMethodInvokers()的時(shí)候會(huì)進(jìn)行服務(wù)器級(jí)別的路由和方法級(jí)別的路由。
Cluster
在前面的流程中我們已經(jīng)通過(guò)Directory獲取了服務(wù)目錄,并且通過(guò)路由獲取了一個(gè)或多個(gè)Invoker,但是對(duì)于服務(wù)消費(fèi)者還是需要進(jìn)行選擇,篩選出一個(gè)Invoker進(jìn)行調(diào)用。
Dubbo默認(rèn)的Cluster實(shí)現(xiàn)有多種,如下:
FailoverClusterFailfastClusterFailsafeClusterFailbackClusterBroadcastClusterAvailableCluster
每個(gè)Cluster內(nèi)部返回的都是xxxClusterInvoker,例如FailoverCluster:
public class FailoverCluster implements Cluster {
public static final String NAME = "failover";
public FailoverCluster() {
}
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker(directory);
}
}
FailoverClusterInvoker
FailoverClusterInvoker實(shí)現(xiàn)的功能是失敗調(diào)用(有重試次數(shù))自動(dòng)切換。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
this.checkInvokers(invokers, invocation);
//重試次數(shù)
int len = this.getUrl().getMethodParameter(invocation.getMethodName(), "retries", 2) + 1;
if (len <= 0) {
len = 1;
}
RpcException le = null;
List<Invoker<T>> invoked = new ArrayList(invokers.size());
Set<String> providers = new HashSet(len);
//根據(jù)重試次數(shù)循環(huán)調(diào)用
for(int i = 0; i < len; ++i) {
if (i > 0) {
this.checkWhetherDestroyed();
copyinvokers = this.list(invocation);
this.checkInvokers(copyinvokers, invocation);
}
//負(fù)載均衡篩選出一個(gè)Invoker作本次調(diào)用
Invoker<T> invoker = this.select(loadbalance, invocation, copyinvokers, invoked);
//將使用過(guò)的Invoker保存起來(lái),下次重試時(shí)做過(guò)濾用
invoked.add(invoker);
//記錄到上下文中
RpcContext.getContext().setInvokers(invoked);
try {
//發(fā)起調(diào)用
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("....");
}
Result var12 = result;
return var12;
} catch (RpcException var17) { //catch異常 繼續(xù)下次循環(huán)重試
if (var17.isBiz()) {
throw var17;
}
le = var17;
} catch (Throwable var18) {
le = new RpcException(var18.getMessage(), var18);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(....);
}
上述方法中,首先獲取重試次數(shù)len,根據(jù)重試次數(shù)進(jìn)行循環(huán)調(diào)用,調(diào)用發(fā)生異常會(huì)被catch住,然后重新調(diào)用。
每次循環(huán)會(huì)通過(guò)負(fù)載均衡選出一個(gè)Invoker,然后利用這個(gè)Invoker進(jìn)行遠(yuǎn)程調(diào)用,每次選出的Invoker會(huì)記錄下來(lái),在下次調(diào)用的select()中會(huì)將使用上次調(diào)用的Invoker進(jìn)行重試,如果上一次沒(méi)有調(diào)用或者上次調(diào)用的Invoker下線了,那么會(huì)重新進(jìn)行負(fù)載均衡進(jìn)行選擇。
FailfastClusterInvoker
FailfastClusterInvoker只會(huì)進(jìn)行一次遠(yuǎn)程調(diào)用,如果失敗后立馬拋出異常。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
this.checkInvokers(invokers, invocation);
Invoker invoker = this.select(loadbalance, invocation, invokers, (List)null); //負(fù)載均衡選擇Invoker
try {
return invoker.invoke(invocation); //發(fā)起遠(yuǎn)程調(diào)用
} catch (Throwable var6) { //失敗調(diào)用直接將錯(cuò)誤拋出
if (var6 instanceof RpcException && ((RpcException)var6).isBiz()) {
throw (RpcException)var6;
} else {
throw new RpcException(....);
}
}
}
FailsafeClusterInvoker
FailsafeClusterInvoker是一種安全失敗的cluster,調(diào)用發(fā)生錯(cuò)誤僅僅是記錄一下日志,然后就返回了空結(jié)果。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
this.checkInvokers(invokers, invocation);
//負(fù)載均衡選出Invoker后直接進(jìn)行調(diào)用
Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)null);
return invoker.invoke(invocation);
} catch (Throwable var5) { //調(diào)用錯(cuò)誤只是打印日志
logger.error("Failsafe ignore exception: " + var5.getMessage(), var5);
return new RpcResult();
}
}
FailbackClusterInvoker
FailbackClusterInvoker調(diào)用失敗后,會(huì)記錄下本次調(diào)用,然后返回一個(gè)空結(jié)果給服務(wù)消費(fèi)者,并且會(huì)通過(guò)一個(gè)定時(shí)任務(wù)對(duì)失敗的調(diào)用進(jìn)行重試。適用于執(zhí)行消息通知等最大努力場(chǎng)景。
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
this.checkInvokers(invokers, invocation);
//負(fù)載均衡選出Invoker
Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)null);
//執(zhí)行調(diào)用,執(zhí)行成功返回調(diào)用結(jié)果
return invoker.invoke(invocation);
} catch (Throwable var5) {
//調(diào)用失敗
logger.error("....");
//記錄下本次失敗調(diào)用
this.addFailed(invocation, this);
//返回空結(jié)果
return new RpcResult();
}
}
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
if (this.retryFuture == null) {
synchronized(this) {
//如果未創(chuàng)建重試本次調(diào)用的定時(shí)任務(wù)
if (this.retryFuture == null) {
//創(chuàng)建定時(shí)任務(wù)
this.retryFuture = this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
//定時(shí)進(jìn)行重試
FailbackClusterInvoker.this.retryFailed();
} catch (Throwable var2) {
FailbackClusterInvoker.logger.error("....", var2);
}
}
}, 5000L, 5000L, TimeUnit.MILLISECONDS);
}
}
}
//將invocation和router存入map
this.failed.put(invocation, router);
}
void retryFailed() {
if (this.failed.size() != 0) {
Iterator i$ = (new HashMap(this.failed)).entrySet().iterator();
while(i$.hasNext()) {
Entry<Invocation, AbstractClusterInvoker<?>> entry = (Entry)i$.next();
Invocation invocation = (Invocation)entry.getKey();
Invoker invoker = (Invoker)entry.getValue();
try {
//進(jìn)行重試調(diào)用
invoker.invoke(invocation);
//調(diào)用成功未產(chǎn)生異常則移除本次失敗調(diào)用的記錄,銷毀定時(shí)任務(wù)
this.failed.remove(invocation);
} catch (Throwable var6) {
logger.error("....", var6);
}
}
}
}
邏輯比較簡(jiǎn)單,大致就是當(dāng)調(diào)用錯(cuò)誤時(shí)返回空結(jié)果,并記錄下本次失敗調(diào)用到failed<invocation,router>中,并且會(huì)創(chuàng)建一個(gè)定時(shí)任務(wù)定時(shí)地去調(diào)用failed中記錄的失敗調(diào)用,如果調(diào)用成功了就從failed中移除這個(gè)調(diào)用。
ForkingClusterInvoker
ForkingClusterInvoker運(yùn)行時(shí),會(huì)將所有Invoker都放入線程池中并發(fā)調(diào)用,只要有一個(gè)Invoker調(diào)用成功了就返回結(jié)果,doInvoker方法立即停止運(yùn)行。
適用于對(duì)實(shí)時(shí)性比較高的讀寫操作。
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
Result var19;
try {
this.checkInvokers(invokers, invocation);
int forks = this.getUrl().getParameter("forks", 2);
int timeout = this.getUrl().getParameter("timeout", 1000);
final Object selected;
if (forks > 0 && forks < invokers.size()) {
selected = new ArrayList();
for(int i = 0; i < forks; ++i) {
Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)selected);
if (!((List)selected).contains(invoker)) {
//選擇好的Invoker放入這個(gè)selected列表
((List)selected).add(invoker);
}
}
} else {
selected = invokers;
}
RpcContext.getContext().setInvokers((List)selected);
final AtomicInteger count = new AtomicInteger();
//阻塞隊(duì)列
final BlockingQueue<Object> ref = new LinkedBlockingQueue();
Iterator i$ = ((List)selected).iterator();
while(i$.hasNext()) {
final Invoker<T> invoker = (Invoker)i$.next();
this.executor.execute(new Runnable() {
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable var3) {
int value = count.incrementAndGet();
if (value >= ((List)selected).size()) { //等待所有調(diào)用都產(chǎn)生異常才入隊(duì)
ref.offer(var3);
}
}
}
});
}
try {
//阻塞獲取結(jié)果
Object ret = ref.poll((long)timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable)ret;
throw new RpcException(....);
}
var19 = (Result)ret;
} catch (InterruptedException var14) {
throw new RpcException(....);
}
} finally {
RpcContext.getContext().clearAttachments();
}
return var19;
}
BroadcastClusterInvoker
BroadcastClusterInvoker運(yùn)行時(shí)會(huì)將所有Invoker逐個(gè)調(diào)用,在最后判斷中如果有一個(gè)調(diào)用產(chǎn)生錯(cuò)誤,則拋出異常。
適用于通知所有提供者更新緩存或日志等本地資源的場(chǎng)景。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
this.checkInvokers(invokers, invocation);
RpcContext.getContext().setInvokers(invokers);
RpcException exception = null;
Result result = null;
Iterator i$ = invokers.iterator();
while(i$.hasNext()) {
Invoker invoker = (Invoker)i$.next();
try {
result = invoker.invoke(invocation);
} catch (RpcException var9) {
exception = var9;
logger.warn(var9.getMessage(), var9);
} catch (Throwable var10) {
exception = new RpcException(var10.getMessage(), var10);
logger.warn(var10.getMessage(), var10);
}
}
//如果調(diào)用過(guò)程中發(fā)生過(guò)錯(cuò)誤 拋出異常
if (exception != null) {
throw exception;
} else {
//返回調(diào)用結(jié)果
return result;
}
}
AbstractClusterInvoker
AbstractClusterInvoker是上述所有類的父類,內(nèi)部結(jié)構(gòu)較為簡(jiǎn)單。AvailableCluster內(nèi)部返回結(jié)果就是AvailableClusterInvoker。
public class AvailableCluster implements Cluster {
public static final String NAME = "available";
public AvailableCluster() {
}
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new AbstractClusterInvoker<T>(directory) {
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
Iterator i$ = invokers.iterator();
Invoker invoker;
do { //循環(huán)判斷:哪個(gè)invoker能用就調(diào)用哪個(gè)
if (!i$.hasNext()) {
throw new RpcException("No provider available in " + invokers);
}
invoker = (Invoker)i$.next();
} while(!invoker.isAvailable());
return invoker.invoke(invocation);
}
};
}
}
小結(jié)
上述中有很多種集群的實(shí)現(xiàn),各適用于不同的場(chǎng)景,加了Cluster這個(gè)中間層,向服務(wù)消費(fèi)者屏蔽了集群調(diào)用的細(xì)節(jié),并且支持不同場(chǎng)景使用不同的模式。
負(fù)載均衡
Dubbo中的負(fù)載均衡,即LoadBalance,服務(wù)提供者一般都是集群分布,所以需要Dubbo選擇出合適的服務(wù)提供者來(lái)給服務(wù)消費(fèi)者調(diào)用。
Dubbo中提供了多種負(fù)載均衡算法:
RandomLoadBalanceLeastActiveLoadBalanceConsistentHashLoadBalanceRoundRobinLoadBalance
AbstractLoadBalance
實(shí)現(xiàn)類都繼承了于這個(gè)類,該類實(shí)現(xiàn)了LoadBalance,使用模板方法模式,將一些公用的邏輯封裝好,而具體的實(shí)現(xiàn)由子類自定義。
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers != null && !invokers.isEmpty()) {
//子類實(shí)現(xiàn)
return invokers.size() == 1 ? (Invoker)invokers.get(0) : this.doSelect(invokers, url, invocation);
} else {
return null;
}
}
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> var1, URL var2, Invocation var3);
服務(wù)剛啟動(dòng)需要預(yù)熱,不能突然讓服務(wù)負(fù)載過(guò)高,需要進(jìn)行服務(wù)的降權(quán)。
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "weight", 100); //獲得權(quán)重
if (weight > 0) {
long timestamp = invoker.getUrl().getParameter("remote.timestamp", 0L); //啟動(dòng)時(shí)間
if (timestamp > 0L) {
int uptime = (int)(System.currentTimeMillis() - timestamp); //計(jì)算已啟動(dòng)時(shí)長(zhǎng)
int warmup = invoker.getUrl().getParameter("warmup", 600000);
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight); //降權(quán)
}
}
}
return weight;
}
RandomLoadBalance
使用了加權(quán)隨機(jī)算法,假設(shè)現(xiàn)在有三個(gè)節(jié)點(diǎn)A,B,C,然后賦予這幾個(gè)節(jié)點(diǎn)一定權(quán)重,分別為1,2,3,那么可計(jì)算得到總權(quán)重為6,那么這幾個(gè)節(jié)點(diǎn)被訪問(wèn)的可能性分別為1/6,2/6,3/6。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); //Invoker個(gè)數(shù)
int totalWeight = 0; //總權(quán)重
boolean sameWeight = true; //權(quán)重是否相同
int offset;
int i;
for(offset = 0; offset < length; ++offset) {
i = this.getWeight((Invoker)invokers.get(offset), invocation); //得到權(quán)重
totalWeight += i; //計(jì)算總權(quán)重
//是否權(quán)重都相同
if (sameWeight && offset > 0 && i != this.getWeight((Invoker)invokers.get(offset - 1), invocation)) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
offset = this.random.nextInt(totalWeight); //獲得隨機(jī)偏移量
//判斷偏移量落在哪個(gè)片段上
for(i = 0; i < length; ++i) {
offset -= this.getWeight((Invoker)invokers.get(i), invocation);
if (offset < 0) {
return (Invoker)invokers.get(i);
}
}
}
return (Invoker)invokers.get(this.random.nextInt(length));
}
LeastActiveLoadBalance
最少活躍數(shù)負(fù)載均衡,接收一個(gè)請(qǐng)求后,請(qǐng)求活躍數(shù)+1,處理完一個(gè)請(qǐng)求后,請(qǐng)求活躍數(shù)-1,請(qǐng)求活躍數(shù)少既說(shuō)明現(xiàn)在服務(wù)器壓力小也說(shuō)明該服務(wù)器處理請(qǐng)求快,沒(méi)有堆積什么請(qǐng)求。
總的流程是先遍歷Invokers列表,尋找當(dāng)前請(qǐng)求活躍數(shù)最少的Invoker,如果有多個(gè)Invoker具有相同的最小請(qǐng)求活躍數(shù),則根據(jù)他們的權(quán)重來(lái)進(jìn)行篩選。
ConsistentHashLoadBalance

將服務(wù)器的IP等信息生成一個(gè)Hash值,將這個(gè)值映射到Hash圓環(huán)上作為某個(gè)節(jié)點(diǎn),當(dāng)查找節(jié)點(diǎn)時(shí),通過(guò)一個(gè)Key來(lái)順時(shí)針查找。
Dubbo還引入了160個(gè)虛擬節(jié)點(diǎn),使得數(shù)據(jù)更加分散,避免請(qǐng)求積壓在某個(gè)節(jié)點(diǎn)上。
并且Hash值是方法級(jí)別的,一個(gè)服務(wù)的每個(gè)方法都有一個(gè)ConsistentHashSelector,根據(jù)參數(shù)值來(lái)計(jì)算得出Hash值,
RoundRobinLoadBalance
加權(quán)輪詢負(fù)載均衡,這種輪詢是平滑的,假設(shè)A和B的權(quán)重為10:30,那么輪詢的結(jié)果可能是A、B、B、A、A、B、B、B…,40次調(diào)用下來(lái)A調(diào)用了10次,B調(diào)用了30次。
總結(jié)

服務(wù)引入時(shí),會(huì)將多個(gè)遠(yuǎn)程調(diào)用塞入Directory,然后通過(guò)Cluster來(lái)封裝,同時(shí)根據(jù)需要提供各種容錯(cuò)功能,最終統(tǒng)一暴露一個(gè)Invoker給服務(wù)消費(fèi)者,服務(wù)消費(fèi)者調(diào)用的時(shí)候會(huì)從目錄得到Invoker列表,經(jīng)過(guò)路由的過(guò)濾以及負(fù)載均衡最終得到一個(gè)Invoker發(fā)起調(diào)用。
以上就是java開(kāi)發(fā)Dubbo負(fù)載均衡與集群容錯(cuò)示例詳解的詳細(xì)內(nèi)容,如有不足或錯(cuò)誤歡迎指正。
更多關(guān)于Dubbo負(fù)載均衡與集群容錯(cuò)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- 使用Java實(shí)現(xiàn)6種常見(jiàn)負(fù)載均衡算法
- Java實(shí)現(xiàn)5種負(fù)載均衡算法(小結(jié))
- Java負(fù)載均衡算法實(shí)現(xiàn)之輪詢和加權(quán)輪詢
- 使用Java實(shí)現(xiàn)5種負(fù)載均衡算法實(shí)例
- Java Grpc實(shí)例創(chuàng)建負(fù)載均衡詳解
- 詳解Java實(shí)現(xiàn)負(fù)載均衡的幾種算法代碼
- Java?Ribbon與openfeign區(qū)別和用法講解
- Java中的服務(wù)發(fā)現(xiàn)與負(fù)載均衡及Eureka與Ribbon的應(yīng)用小結(jié)
相關(guān)文章
SpringBoot數(shù)據(jù)訪問(wèn)的實(shí)現(xiàn)
本文主要介紹了SpringBoot數(shù)據(jù)訪問(wèn)的實(shí)現(xiàn),引入各種xxxTemplate,xxxRepository來(lái)簡(jiǎn)化我們對(duì)數(shù)據(jù)訪問(wèn)層的操作,感興趣的可以了解一下2023-11-11
解決在Gradle/IDEA中無(wú)法正常使用readLine的問(wèn)題原因
這篇文章主要介紹了在Gradle/IDEA中無(wú)法正常使用readLine的解決方法,原因是由于Gradle的標(biāo)準(zhǔn)輸入默認(rèn)并不與系統(tǒng)標(biāo)準(zhǔn)輸入綁定,需手動(dòng)設(shè)置,需要的朋友可以參考下2021-12-12
spring cloud hystrix 超時(shí)時(shí)間使用方式詳解
這篇文章主要介紹了spring cloud hystrix 超時(shí)時(shí)間使用方式,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01
JAVA通過(guò)HttpClient發(fā)送HTTP請(qǐng)求的方法示例
本篇文章主要介紹了JAVA通過(guò)HttpClient發(fā)送HTTP請(qǐng)求的方法示例,詳細(xì)的介紹了HttpClient使用,具有一定的參考價(jià)值,有興趣的可以了解一下2017-09-09
Lombok中@Builder和@SuperBuilder注解的用法案例
@Builder?是?lombok?中的注解,可以使用builder()構(gòu)造的Person.PersonBuilder對(duì)象進(jìn)行鏈?zhǔn)秸{(diào)用,給所有屬性依次賦值,這篇文章主要介紹了Lombok中@Builder和@SuperBuilder注解的用法,需要的朋友可以參考下2023-01-01
Java項(xiàng)目導(dǎo)入IDEA的流程配置以及常見(jiàn)問(wèn)題解決方法
通常一個(gè)團(tuán)隊(duì)中可能有人用eclipse,有人用intelliJ,那么經(jīng)常會(huì)出現(xiàn)需要導(dǎo)入別人用eclipse建好的web項(xiàng)目,下面這篇文章主要給大家介紹了關(guān)于Java項(xiàng)目導(dǎo)入IDEA的流程配置以及常見(jiàn)問(wèn)題解決方法的相關(guān)資料,需要的朋友可以參考下2023-05-05
Java Filter 過(guò)濾器詳細(xì)介紹及實(shí)例代碼
Filter也稱之為過(guò)濾器,它是Servlet技術(shù)中最實(shí)用的技術(shù),本文章WEB開(kāi)發(fā)人員通過(guò)Filter技術(shù),對(duì)web服務(wù)器管理的所有web資源進(jìn)行攔截,從而實(shí)現(xiàn)一些特殊的功能,本文章將向大家介紹Java 中的 Filter 過(guò)濾器,需要的朋友可以參考一下2016-12-12

