C++定時器實現(xiàn)和時間輪介紹
定時器
有些時候我們需要延遲執(zhí)行一些功能,比如每10s進行一次數(shù)據(jù)采集。或者告知用戶技能冷卻有多少時間,如果我們將執(zhí)行這些功能的任務交給主線程,就會造成主線程的阻塞。因此我們可以選擇一個創(chuàng)建一個子線程,讓其檢測定時器中的任務,當有任務的時間到了的時候,就去執(zhí)行這個任務。
最小堆實現(xiàn)定時器
定時器可以由很多種數(shù)據(jù)結構實現(xiàn),比如最小堆、紅黑樹、跳表、甚至數(shù)組都可以,其本質都是拿到最小時間的任務,然后取出該任務并執(zhí)行。
綜合實現(xiàn)難度和效率來看,最小堆是最容易實現(xiàn)定時器的數(shù)據(jù)結構。
最小堆主要有以下接口:
#pragma once
#include <vector>
#include <map>
using namespace std;
typedef void (*TimerHandler)(struct TimerNode *node);
//獲取系統(tǒng)時間,單位是毫秒
static uint32_t current_time()
{
uint32_t t;
struct timespec ti;
clock_gettime(CLOCK_MONOTONIC, &ti);
t = (uint32_t)ti.tv_sec * 1000;
t += ti.tv_nsec / 1000000;
return t;
}
struct TimerNode
{
//該任務在最小堆中的下標位置
int idx = 0;
//該任務是第幾號任務
int id = 0;
//幾毫秒后執(zhí)行該任務
unsigned int expire = 0;
//回調函數(shù)
TimerHandler cb = NULL;
};
class MinHeapTimer
{
public:
MinHeapTimer()
{
_heap.clear();
_map.clear();
}
int Count();
//加入任務,expire為該任務的失效時間,expire過后就要執(zhí)行回調函數(shù)cb
int AddTimer(uint32_t expire, TimerHandler cb);
//刪除一個任務
bool DelTimer(int id);
//獲取一個任務
void ExpireTimer();
private:
//用于比較兩個任務的過期時間
bool _compare(int lhs, int rhs);
//向下調整算法,每次刪除一個節(jié)點就要向下調整
void _shiftDown(int parent);
//向上調整算法,每添加一個數(shù)都要調用向上調整算法,保證根節(jié)點為最小節(jié)點
void _shiftUp(int child);
//刪除的子函數(shù)
void _delNode(TimerNode *node);
void resign(int pos);
private:
//數(shù)組中存儲任務節(jié)點
vector<TimerNode *> _heap;
//存儲值和響應節(jié)點的映射關系
map<int, TimerNode *> _map;
//任務的個數(shù),注意不是_heap的size
int _count = 0;
};具體的實現(xiàn)以及測試為:
#pragma once
#include <vector>
#include <map>
using namespace std;
typedef void (*TimerHandler)(struct TimerNode *node);
//獲取系統(tǒng)時間,單位是毫秒
static uint32_t current_time()
{
uint32_t t;
struct timespec ti;
clock_gettime(CLOCK_MONOTONIC, &ti);
t = (uint32_t)ti.tv_sec * 1000;
t += ti.tv_nsec / 1000000;
return t;
}
struct TimerNode
{
//該任務在最小堆中的下標位置
int idx = 0;
//該任務是第幾號任務
int id = 0;
unsigned int expire = 0;
//回調函數(shù)
TimerHandler cb = NULL;
};
class MinHeapTimer
{
public:
MinHeapTimer()
{
_heap.clear();
_map.clear();
}
int Count()
{
return ++_count;
}
//加入任務,expire為該任務的失效時間,expire過后就要執(zhí)行回調函數(shù)cb
int AddTimer(uint32_t expire, TimerHandler cb)
{
int64_t timeout = current_time() + expire;
TimerNode *node = new TimerNode;
int id = Count();
node->id = id;
node->expire = timeout;
node->cb = cb;
node->idx = (int)_heap.size();
_heap.push_back(node);
_shiftUp((int)_heap.size() - 1);
_map.insert(make_pair(id, node));
return id;
}
//刪除一個任務
bool DelTimer(int id)
{
auto iter = _map.find(id);
if (iter == _map.end())
return false;
_delNode(iter->second);
return true;
}
//獲取一個任務
void ExpireTimer()
{
if (_heap.empty())
{
return;
}
//獲取當前時間
uint32_t now = current_time();
while (!_heap.empty())
{
//獲取最近的一個任務
TimerNode *node = _heap.front();
//當最近一個任務的時間大于當前時間,說明沒有任務要執(zhí)行
if (now < node->expire)
{
break;
}
//遍歷一下堆,這一步可以不加
for (int i = 0; i < _heap.size(); i++)
{
std::cout << "touch idx: " << _heap[i]->idx
<< " id: " << _heap[i]->id << " expire: "
<< _heap[i]->expire << std::endl;
}
//執(zhí)行最近任務的回調函數(shù)
if (node->cb)
{
node->cb(node);
}
//執(zhí)行完就刪掉這個任務
_delNode(node);
}
}
private:
//用于比較兩個任務的過期時間
bool _compare(int lhs, int rhs)
{
return _heap[lhs]->expire < _heap[rhs]->expire;
}
//向下調整算法,每次刪除一個節(jié)點就要向下調整
void _shiftDown(int parent)
{
int child = parent * 2 + 1;
while (child < _heap.size() - 1)
{
if (child + 1 < _heap.size() - 1 && !_compare(child, child + 1))
{
child++;
}
if (!_compare(parent, child))
{
std::swap(_heap[parent], _heap[child]);
_heap[parent]->idx = parent;
_heap[child]->idx = child;
parent = child;
child = parent * 2 + 1;
}
else
{
break;
}
}
}
//向上調整算法,每添加一個數(shù)都要調用向上調整算法,保證根節(jié)點為最小節(jié)點
void _shiftUp(int child)
{
int parent = (child - 1) / 2;
while (child > 0)
{
if (!_compare(parent, child))
{
std::swap(_heap[parent], _heap[child]);
_heap[parent]->idx = parent;
_heap[child]->idx = child;
child = parent;
parent = (child - 1) / 2;
}
else
{
break;
}
}
}
//刪除的子函數(shù)
void _delNode(TimerNode *node)
{
int last = (int)_heap.size() - 1;
int idx = node->idx;
if (idx != last)
{
std::swap(_heap[idx], _heap[last]);
_heap[idx]->idx = idx;
resign(idx);
}
_heap.pop_back();
_map.erase(node->id);
delete node;
}
void resign(int pos)
{
//向上調整和向下調整只會發(fā)生一個
_shiftDown(pos);
_shiftUp(pos);
}
private:
//數(shù)組中存儲任務節(jié)點
vector<TimerNode *> _heap;
//存儲值和響應節(jié)點的映射關系
map<int, TimerNode *> _map;
//任務的個數(shù),注意不是_heap的size
int _count = 0;
};#include <time.h>
#include <unistd.h>
#include <iostream>
#include "minheap.h"
void print_hello(TimerNode *te)
{
std::cout << "hello world time = " << te->idx << "\t" << te->id << "\t" << current_time() << std::endl;
}
int main()
{
MinHeapTimer mht;
//一號任務,立刻執(zhí)行
mht.AddTimer(0, print_hello);
//二號任務,一秒后執(zhí)行
mht.AddTimer(1000, print_hello);
mht.AddTimer(7000, print_hello);
mht.AddTimer(2000, print_hello);
mht.AddTimer(9000, print_hello);
mht.AddTimer(10000, print_hello);
mht.AddTimer(6000, print_hello);
mht.AddTimer(3000, print_hello);
while (1)
{
mht.ExpireTimer();
// usleep(10000);
sleep(1);
}
return 0;
}
時間輪
上面的定時器在任務數(shù)量很多時,效率會很低,因為我們需要用最小堆來維護這些任務,并且每刪除一個任務,都要進行調整。其主要原因是我們不知道其他任務什么時候執(zhí)行,所以我們只能進行調整,將最近的任務放到堆頂。
如果我們能夠向哈希桶那樣,將要執(zhí)行的任務形成鏈表,掛到要執(zhí)行的位置,當時間走到那個位置的時候,就執(zhí)行這些任務,效率豈不是更高?

