dubbo如何實(shí)現(xiàn)consumer從多個(gè)group中調(diào)用指定group的provider
背景
在工作中,遇到這樣的場(chǎng)景:
有個(gè)es索引構(gòu)建服務(wù),需要從各個(gè)業(yè)務(wù)服務(wù)獲取索引的信息,從而構(gòu)建索引,業(yè)務(wù)服務(wù)都實(shí)現(xiàn)同一個(gè)接口——IndexInfoProvider,通過(guò)設(shè)置不同的group來(lái)達(dá)到區(qū)分的效果(group就是es索引名)。
索引構(gòu)建服務(wù)在內(nèi)存維護(hù)了一個(gè)Map<String, IndexInfoProvider> providerMap,key是索引名——也就是provider的group,value是IndexInfoProvider服務(wù)的consumer。
為了圖方便,索引構(gòu)建服務(wù)還一次性初始化了所有consumer,假設(shè)總共有200個(gè)分組,那么Map就會(huì)緩存200個(gè)consumer,初始化的時(shí)候巨慢。如果不一次性初始化,而是按需的話,代碼會(huì)變得復(fù)雜一些,可能需要像雙重檢驗(yàn)這樣的措施。而且因?yàn)槭蔷彺?,必然也?huì)遇到緩存一致性的問(wèn)題,例如新增一個(gè)索引。
基于這樣的問(wèn)題,就想著,dubbo的一個(gè)consumer,能不能按需調(diào)用指定分組的provider呢~
過(guò)程
為什么必須緩存那么多consumer
為什么有200個(gè)分組,就要緩存200個(gè)consumer,而不是像我們平時(shí)那樣,1個(gè)就可以呢?

業(yè)務(wù)服務(wù)在實(shí)現(xiàn)IndexInfoProvider接口的時(shí)候,都指定是分組,而且分組名,就是對(duì)應(yīng)的索引名。
相應(yīng)的,消費(fèi)者就必須指定消費(fèi)的分組,200個(gè)索引,自然就需要200個(gè)consumer。
consumer只能調(diào)用一個(gè)group的provider么
在我們?nèi)粘5氖褂弥校琧onsumer基本都是只能調(diào)用一個(gè)group的provider。但實(shí)際上,dubbo是支持調(diào)用多個(gè)group的provider的。

這樣寫是指定a分組和b分組,多個(gè)分組之間用英文逗號(hào)分隔

這樣寫,是所有的分組
group="*"能代替Map<String, IndexInfoProvider>么
如果將es索引構(gòu)建服務(wù)的IndexInfoProvider接口的consumer分組設(shè)置為group="*",然后在調(diào)用接口的時(shí)候,根據(jù)需要,從一堆provider中篩選出指定group的provider,只調(diào)用這些provider,是不是就可以代替Map緩存了?
初步方案:
- 利用ThreadLocal來(lái)指定要調(diào)用的分組,在調(diào)用方法前設(shè)置group到ThreadLocal中。
- 實(shí)現(xiàn)dubbo的負(fù)載均衡拓展,只選取指定分組的節(jié)點(diǎn)來(lái)調(diào)用。
- 調(diào)用結(jié)束,清理ThreadLocal中的分組信息。
似乎是可行的?
理想很美好,現(xiàn)實(shí)很骨感
通過(guò)源碼和代碼debug發(fā)現(xiàn),consumer持有的Invoker,有點(diǎn)像一個(gè)責(zé)任鏈。
如果是單分組——!group.contains(",") && !group.equals("*"),那么會(huì)是這樣:
MockClusterInvoker->FailoverClusterInvoker
如果是多分組,則會(huì)變成:
MockClusterInvoker->MergeableClusterInvoker->FailoverClusterInvoker
負(fù)載均衡的機(jī)制,是得到了FailoverClusterInvoker才會(huì)生效
MockClusterInvoker只是一種降級(jí)機(jī)制,不是導(dǎo)致問(wèn)題的原因
問(wèn)題是出在MergeableClusterInvoker,下面是導(dǎo)致問(wèn)題的代碼?。。?/p>

代碼大概的意思是:消費(fèi)者沒(méi)有指定合并策略,那么就會(huì)調(diào)用第一個(gè)有效的服務(wù)提供者,如果都是無(wú)效,就直接調(diào)用第一個(gè)。
我是沒(méi)有指定合并策略的,所以會(huì)變成調(diào)用第一個(gè)有效的服務(wù)提供者。根本走不到負(fù)載均衡那里。
不指定合并策略不行,那指定呢?指定合并策略會(huì)怎樣呢?


指定了合并策略,會(huì)調(diào)用所有invoker,然后用Merger合并結(jié)果。
這種很適合這樣種場(chǎng)景:
一部分?jǐn)?shù)據(jù)在服務(wù)A,一部分?jǐn)?shù)據(jù)在服務(wù)B,需要分別調(diào)用A和B,然后把兩者的結(jié)果集合并
設(shè)想著這樣的方案:
- 指定了合并策略,那么所有分組的invoker都會(huì)被調(diào)用到,那么請(qǐng)求就到FailoverClusterInvoker了,負(fù)載均衡spi就生效了
- 修改一下負(fù)載均衡spi,如果目前調(diào)用的invoker不是指定分組的,那么就直接返回null
- 實(shí)現(xiàn)自己的合并spi,返回第一個(gè)不會(huì)null的結(jié)果
雖然不是什么優(yōu)雅的方式,但是,似乎是能做到的???
實(shí)際上,行不通,達(dá)不到想要的效果。原因是MergeableClusterInvoker內(nèi)部是用線程池并發(fā)調(diào)用的,ThreadLocal里的分組信息會(huì)丟失。

