C++ 實(shí)現(xiàn)線程安全的頻率限制器(推薦)
很早以前,在學(xué)習(xí)使用 Python 的deque容器時(shí),我寫(xiě)了一篇文章python3 deque 雙向隊(duì)列創(chuàng)建與使用方法分析。最近需要壓測(cè)線上服務(wù)的性能,又不愿意總是在 QA 那邊排隊(duì),于是需要自己寫(xiě)一個(gè)壓測(cè)用的客戶端。其中一個(gè)核心需求就是要實(shí)現(xiàn) QPS 限制。
于是,終究逃不開(kāi)要在 C++ 中實(shí)現(xiàn)一個(gè)線程安全的頻率限制器。
設(shè)計(jì)思路
所謂頻率限制,就是要在一個(gè)時(shí)間段(inteval)中,限制操作的次數(shù)(limit)。這又可以引出兩種強(qiáng)弱不同的表述:
- 強(qiáng)表述:在任意一個(gè)長(zhǎng)度等于設(shè)定的時(shí)間段(interval)的滑動(dòng)窗口中,頻率限制器放行的操作次數(shù)(count)都不高于限制次數(shù)(limit)。
- 弱表述:在一組長(zhǎng)度等于設(shè)定的時(shí)間段(interval)且緊密相連的固定窗口中,每一個(gè)窗口里頻率限制器放行的操作次數(shù)(count)都不高于限制次數(shù)(limit)。
不難發(fā)現(xiàn),強(qiáng)表述通過(guò)「滑動(dòng)窗口」的方式,不僅限制了頻率,還要求了操作在時(shí)間上的均勻性。前作的頻率限制器,實(shí)際上對(duì)應(yīng)了這里的強(qiáng)表述。但實(shí)際工程中,我們通常只需要實(shí)現(xiàn)弱表述的頻率限制器就足夠使用了。
對(duì)于弱表述,實(shí)現(xiàn)起來(lái)主要思路如下:
當(dāng)操作計(jì)數(shù)(count)小于限制(limit)時(shí)直接放行;
單線程實(shí)現(xiàn)
在不考慮線程安全時(shí),不難給出這樣的實(shí)現(xiàn)。
struct ms_clock {
using rep = std::chrono::milliseconds::rep;
using period = std::chrono::milliseconds::period;
using duration = std::chrono::duration<rep, period>;
using time_point = std::chrono::time_point<ms_clock, duration>;
static
time_point now() noexcept {
return time_point(std::chrono::duration_cast<duration>(
std::chrono::steady_clock::now().time_since_epoch()));
}
};
} // namespace __details
class RateLimiter {
public:
using clock = __details::ms_clock; // 1.
private:
const uint64_t limit_;
const clock::duration interval_;
const clock::rep interval_count_;
mutable uint64_t count_{std::numeric_limits<uint64_t>::max()}; // 2.a.
mutable clock::rep timestamp_{0}; // 2.b.
public:
constexpr RateLimiter(uint64_t limit, clock::duration interval) :
limit_(limit), interval_(interval), interval_count_(interval_.count()) {}
RateLimiter(const RateLimiter&) = delete; // 3.a.
RateLimiter& operator=(const RateLimiter&) = delete; // 3.b.
RateLimiter(RateLimiter&&) = delete; // 3.c.
RateLimiter& operator=(RateLimiter&&) = delete; // 3.d.
bool operator()() const;
double qps() const {
return 1000.0 * this->limit_ / this->interval_count_;
}
};
bool RateLimiter::operator()() const {
auto orig_count = this->count_++;
if (orig_count < this->limit_) { // 4.
return true;
} else {
auto ts = this->timestamp_;
auto now = clock::now().time_since_epoch().count();
if (now - ts < this->interval_count_) { // 5.
return false;
}
this->timestamp_ = now;
this->count_ = 1;
return true;
}
}
這里,
(1) 表明頻率限制器使用單位為毫秒的時(shí)鐘。在實(shí)際使用時(shí),也可以按需改成微妙甚至納秒。
(2) 使用mutable修飾內(nèi)部狀態(tài)count_和timestamp_。這是因?yàn)閮蓚€(gè)limit_和interval_相同的頻率限制器,在邏輯上是等價(jià)的,但他們的內(nèi)部狀態(tài)卻不一定相同。因此,為了讓const成員函數(shù)能夠修改內(nèi)部狀態(tài)(而不改變邏輯等價(jià)),我們要給內(nèi)部狀態(tài)數(shù)據(jù)成員加上mutable修飾。
(2.a) 處將count_設(shè)置為類型可表示的最大值是為了讓
(4) 的判斷失敗,而對(duì)timestamp_進(jìn)行初始化。
(2.b) 處將timestamp_設(shè)置為0則是基于同樣的原因,讓 (5) 的判斷失敗。
(2.a) 和 (2.b) 處通過(guò)巧妙的初值設(shè)計(jì),將初始化狀態(tài)與后續(xù)正常工作狀態(tài)的邏輯統(tǒng)一了起來(lái),而無(wú)須丑陋的判斷。
(3) 禁止了對(duì)象的拷貝和移動(dòng)。這是因?yàn)橐粋€(gè)頻率限制器應(yīng)綁定一組操作,而不應(yīng)由兩組或更多組操作共享(對(duì)于拷貝的情形),或是中途失效(對(duì)于移動(dòng)的情形)。
如此一來(lái),函數(shù)調(diào)用運(yùn)算符就忠實(shí)地實(shí)現(xiàn)了前述邏輯。
多線程改造
第一步改造
當(dāng)有多線程同時(shí)調(diào)用RateLimiter::operator()時(shí),顯而易見(jiàn),在count_和timestamp_上會(huì)產(chǎn)生競(jìng)爭(zhēng)。我們有兩種辦法解決這個(gè)問(wèn)題:要不然加鎖,要不然把count_和timestamp_設(shè)為原子變量然后用原子操作解決問(wèn)題。于是,對(duì)函數(shù)調(diào)用運(yùn)算符,我們有如下第一步的改造。
class RateLimiter {
// 其余保持不變
private:
mutable std::atomic<uint64_t> count_{std::numeric_limits<uint64_t>::max()}; // 1.a.
mutable std::atomic<clock::rep> timestamp_{0}; // 1.b.
// 其余保持不變
};
bool RateLimiter::operator()() const {
auto orig_count = this->count_.fetch_add(1UL); // 2.
if (orig_count < this->limit_) {
return true;
} else {
auto ts = this->timestamp_.load(); // 3.
auto now = clock::now().time_since_epoch().count();
if (now - ts < this->interval_count_) {
return false;
}
this->timestamp_.store(now); // 4.
this->count_.store(1UL); // 5.
return true;
}
}
這里,
- (1) 將
count_和timestamp_聲明為原子的,從而方便后續(xù)進(jìn)行原子操作。 - (2) -- (5) 則將原有操作分別改為對(duì)應(yīng)的原子操作。
這樣看起來(lái)很完美,但其實(shí)是有 bug 的。我們重點(diǎn)關(guān)注 (4) 這里。(4) 的本意是更新timestamp_,以備下一次orig_count >= this->limit_時(shí)進(jìn)行判斷。準(zhǔn)確設(shè)置這一timestamp是頻率限制器正確工作的基石。但是,如果有兩個(gè)(或更多)線程,同時(shí)走到了 (4)會(huì)發(fā)生什么?
- 因?yàn)樵硬僮鞯拇嬖?,兩個(gè)線程會(huì)先后執(zhí)行 (4)。于是
timestamp_的值究竟是什么,我們完全不可預(yù)期。 - 此外,兩個(gè)線程會(huì)先后執(zhí)行 (5),即原子地將
count_置為1。但是你想,頻率限制器先后放行了兩次操作,但為什么count_是1呢?這是不是就「偷跑」了一次操作?
為此,我們要保證只有一個(gè)線程會(huì)真正設(shè)置timestamp_,而拒絕其他同樣走到 (4) 位置的線程的操作,以避免其重復(fù)設(shè)置timestamp_以及錯(cuò)誤地將count_再次置為1。
第二步改進(jìn)
于是有以下改進(jìn)。
bool RateLimiter::operator()() const {
auto orig_count = this->count_.fetch_add(1UL); // 3.
if (orig_count < this->limit_) { // 4.
return true;
} else {
auto ts = this->timestamp_.load();
auto now = clock::now().time_since_epoch().count();
if (now - ts < this->interval_count_) { // 5.
return false;
}
if (not this->timestamp_.compare_and_exchange_strong(ts, now)) { // 1.
return false;
}
this->count_.store(1UL); // 2.
return true;
}
}
這里,(1) 是一個(gè) CAS 原子操作。它會(huì)原子地比較timestamp_和ts的值(Compare):若他們相等,則將now的值寫(xiě)入timestamp_(Swap),并返回true;若他們不相等,則將timestamp_的值寫(xiě)入ts,并返回false。如果沒(méi)有其他線程搶先修改timestamp_的值,那么 CAS 操作應(yīng)該成功并返回true,繼續(xù)執(zhí)行后面的代碼;否則,說(shuō)明其他線程已經(jīng)搶先修改了timestamp_,當(dāng)前線程的操作被記入上一個(gè)周期而被頻率限制器拒絕。
現(xiàn)在要考慮 (2)。如果執(zhí)行完 (1) 之后立即立刻馬上沒(méi)有任何延遲地執(zhí)行 (2),那么當(dāng)然一切大吉。但如果這時(shí)候當(dāng)前線程被切出去,會(huì)發(fā)生什么?我們要分情況討論。
如果ts == 0,也就是「當(dāng)前線程」是頻率限制器第一次修改timestamp_。于是,當(dāng)前線程可能會(huì)在 (3) 處將count_(溢出地)自增為0,從而可能有其他線程通過(guò) (4)。此時(shí),當(dāng)前線程在當(dāng)前分片有可能應(yīng)當(dāng)被拒絕操作。為此,我們需要在 (1) 和 (2) 之間做額外的判斷。
if (ts == 0) {
auto orig_count = this->count.fetch_add(1UL);
return (orig_count < this->limit_);
}
如果ts != 0,也就是「當(dāng)前線程」并非頻率限制器第一次修改timestamp_。那么其他線程在 (4) 處必然判斷失敗,但在 (5) 處的判斷可能成功,從而可能繼續(xù)成功執(zhí)行 (1),從而接連兩次執(zhí)行 (2)。但這不影響正確性。因?yàn)橥ㄟ^(guò) (5) 表明相對(duì)當(dāng)前線程填入的timestamp_,已經(jīng)由過(guò)了一個(gè)時(shí)間段(interval),而在這個(gè)時(shí)間段里,只有當(dāng)前線程的一次操作會(huì)被接受。
第三次改進(jìn)
由此,我們得到:
bool RateLimiter::operator()() const {
auto orig_count = this->count_.fetch_add(1UL);
if (orig_count < this->limit_) {
return true;
} else {
auto ts = this->timestamp_.load();
auto now = clock::now().time_since_epoch().count();
if (now - ts < this->interval_count_) {
return false;
}
if (not this->timestamp_.compare_and_exchange_strong(ts, now)) {
return false;
}
if (ts == 0) {
auto orig_count = this->count.fetch_add(1UL);
return (orig_count < this->limit_);
}
this->count_.store(1UL);
return true;
}
}
至此,我們的代碼在邏輯上已經(jīng)成立了,接下來(lái)要做一些性能方面的優(yōu)化。
原子操作默認(rèn)采用std::memory_order_seq_cst的內(nèi)存模型。這是 C++ 中最嚴(yán)格的內(nèi)存模型,它有兩個(gè)保證:
- 程序指令和源碼順序一致;
- 所有線程的所有操作都有一致的順序。
為了實(shí)現(xiàn)順序一致性(sequential consistency),編譯器會(huì)使用很多對(duì)抗編譯器優(yōu)化和 CPU 亂序執(zhí)行(Out-of-Order Execution)的手段,因而性能較差。對(duì)于此處的count_,我們無(wú)需順序一致性模型,只需要獲取釋放模型(Aquire-Release)模型就足夠了。
第四次改進(jìn)
于是有第四次改進(jìn):
bool RateLimiter::operator()() const {
auto orig_count = this->count_.fetch_add(1UL, std::memory_order_acq_rel);
if (orig_count < this->limit_) {
return true;
} else {
auto ts = this->timestamp_.load();
auto now = clock::now().time_since_epoch().count();
if (now - ts < this->interval_count_) {
return false;
}
if (not this->timestamp_.compare_and_exchange_strong(ts, now)) {
return false;
}
if (ts == 0) {
auto orig_count = this->count.fetch_add(1UL, std::memory_order_acq_rel);
return (orig_count < this->limit_);
}
this->count_.store(1UL, std::memory_order_release);
return true;
}
}
至此,我們就完整實(shí)現(xiàn)了一個(gè)頻率限制器,可以愉快地開(kāi)始拉 QPS 壓測(cè)了!
總結(jié)
到此這篇關(guān)于在 C++ 中實(shí)現(xiàn)一個(gè)線程安全的頻率限制器的文章就介紹到這了,更多相關(guān)在 C++ 中實(shí)現(xiàn)一個(gè)線程安全的頻率限制器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
二叉樹(shù)中葉子節(jié)點(diǎn)的統(tǒng)計(jì)和樹(shù)高問(wèn)題
今天小編就為大家分享一篇關(guān)于二叉樹(shù)中葉子節(jié)點(diǎn)的統(tǒng)計(jì)和樹(shù)高問(wèn)題,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-03-03
C語(yǔ)言修煉之路初識(shí)指針陰陽(yáng)竅?地址還歸大道真下篇
指針是指向另一個(gè)變量的變量。意思是一個(gè)指針保存的是另一個(gè)變量的內(nèi)存地址。換句話說(shuō),指針保存的并不是普通意義上的數(shù)值,而是另一個(gè)變量的地址值。一個(gè)指針保存了另一個(gè)變量的地址值,就說(shuō)這個(gè)指針“指向”了那個(gè)變量2022-02-02
C++中測(cè)試程序運(yùn)行時(shí)間的幾種方法總結(jié)
本文介紹了C++中測(cè)量程序運(yùn)行時(shí)間的幾種方法,包括使用GetTickCount()、clock()、Boost庫(kù)的timer類以及高精度時(shí)控函數(shù)QueryPerformanceFrequency和QueryPerformanceCounter,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-09-09
c語(yǔ)言求出給定范圍內(nèi)的所有質(zhì)數(shù)
本文主要介紹了c語(yǔ)言求出給定范圍內(nèi)的所有質(zhì)數(shù)的小程序。具有很好的參考價(jià)值。下面跟著小編一起來(lái)看下吧2017-04-04
C語(yǔ)言左旋轉(zhuǎn)字符串與翻轉(zhuǎn)字符串中單詞順序的方法
這篇文章主要介紹了C語(yǔ)言左旋轉(zhuǎn)字符串與翻轉(zhuǎn)字符串中單詞順序的方法,給出了相關(guān)的兩道算法題目作為例子,需要的朋友可以參考下2016-02-02
C++保存txt文件實(shí)現(xiàn)方法代碼實(shí)例
這篇文章主要介紹了C++保存txt文件實(shí)現(xiàn)方法代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11
vs2019 MFC實(shí)現(xiàn)office界面的畫(huà)圖小項(xiàng)目
本文主要介紹了vs2019 MFC實(shí)現(xiàn)office界面的畫(huà)圖小項(xiàng)目,對(duì)大家入門(mén)有一定的幫助,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-06-06