單層級時間輪
客戶端每 5 秒鐘發(fā)送?跳包;服務端若 10 秒內(nèi)沒收到?跳數(shù)據(jù),則清除連接。
考慮到正常情況下 5 秒鐘發(fā)送?次?跳包,10 秒才檢測?次,如下圖到索引為 10 的時候并不能踢掉連接;所以需要每收到?次?跳包則 used++ ,每檢測?次 used-- ;當檢測到used == 0 則踢掉連接;

#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <sys/time.h>
#include <time.h>
#define MAX_CONN ((1 << 16) - 1)
//連接的節(jié)點,用來記錄心跳包發(fā)送的次數(shù)
typedef struct conn_node
{
struct conn_node *next;
//引用計數(shù)的次數(shù),當used==0,相當于自動銷毀
uint8_t used;
int id;
} conn_node_t;
//用數(shù)組記錄所有的連接
static conn_node_t nodes[MAX_CONN] = {0};
static uint32_t iter = 0;
//獲取一個空的連接節(jié)點
conn_node_t *get_node()
{
iter++;
while (nodes[iter & MAX_CONN].used > 0)
{
iter++;
}
return &nodes[iter];
}
//哈希桶的個數(shù)
#define TW_SIZE 16
//檢測心跳包的延遲時間,由于哈希桶的個數(shù)有限,所以心跳包發(fā)送時間不能夠超過6
#define EXPIRE 10
//取余操作
#define TW_MASK (TW_SIZE - 1)
static uint32_t tick = 0;
//哈希桶
typedef struct link_list
{
conn_node_t head;
//一個tail,能夠進行快速插入
conn_node_t *tail;
} link_list_t;
//添加連接
void add_conn(link_list_t *tw, conn_node_t *node, int delay)
{
//獲取對應的哈希桶
link_list_t *list = &tw[(tick + EXPIRE + delay) & TW_MASK];
list->tail->next = node;
list->tail = node;
node->next = NULL;
node->used++;
}
//清楚這個哈希桶
void link_clear(link_list_t *list)
{
list->head.next = NULL;
list->tail = &(list->head);
}
//檢測哈希桶
void check_conn(link_list_t *tw)
{
int32_t itick = tick;
tick++;
//取到對應哈希桶的鏈表
link_list_t *list = &tw[itick & TW_MASK];
//檢測哈希桶對應的鏈表
conn_node_t *current = list->head.next;
while (current)
{
conn_node_t *temp = current;
current = current->next;
temp->used--;
if (temp->used == 0)
{
printf("連接:%d 斷開\n", temp->id);
temp->next = NULL;
continue;
}
printf("這個鏈接:%d 心跳包還剩:%d個需要檢測\n", temp->id, temp->used);
}
link_clear(list);
}
//獲取時間,單位是s
static time_t current_time()
{
time_t t;
struct timespec ti;
clock_gettime(CLOCK_MONOTONIC, &ti);
t = (time_t)ti.tv_sec;
return t;
}
int main()
{
memset(nodes, 0, MAX_CONN * sizeof(conn_node_t));
// init link list
link_list_t tw[TW_SIZE];
memset(tw, 0, TW_SIZE * sizeof(link_list_t));
for (int i = 0; i < TW_SIZE; i++)
{
link_clear(&tw[i]);
}
// 第一個連接對應的心跳包,在0和5時進行發(fā)送
//所以會在10s和15s時進行檢測,15s時把該連接斷開
{
conn_node_t *node = get_node();
node->id = 10001;
add_conn(tw, node, 0);
add_conn(tw, node, 5);
}
//第二個連接發(fā)送的心跳包,在第10s時進行檢測
{
conn_node_t *node = get_node();
node->id = 10002;
add_conn(tw, node, 0);
}
//第二個連接發(fā)送的心跳包,在第13s時檢測
{
conn_node_t *node = get_node();
node->id = 10003;
add_conn(tw, node, 3);
}
time_t start = current_time();
while (1)
{
time_t now = current_time();
if (now - start > 0)
{
for (int i = 0; i < now - start; i++)
check_conn(tw);
start = now;
printf("在第%d秒時檢測,此時機器時間:%d\n", tick, now);
}
}
return 0;
}
上面的時間輪受哈希桶大小和延遲10s收到心跳包的影響,只能在[0,6]秒內(nèi)發(fā)送數(shù)據(jù)心跳包,如果想要延遲長時間,則需要擴大哈希桶的大小。
如果我們不發(fā)送心跳包,而是改成在若干秒后執(zhí)行一個任務,比如50s后執(zhí)行任務,但哈希桶大小只有16,我們可以在任務節(jié)點中增加一個參數(shù)round,用來記錄需要走多少遍哈希桶,比如50s,對應到大小為16的哈希桶則round=3,idx=2。
不過這樣做,在check_conn取任務時就不能夠把整個鏈表都取出來,而是需要取出round==0的任務。
typedef struct node
{
struct conn_node *next;
//需要走多少輪哈希桶,當round==0時,則說明需要執(zhí)行這個任務
int round;
int id;
} node_t;這樣做能解決時間輪刻度范圍過大造成的空間浪費,但是卻帶來了另一個問題:時間輪每次都需要遍歷任務列表,耗時增加,當時間輪刻度粒度很小(秒級甚至毫秒級),任務列表又特別長時,這種遍歷的辦法是不可接受的。
多層級時間輪
參照時鐘表盤的運轉規(guī)律,可以將定時任務根據(jù)觸發(fā)的緊急程度,分布到不同層級的時間輪中;
假設時間精度為 10ms ;在第 1 層級每 10ms 移動?格;每移動?格執(zhí)?該格?當中所有的定時任務;
當?shù)?1 層指針從 255 格開始移動,此時層級 2 移動?格;層級 2 移動?格的?為定義為,將該格當中的定時任務重新映射到層級 1 當中;同理,層級 2 當中從 63 格開始移動,層級 3 格?中的定時任務重新映射到層級 2 ; 以此類推層級 4 往層級 3 映射,層級 5 往層級 4 映射。
只有任務在第一層時才會被執(zhí)行,其他層都是將任務重新映射到上一層。