還有機(jī)會(huì)么
MergeableClusterInvoker類是沒(méi)有留拓展的余地啦,還有其他機(jī)會(huì)么?MergeableClusterInvoker的Invokers列表從哪里來(lái)的?

Invokers是從directory來(lái)的,這里有沒(méi)有拓展的余地呢?
有的,在AbstractDirectory的list方法

看到這,發(fā)現(xiàn)用路由spi,還是有機(jī)會(huì)的,如果實(shí)現(xiàn)動(dòng)態(tài)路由,每次只給MergeableClusterInvoker返回指定分組的Invoker,是不是就可以呢?怎么讓routers包含我們自定義的路由spi呢?

需要url上攜帶了router參數(shù),但是這里的url的參數(shù)是由誰(shuí)決定的?@Reference 注解是沒(méi)有沒(méi)有的指定router參數(shù)的…
最后debuig,兜兜轉(zhuǎn)轉(zhuǎn),發(fā)現(xiàn)只能靠重寫ReferenceConfig類的loadRegistries方法,往url上加上動(dòng)態(tài)路由的參數(shù)。

成果
AssignGroupRouterFactory
public class AssignGroupRouterFactory implements RouterFactory {
public static final String NAME = "assignGroup";
@Override
public Router getRouter(URL url) {
return new AssignGroupRouter(url);
}
}
AssignGroupRouter
public class AssignGroupRouter implements Router {
private final URL url;
public AssignGroupRouter(URL url) {
this.url = url;
}
@Override
public URL getUrl() {
return url;
}
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
String assignGroup = IndexInfoProviderInvoker.getAssignGroup();
if (Objects.isNull(assignGroup)) {
return invokers;
}
return invokers.stream()
.filter(invoker -> {
URL invokerUrl = invoker.getUrl();
return Objects.equals(assignGroup, invokerUrl.getParameter(Constants.GROUP_KEY));
}).collect(Collectors.toList());
}
@Override
public int compareTo(Router o) {
return 1;
}
}
AssignGroupReferenceConfig
public class AssignGroupReferenceConfig<T> extends ReferenceConfig<T> {
@Override
protected List<URL> loadRegistries(boolean provider) {
List<URL> urls = super.loadRegistries(provider);
if (CollectionUtils.isEmpty(urls)) {
return urls;
}
// 指定動(dòng)態(tài)路由,路由方式為assignGroup
return urls.stream()
.map(url -> url.addParameter(Constants.ROUTER_KEY, "assignGroup").addParameter(Constants.RUNTIME_KEY,
"true"))
.collect(Collectors.toList());
}
}
IndexInfoProviderInvokerProxy
@Component
public class IndexInfoProviderInvokerProxy {
public static final ThreadLocal<String> INDEX_TYPE_THREAD_LOCAL = new ThreadLocal<>();
private final IndexInfoProvider indexInfoProvider;
public IndexInfoProviderInvokerProxy(IndexInfoProvider indexInfoProvider) {
this.indexInfoProvider = indexInfoProvider;
}
/**
* 獲取文檔id 在[start, end) 之間的所有索引文檔
*
* @param indexType 索引名
* @param start 起始id
* @param end 結(jié)束id
* @return 文檔列表
*/
public List<IndexDocument> getDocsByIdRange(String indexType, long start, long end) {
return invokeWitchIndexType(indexType, () -> indexInfoProvider.getDocsByIdRange(start, end));
}
private <T> T invokeWitchIndexType(String indexType, Supplier<T> supplier) {
INDEX_TYPE_THREAD_LOCAL.set(indexType);
T result = supplier.get();
INDEX_TYPE_THREAD_LOCAL.remove();
return result;
}
}
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot實(shí)現(xiàn)國(guó)際化過(guò)程詳解
這篇文章主要介紹了SpringBoot實(shí)現(xiàn)國(guó)際化過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
SpringBoot+Redis實(shí)現(xiàn)查找附近用戶的示例代碼
SpringDataRedis提供了十分簡(jiǎn)單的地理位置定位的功能,本文主要介紹了SpringBoot+Redis實(shí)現(xiàn)查找附近用戶的示例代碼,具有一定的參考價(jià)值,感興趣的可以了解一下2024-02-02
mybaties plus實(shí)體類設(shè)置typeHandler不生效的解決
這篇文章主要介紹了mybaties plus實(shí)體類設(shè)置typeHandler不生效的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-08-08
Java中的服務(wù)發(fā)現(xiàn)與負(fù)載均衡及Eureka與Ribbon的應(yīng)用小結(jié)
這篇文章主要介紹了Java中的服務(wù)發(fā)現(xiàn)與負(fù)載均衡:Eureka與Ribbon的應(yīng)用,通過(guò)使用Eureka和Ribbon,我們可以在Java項(xiàng)目中實(shí)現(xiàn)高效的服務(wù)發(fā)現(xiàn)和負(fù)載均衡,需要的朋友可以參考下2024-08-08
JAVA?biginteger類bigdecimal類的使用示例學(xué)習(xí)
這篇文章主要為大家介紹了JAVA?biginteger類bigdecimal類的使用示例學(xué)習(xí),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07
"Method?Not?Allowed"405問(wèn)題分析以及解決方法
項(xiàng)目中在提交表單時(shí),提示“HTTP 405”錯(cuò)誤——“Method Not Allowed”這里顯示的是,方法不被允許,下面這篇文章主要給大家介紹了關(guān)于"Method?Not?Allowed"405問(wèn)題分析以及解決方法的相關(guān)資料,需要的朋友可以參考下2022-10-10

