nginx線程池源碼分析
周末看了nginx線程池部分的代碼,順手照抄了一遍,寫成了自己的版本。實(shí)現(xiàn)上某些地方還是有差異的,不過基本結(jié)構(gòu)全部摘抄。
在這里分享一下。如果你看懂了我的版本,也就證明你看懂了nginx的線程池。
本文只列出了關(guān)鍵數(shù)據(jù)結(jié)構(gòu)和API,重在理解nginx線程池設(shè)計(jì)思路。完整代碼在最后的鏈接里。
1.任務(wù)節(jié)點(diǎn)
typedef void (*CB_FUN)(void *);
//任務(wù)結(jié)構(gòu)體
typedef struct task
{
void *argv; //任務(wù)函數(shù)的參數(shù)(任務(wù)執(zhí)行結(jié)束前,要保證參數(shù)地址有效)
CB_FUN handler; //任務(wù)函數(shù)(返回值必須為0 非0值用作增加線程,和銷毀線程池)
struct task *next; //任務(wù)鏈指針
}zoey_task_t;
handler為函數(shù)指針,是實(shí)際的任務(wù)函數(shù),argv為該函數(shù)的參數(shù),next指向下一個任務(wù)。
2.任務(wù)隊(duì)列
typedef struct task_queue
{
zoey_task_t *head; //隊(duì)列頭
zoey_task_t **tail; //隊(duì)列尾
unsigned int maxtasknum; //最大任務(wù)限制
unsigned int curtasknum; //當(dāng)前任務(wù)數(shù)
}zoey_task_queue_t;
head為任務(wù)隊(duì)列頭指針,tail為任務(wù)隊(duì)列尾指針,maxtasknum為隊(duì)列最大任務(wù)數(shù)限制,curtasknum為隊(duì)列當(dāng)前任務(wù)數(shù)。
3.線程池
typedef struct threadpool
{
pthread_mutex_t mutex; //互斥鎖
pthread_cond_t cond; //條件鎖
zoey_task_queue_t tasks;//任務(wù)隊(duì)列
unsigned int threadnum; //線程數(shù)
unsigned int thread_stack_size; //線程堆棧大小
}zoey_threadpool_t;
mutex為互斥鎖 cond為條件鎖。mutex和cond共同保證線程池任務(wù)的互斥領(lǐng)取或者添加。
tasks指向任務(wù)隊(duì)列。
threadnum為線程池的線程數(shù)
thread_stack_size為線程堆棧大小
4.啟動配置
//配置參數(shù)
typedef struct threadpool_conf
{
unsigned int threadnum; //線程數(shù)
unsigned int thread_stack_size;//線程堆棧大小
unsigned int maxtasknum;//最大任務(wù)限制
}zoey_threadpool_conf_t;
啟動配置結(jié)構(gòu)體是初始化線程池時(shí)的一些參數(shù)。
5.初始化線程池
首先檢查參數(shù)是否合法,然后初始化mutex,cond,key(pthread_key_t)。key用來讀寫線程全局變量,此全局變量控制線程是否退出。
最后創(chuàng)建線程。
zoey_threadpool_t* zoey_threadpool_init(zoey_threadpool_conf_t *conf)
{
zoey_threadpool_t *pool = NULL;
int error_flag_mutex = 0;
int error_flag_cond = 0;
pthread_attr_t attr;
do{
if (z_conf_check(conf) == -1){ //檢查參數(shù)是否合法
break;
}
pool = (zoey_threadpool_t *)malloc(sizeof(zoey_threadpool_t));//申請線程池句柄
if (pool == NULL){
break;
}
//初始化線程池基本參數(shù)
pool->threadnum = conf->threadnum;
pool->thread_stack_size = conf->thread_stack_size;
pool->tasks.maxtasknum = conf->maxtasknum;
pool->tasks.curtasknum = 0;
z_task_queue_init(&pool->tasks);
if (z_thread_key_create() != 0){//創(chuàng)建一個pthread_key_t,用以訪問線程全局變量。
free(pool);
break;
}
if (z_thread_mutex_create(&pool->mutex) != 0){ //初始化互斥鎖
z_thread_key_destroy();
free(pool);
break;
}
if (z_thread_cond_create(&pool->cond) != 0){ //初始化條件鎖
z_thread_key_destroy();
z_thread_mutex_destroy(&pool->mutex);
free(pool);
break;
}
if (z_threadpool_create(pool) != 0){ //創(chuàng)建線程池
z_thread_key_destroy();
z_thread_mutex_destroy(&pool->mutex);
z_thread_cond_destroy(&pool->cond);
free(pool);
break;
}
return pool;
}while(0);
return NULL;
}
6.添加任務(wù)
首先申請一個任務(wù)節(jié)點(diǎn),實(shí)例化后將節(jié)點(diǎn)加入任務(wù)隊(duì)列,并將當(dāng)前任務(wù)隊(duì)列數(shù)++并通知其他進(jìn)程有新任務(wù)。整個過程加鎖。
int zoey_threadpool_add_task(zoey_threadpool_t *pool, CB_FUN handler, void* argv)
{
zoey_task_t *task = NULL;
//申請一個任務(wù)節(jié)點(diǎn)并賦值
task = (zoey_task_t *)malloc(sizeof(zoey_task_t));
if (task == NULL){
return -1;
}
task->handler = handler;
task->argv = argv;
task->next = NULL;
if (pthread_mutex_lock(&pool->mutex) != 0){ //加鎖
free(task);
return -1;
}
do{
if (pool->tasks.curtasknum >= pool->tasks.maxtasknum){//判斷工作隊(duì)列中的任務(wù)數(shù)是否達(dá)到限制
break;
}
//將任務(wù)節(jié)點(diǎn)尾插到任務(wù)隊(duì)列
*(pool->tasks.tail) = task;
pool->tasks.tail = &task->next;
pool->tasks.curtasknum++;
//通知阻塞的線程
if (pthread_cond_signal(&pool->cond) != 0){
break;
}
//解鎖
pthread_mutex_unlock(&pool->mutex);
return 0;
}while(0);
pthread_mutex_unlock(&pool->mutex);
free(task);
return -1;
}
7.銷毀線程池
銷毀線程池其實(shí)也是向任務(wù)隊(duì)列添加任務(wù),只不過添加的任務(wù)是讓線程退出。z_threadpool_exit_cb函數(shù)會將lock置0后退出線程,lock為0表示此線程
已經(jīng)退出,接著退出下一個線程。退出完線程釋放所有資源。
void zoey_threadpool_destroy(zoey_threadpool_t *pool)
{
unsigned int n = 0;
volatile unsigned int lock;
//z_threadpool_exit_cb函數(shù)會使對應(yīng)線程退出
for (; n < pool->threadnum; n++){
lock = 1;
if (zoey_threadpool_add_task(pool, z_threadpool_exit_cb, &lock) != 0){
return;
}
while (lock){
usleep(1);
}
}
z_thread_mutex_destroy(&pool->mutex);
z_thread_cond_destroy(&pool->cond);
z_thread_key_destroy();
free(pool);
}
8.增加一個線程
很簡單,再生成一個線程以及線程數(shù)++即可。加鎖。
int zoey_thread_add(zoey_threadpool_t *pool)
{
int ret = 0;
if (pthread_mutex_lock(&pool->mutex) != 0){
return -1;
}
ret = z_thread_add(pool);
pthread_mutex_unlock(&pool->mutex);
return ret;
}
9.改變?nèi)蝿?wù)隊(duì)列最大任務(wù)限制
當(dāng)num=0時(shí)設(shè)置線程數(shù)為無限大。
void zoey_set_max_tasknum(zoey_threadpool_t *pool,unsigned int num)
{
if (pthread_mutex_lock(&pool->mutex) != 0){
return -1;
}
z_change_maxtask_num(pool, num); //改變最大任務(wù)限制
pthread_mutex_unlock(&pool->mutex);
}
10.使用示例
int main()
{
int array[10000] = {0};
int i = 0;
zoey_threadpool_conf_t conf = {5,0,5}; //實(shí)例化啟動參數(shù)
zoey_threadpool_t *pool = zoey_threadpool_init(&conf);//初始化線程池
if (pool == NULL){
return 0;
}
for (; i < 10000; i++){
array[i] = i;
if (i == 80){
zoey_thread_add(pool); //增加線程
zoey_thread_add(pool);
}
if (i == 100){
zoey_set_max_tasknum(pool, 0); //改變最大任務(wù)數(shù) 0為不做上限
}
while(1){
if (zoey_threadpool_add_task(pool, testfun, &array[i]) == 0){
break;
}
printf("error in i = %d\n",i);
}
}
zoey_threadpool_destroy(pool);
while(1){
sleep(5);
}
return 0;
}
11.源碼
https://github.com/unlikewashface/zoey_threadpool.git
線程池可以發(fā)揮更多作用,比如可以把連接放到線程池里。nginx的異步加lua的協(xié)程是個非常好的組合,現(xiàn)在有了線程池后,線程池加協(xié)程將是另一個選擇??偠灾?,如果在保證性能的情況下,讓nginx開發(fā)變得非常簡單,這是非常利好的消息。
相關(guān)文章
nginx try_files指令的實(shí)現(xiàn)示例
try_files用于指定文件的查找規(guī)則,可以配置多個規(guī)則,會按順序執(zhí)行查找規(guī)則,本文主要介紹了nginx try_files指令的實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的可以了解一下2024-07-07
Nginx下配置pathinfo及ThinkPHP的URL Rewrite模式支持
這篇文章主要介紹了Nginx下配置pathinfo及ThinkPHP的URL Rewrite模式支持,使用Nginx運(yùn)行ThinkPHP的必備配置,需要的朋友可以參考下2015-07-07
18個運(yùn)維必知的Nginx代理緩存配置技巧(你都掌握了哪些呢)
這篇文章主要介紹了18個運(yùn)維必知的Nginx代理緩存配置技巧(你都掌握了哪些呢),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09
基于nginx獲取代理服務(wù)ip以及客戶端真實(shí)ip詳解
最近在研究nginx中如何獲取真實(shí)客戶端IP的方法,下面這篇文章主要給大家介紹了基于nginx獲取代理服務(wù)ip以及客戶端真實(shí)ip的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-07-07

