java客戶端Etcd官方倉庫jetcd中KeepAlive接口實現(xiàn)
前言
Etcd的Java客戶端有很多開源實現(xiàn),Jetcd是Etcd官方倉庫的Java客戶端,整體api接口設計實現(xiàn)和官方go客戶端類似,簡潔易用。其中,租期續(xù)約的接口提供了兩個分別是keepAliveOnce和keepAlive。功能如其名,keepAliveOnce是單次續(xù)約的接口,如果要保持租約,需要手動觸發(fā)這個接口,所以這個接口基本不用。而keepAlive是自動續(xù)約?;畹慕涌?。大多數(shù)場景下,使用keepAlive即可,但是針對不同的場景,我們還需要考慮幾個問題,如租約ttl的設置,以及keepAlive異常時的處理。
Jetcd項目地址:https://github.com/etcd-io/jetcd
背景問題
我們有一個基于mysql的binlog訂閱數(shù)據(jù)變更的應用,線上有非常重要的應用基于這個服務,因為存在單點故障,后面使用了jetcd的lock + keepAlive的機制實現(xiàn)了主備服務秒級切換的功能,具體參見《高可用架構etcd選主故障主備秒級切換實現(xiàn)》,系統(tǒng)上線運行后發(fā)現(xiàn),binlog的服務經(jīng)常切換發(fā)生主備切換,而實際情況是,binlog的服務非常穩(wěn)定,在沒有上線主備切換服務前,從來沒有發(fā)生過線上binlog服務宕掉的情況。最后查明問題出在了租約TTL的設置上面。這里先拋出問題和定位,下面先看下Jetcd的keepAlive具體實現(xiàn),然后在分析為什么導致這個問題。
KeepAlive實現(xiàn)
先看下keepAlive的用法
private long acquireActiveLease() throws InterruptedException, ExecutionException {
long leaseId = leaseClient.grant(leaseTTL).get().getID();
logger.debug("LeaderSelector get leaseId:[{}] and ttl:[{}]", leaseId, leaseTTL);
this.leaseCloser = leaseClient.keepAlive(leaseId, new StreamObserver() {
@Override
public void onNext(LeaseKeepAliveResponse value) {
logger.debug("LeaderSelector lease keeps alive for [{}]s:", value.getTTL());
}
@Override
public void onError(Throwable t) {
logger.debug("LeaderSelector lease renewal Exception!", t.fillInStackTrace());
cancelTask();
}
@Override
public void onCompleted() {
logger.info("LeaderSelector lease renewal completed! start canceling task.");
cancelTask();
}
});
return leaseId;
}租約實現(xiàn)都在LeaseImpl類里,通過EtcdClient拿到LeaseImpl實例后,首先通過grant方法設置ttl拿到租約的id,然后將租約作為入?yún)⒄{用keepAlive方法,第二個入?yún)⑹且粋€觀察者對象,內(nèi)置了三個接口,分別是onNext:確定下一次租約續(xù)約時間后觸發(fā),onError:續(xù)約異常時觸發(fā),onCompleted:租約過期后觸發(fā)。
keepAlive方法代碼:
public synchronized CloseableClient keepAlive(long leaseId, StreamObserverobserver) {
if (this.closed) {
throw newClosedLeaseClientException();
}
KeepAlive keepAlive = this.keepAlives.computeIfAbsent(leaseId, (key) -> new KeepAlive(leaseId));
keepAlive.addObserver(observer);
if (!this.hasKeepAliveServiceStarted) {
this.hasKeepAliveServiceStarted = true;
this.start();
}
return new CloseableClient() {
@Override
public void close() {
keepAlive.removeObserver(observer);
}
};
}LeaseImpl內(nèi)部維護了一個以LeaseId為key,KeepAlive對象為value的map,KeepAlive的類中維護了一個StreamObserver集合,到期時間deadLine,下次續(xù)約時間nextKeepAlive和續(xù)約leaseId。第一次調用keepAlive方法時會觸發(fā)start,啟動續(xù)約的線程(sendKeepAliveExecutor())和檢查是否過期的線程(deadLineExecutor())。
private void sendKeepAliveExecutor() {
this.keepAliveResponseObserver = Observers.observer(
response -> processKeepAliveResponse(response),
error -> processOnError()
);
this.keepAliveRequestObserver = this.leaseStub.leaseKeepAlive(this.keepAliveResponseObserver);
this.keepAliveFuture = scheduledExecutorService.scheduleAtFixedRate(
() -> {
// send keep alive req to the leases whose next keep alive is before now.
this.keepAlives.entrySet().stream()
.filter(entry -> entry.getValue().getNextKeepAlive() < System.currentTimeMillis())
.map(Entry::getKey)
.map(leaseId -> LeaseKeepAliveRequest.newBuilder().setID(leaseId).build())
.forEach(keepAliveRequestObserver::onNext);
},
0,
500,
TimeUnit.MILLISECONDS
);
}sendKeepAliveExecutor方法是整個keepAlive功能實現(xiàn)的核心,這個方法在LeaseImpl實例里只會被觸發(fā)一次,開啟了一個時間間隔為500毫秒的的定時任務調度。每次從keepAlives中篩選出nextkeepAlive時間小于當前時間的KeepAlive對象,觸發(fā)續(xù)約。
nextkeepAlive初始化值就是創(chuàng)建KeepAlive實例時的當前時間,然后在續(xù)約的響應流觀察者實例中,執(zhí)行了processKeepAliveResponse方法,在這個里面維護了KeepAlive對象的nextkeepAlive。
private synchronized void processKeepAliveResponse(io.etcd.jetcd.api.LeaseKeepAliveResponse leaseKeepAliveResponse) {
if (this.closed) {
return;
}
final long leaseID = leaseKeepAliveResponse.getID();
final long ttl = leaseKeepAliveResponse.getTTL();
final KeepAlive ka = this.keepAlives.get(leaseID);
if (ka == null) {
// return if the corresponding keep alive has closed.
return;
}
if (ttl > 0) {
long nextKeepAlive = System.currentTimeMillis() + ttl * 1000 / 3;
ka.setNextKeepAlive(nextKeepAlive);
ka.setDeadLine(System.currentTimeMillis() + ttl * 1000);
ka.onNext(leaseKeepAliveResponse);
} else {
// lease expired; close all keep alive
this.removeKeepAlive(leaseID);
ka.onError(
newEtcdException(
ErrorCode.NOT_FOUND,
"etcdserver: requested lease not found"
)
);
}
}可以看到,在首次續(xù)約后的響應處理中,nextKeepAlive被設置為當前時間加上ttl的1/3時間后,也就是說如果我們設置一個key的過期時間為6s,那么在使用keepAlive時續(xù)期的時間間隔為,每2s執(zhí)行續(xù)約一次。如果ttl小于零,說明key已經(jīng)過期被刪除了,就直接觸發(fā)onError,傳遞了一個requested lease not found的異常對象。
文末小結
1、回到最上面binlog的主備頻繁切換的問題,由于我們將ttl的時間設置的過小5s。只要client和etcd 服務失聯(lián)5s以上,期間可能由于各種原因導致keepAlive沒有正常續(xù)約上,就會觸發(fā)主備切換。這個時候binlog服務本身是沒有任何問題的,卻要因為失去領導權,而選擇自殺。后面將ttl調整到了20s后,主備切換就沒有那么敏感了。
2、還有一個場景,在將etcd作為服務注冊中心時,也會使用到keepAlive,即使設置了ttl為20s,還是有可能沒有續(xù)約上,導致注冊的服務過期被刪了,這個時候,我們的服務進程還是健康的。這個場景下,需要在onError、onCompleted事件中重新獲取租約以及添加新的KeepAlive。
以上就是java客戶端Etcd官方倉庫jetcd中KeepAlive接口實現(xiàn)的詳細內(nèi)容,更多關于jetcd中的KeepAlive實現(xiàn)的資料請關注腳本之家其它相關文章!
相關文章
SpringBoot環(huán)境Druid數(shù)據(jù)源使用及特點
Druid 是目前比較流行的高性能的,分布式列存儲的OLAP框架(具體來說是MOLAP)。本文給大家分享SpringBoot環(huán)境Druid數(shù)據(jù)源使用及特點介紹,感興趣的朋友跟隨小編一起看看吧2021-07-07
mybatis-plus用insertBatchSomeColumn方法批量新增指定字段
mybatisPlus底層的新增方法是一條一條的新增的,下面這篇文章主要給大家介紹了關于mybatis-plus用insertBatchSomeColumn方法批量新增指定字段的相關資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下2023-05-05