timewheel.h:
#pragma once
#include <stdint.h>
#define TIME_NEAR_SHIFT 8
#define TIME_NEAR (1 << TIME_NEAR_SHIFT)
#define TIME_LEVEL_SHIFT 6
#define TIME_LEVEL (1 << TIME_LEVEL_SHIFT)
#define TIME_NEAR_MASK (TIME_NEAR - 1)
#define TIME_LEVEL_MASK (TIME_LEVEL - 1)
typedef struct timer_node timer_node_t;
typedef void (*handler_pt)(struct timer_node *node);
struct timer_node
{
struct timer_node *next;
uint32_t expire; //時間
handler_pt callback; //回調函數(shù)
uint8_t cancel; //是否刪除
int id; // 此時攜帶參數(shù)
};
timer_node_t *add_timer(int time, handler_pt func, int threadid);
void expire_timer(void);
void del_timer(timer_node_t *node);
void init_timer(void);
void clear_timer();spinlock.h:
#pragma once
struct spinlock
{
int lock;
};
void spinlock_init(struct spinlock *lock)
{
lock->lock = 0;
}
void spinlock_lock(struct spinlock *lock)
{
while (__sync_lock_test_and_set(&lock->lock, 1))
{
}
}
int spinlock_trylock(struct spinlock *lock)
{
return __sync_lock_test_and_set(&lock->lock, 1) == 0;
}
void spinlock_unlock(struct spinlock *lock)
{
__sync_lock_release(&lock->lock);
}
void spinlock_destroy(struct spinlock *lock)
{
(void)lock;
}timewheel.cpp:
#include "spinlock.h"
#include "timewheel.h"
#include <string.h>
#include <stddef.h>
#include <stdlib.h>
#include <time.h>
typedef struct link_list
{
timer_node_t head;
timer_node_t *tail;
} link_list_t;
//多級時間輪
typedef struct timer
{
//第一級
link_list_t near[TIME_NEAR];
// 2-4級
link_list_t t[4][TIME_LEVEL];
struct spinlock lock;
uint32_t time;
uint64_t current;
uint64_t current_point;
} s_timer_t;
static s_timer_t *TI = NULL;
timer_node_t *link_clear(link_list_t *list)
{
timer_node_t *ret = list->head.next;
list->head.next = 0;
list->tail = &(list->head);
return ret;
}
//鏈接一個節(jié)點
void link(link_list_t *list, timer_node_t *node)
{
list->tail->next = node;
list->tail = node;
node->next = 0;
}
void add_node(s_timer_t *T, timer_node_t *node)
{
uint32_t time = node->expire;
uint32_t current_time = T->time;
uint32_t msec = time - current_time;
//根據(jù)時間
if (msec < TIME_NEAR)
{ //[0, 0x100)
link(&T->near[time & TIME_NEAR_MASK], node);
}
else if (msec < (1 << (TIME_NEAR_SHIFT + TIME_LEVEL_SHIFT)))
{ //[0x100, 0x4000)
link(&T->t[0][((time >> TIME_NEAR_SHIFT) & TIME_LEVEL_MASK)], node);
}
else if (msec < (1 << (TIME_NEAR_SHIFT + 2 * TIME_LEVEL_SHIFT)))
{ //[0x4000, 0x100000)
link(&T->t[1][((time >> (TIME_NEAR_SHIFT + TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)], node);
}
else if (msec < (1 << (TIME_NEAR_SHIFT + 3 * TIME_LEVEL_SHIFT)))
{ //[0x100000, 0x4000000)
link(&T->t[2][((time >> (TIME_NEAR_SHIFT + 2 * TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)], node);
}
else
{ //[0x4000000, 0xffffffff]
link(&T->t[3][((time >> (TIME_NEAR_SHIFT + 3 * TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)], node);
}
}
//增加事件
timer_node_t *add_timer(int time, handler_pt func, int threadid)
{
timer_node_t *node = (timer_node_t *)malloc(sizeof(*node));
spinlock_lock(&TI->lock);
node->expire = time + TI->time; // 每10ms加1 0
node->callback = func;
node->id = threadid;
if (time <= 0)
{
node->callback(node);
free(node);
spinlock_unlock(&TI->lock);
return NULL;
}
add_node(TI, node);
spinlock_unlock(&TI->lock);
return node;
}
void move_list(s_timer_t *T, int level, int idx)
{
timer_node_t *current = link_clear(&T->t[level][idx]);
while (current)
{
timer_node_t *temp = current->next;
add_node(T, current);
current = temp;
}
}
void timer_shift(s_timer_t *T)
{
int mask = TIME_NEAR;
uint32_t ct = ++T->time;
if (ct == 0)
{
move_list(T, 3, 0);
}
else
{
// ct / 256
uint32_t time = ct >> TIME_NEAR_SHIFT;
int i = 0;
// ct % 256 == 0
while ((ct & (mask - 1)) == 0)
{
int idx = time & TIME_LEVEL_MASK;
if (idx != 0)
{
move_list(T, i, idx);
break;
}
mask <<= TIME_LEVEL_SHIFT;
time >>= TIME_LEVEL_SHIFT;
++i;
}
}
}
void dispatch_list(timer_node_t *current)
{
do
{
timer_node_t *temp = current;
current = current->next;
if (temp->cancel == 0)
temp->callback(temp);
free(temp);
} while (current);
}
void timer_execute(s_timer_t *T)
{
int idx = T->time & TIME_NEAR_MASK;
while (T->near[idx].head.next)
{
timer_node_t *current = link_clear(&T->near[idx]);
spinlock_unlock(&T->lock);
dispatch_list(current);
spinlock_lock(&T->lock);
}
}
void timer_update(s_timer_t *T)
{
spinlock_lock(&T->lock);
timer_execute(T);
timer_shift(T);
timer_execute(T);
spinlock_unlock(&T->lock);
}
void del_timer(timer_node_t *node)
{
node->cancel = 1;
}
s_timer_t *timer_create_timer()
{
s_timer_t *r = (s_timer_t *)malloc(sizeof(s_timer_t));
memset(r, 0, sizeof(*r));
int i, j;
for (i = 0; i < TIME_NEAR; i++)
{
link_clear(&r->near[i]);
}
for (i = 0; i < 4; i++)
{
for (j = 0; j < TIME_LEVEL; j++)
{
link_clear(&r->t[i][j]);
}
}
spinlock_init(&r->lock);
r->current = 0;
return r;
}
uint64_t gettime()
{
uint64_t t;
struct timespec ti;
clock_gettime(CLOCK_MONOTONIC, &ti);
t = (uint64_t)ti.tv_sec * 100;
t += ti.tv_nsec / 10000000;
return t;
}
void expire_timer(void)
{
uint64_t cp = gettime();
if (cp != TI->current_point)
{
uint32_t diff = (uint32_t)(cp - TI->current_point);
TI->current_point = cp;
int i;
for (i = 0; i < diff; i++)
{
timer_update(TI);
}
}
}
void init_timer(void)
{
TI = timer_create_timer();
TI->current_point = gettime();
}
void clear_timer()
{
int i, j;
for (i = 0; i < TIME_NEAR; i++)
{
link_list_t *list = &TI->near[i];
timer_node_t *current = list->head.next;
while (current)
{
timer_node_t *temp = current;
current = current->next;
free(temp);
}
link_clear(&TI->near[i]);
}
for (i = 0; i < 4; i++)
{
for (j = 0; j < TIME_LEVEL; j++)
{
link_list_t *list = &TI->t[i][j];
timer_node_t *current = list->head.next;
while (current)
{
timer_node_t *temp = current;
current = current->next;
free(temp);
}
link_clear(&TI->t[i][j]);
}
}
}tw-timer.cpp:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <stdlib.h>
#include "timewheel.h"
struct context
{
int quit;
int thread;
};
struct thread_param
{
struct context *ctx;
int id;
};
static struct context ctx = {0};
void do_timer(timer_node_t *node)
{
printf("timer expired:%d - thread-id:%d\n", node->expire, node->id);
}
void *thread_worker(void *p)
{
struct thread_param *tp = (struct thread_param *)p;
int id = tp->id;
struct context *ctx = tp->ctx;
while (!ctx->quit)
{
int expire = rand() % 200;
add_timer(expire, do_timer, id);
usleep(expire * (10 - 1) * 1000);
}
printf("thread_worker:%d exit!\n", id);
return NULL;
}
void do_quit(timer_node_t *node)
{
ctx.quit = 1;
}
int main()
{
srand(time(NULL));
ctx.thread = 8;
pthread_t pid[ctx.thread];
init_timer();
add_timer(6000, do_quit, 100);
struct thread_param task_thread_p[ctx.thread];
int i;
for (i = 0; i < ctx.thread; i++)
{
task_thread_p[i].id = i;
task_thread_p[i].ctx = &ctx;
if (pthread_create(&pid[i], NULL, thread_worker, &task_thread_p[i]))
{
fprintf(stderr, "create thread failed\n");
exit(1);
}
}
while (!ctx.quit)
{
expire_timer();
usleep(2500);
}
clear_timer();
for (i = 0; i < ctx.thread; i++)
{
pthread_join(pid[i], NULL);
}
printf("all thread is closed\n");
return 0;
}
到此這篇關于C++定時器實現(xiàn)和時間輪介紹的文章就介紹,到這了,更多相關C++定時器 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
C++ 將字符串值賦給CHAR數(shù)組的實現(xiàn)
這篇文章主要介紹了C++ 將字符串值賦給CHAR數(shù)組的實現(xiàn),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01
C++實現(xiàn)LeetCode(18.四數(shù)之和)
這篇文章主要介紹了C++實現(xiàn)LeetCode(18.四數(shù)之和),本篇文章通過簡要的案例,講解了該項技術的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下2021-07-07

