nginx源碼分析線程池詳解
nginx源碼分析線程池詳解
一、前言
nginx是采用多進(jìn)程模型,master和worker之間主要通過(guò)pipe管道的方式進(jìn)行通信,多進(jìn)程的優(yōu)勢(shì)就在于各個(gè)進(jìn)程互不影響。但是經(jīng)常會(huì)有人問(wèn)道,nginx為什么不采用多線程模型(這個(gè)除了之前一篇文章講到的情況,別的只有去問(wèn)作者了,HAHA)。其實(shí),nginx代碼中提供了一個(gè)thread_pool(線程池)的核心模塊來(lái)處理多任務(wù)的。下面就本人對(duì)該thread_pool這個(gè)模塊的理解來(lái)跟大家做些分享(文中錯(cuò)誤、不足還請(qǐng)大家指出,謝謝)
二、thread_pool線程池模塊介紹
nginx的主要功能都是由一個(gè)個(gè)模塊構(gòu)成的,thread_pool也不例外。線程池主要用于讀取、發(fā)送文件等IO操作,避免慢速IO影響worker的正常運(yùn)行。先引用一段官方的配置示例
Syntax: thread_pool name threads=number [max_queue=number]; Default: thread_pool default threads=32 max_queue=65536; Context: main
根據(jù)上述的配置說(shuō)明,thread_pool是有名字的,上面的線程數(shù)目以及隊(duì)列大小都是指每個(gè)worker進(jìn)程中的線程,而不是所有worker中線程的總數(shù)。一個(gè)線程池中所有的線程共享一個(gè)隊(duì)列,隊(duì)列中的最大人數(shù)數(shù)量為上面定義的max_queue,如果隊(duì)列滿了的話,再往隊(duì)列中添加任務(wù)就會(huì)報(bào)錯(cuò)。
根據(jù)之前講到過(guò)的模塊初始化流程(在master啟動(dòng)worker之前) create_conf--> command_set函數(shù)-->init_conf,下面就按照這個(gè)流程看看thread_pool模塊的初始化
/******************* nginx/src/core/ngx_thread_pool.c ************************/
//創(chuàng)建線程池所需的基礎(chǔ)結(jié)構(gòu)
static void * ngx_thread_pool_create_conf(ngx_cycle_t *cycle)
{
ngx_thread_pool_conf_t *tcf;
//從cycle->pool指向的內(nèi)存池中申請(qǐng)一塊內(nèi)存
tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));
if (tcf == NULL) {
return NULL;
}
//先申請(qǐng)包含4個(gè)ngx_thread_pool_t指針類型元素的數(shù)組
//ngx_thread_pool_t結(jié)構(gòu)體中保存了一個(gè)線程池相關(guān)的信息
if (ngx_array_init(&tcf->pools, cycle->pool, 4,
sizeof(ngx_thread_pool_t *))
!= NGX_OK)
{
return NULL;
}
return tcf;
}
//解析處理配置文件中thread_pool的配置,并將相關(guān)信息保存的ngx_thread_pool_t中
static char * ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_str_t *value;
ngx_uint_t i;
ngx_thread_pool_t *tp;
value = cf->args->elts;
//根據(jù)thread_pool配置中的name作為線程池的唯一標(biāo)識(shí)(如果重名,只有第一個(gè)有效)
//申請(qǐng)ngx_thread_pool_t結(jié)構(gòu)保存線程池的相關(guān)信息
//由此可見(jiàn),nginx支持配置多個(gè)name不同的線程池
tp = ngx_thread_pool_add(cf, &value[1]);
.......
//處理thread_pool配置行的所有元素
for (i = 2; i < cf->args->nelts; i++) {
//檢查配置的線程數(shù)
if (ngx_strncmp(value[i].data, "threads=", 8) == 0) {
.......
}
//檢查配置的最大隊(duì)列長(zhǎng)度
if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {
.......
}
}
......
}
//判斷包含多個(gè)線程池的數(shù)組中的各個(gè)線程池的配置是否正確
static char * ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf)
{
....
ngx_thread_pool_t **tpp;
tpp = tcf->pools.elts;
//遍歷數(shù)組中所有的線程池配置,并檢查其正確性
for (i = 0; i < tcf->pools.nelts; i++) {
.....
}
return NGX_CONF_OK;
}
在上述的流程走完之后,nginx的master就保存了一份所有線程池的配置(tcf->pools),這份配置在創(chuàng)建worker時(shí)也會(huì)被繼承。然后每個(gè)worker中都調(diào)用各個(gè)核心模塊的init_process函數(shù)(如果有的話)。
/******************* nginx/src/core/ngx_thread_pool.c ************************/
//創(chuàng)建線程池所需的基礎(chǔ)結(jié)構(gòu)
static ngx_int_t
ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
{
ngx_uint_t i;
ngx_thread_pool_t **tpp;
ngx_thread_pool_conf_t *tcf;
//如果不是worker或者只有一個(gè)worker就不起用線程池
if (ngx_process != NGX_PROCESS_WORKER
&& ngx_process != NGX_PROCESS_SINGLE)
{
return NGX_OK;
}
//初始化任務(wù)隊(duì)列
ngx_thread_pool_queue_init(&ngx_thread_pool_done);
tpp = tcf->pools.elts;
for (i = 0; i < tcf->pools.nelts; i++) {
//初始化各個(gè)線程池
if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {
return NGX_ERROR;
}
}
return NGX_OK;
}
//線程池初始化
static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
{
.....
//初始化任務(wù)隊(duì)列
ngx_thread_pool_queue_init(&tp->queue);
//創(chuàng)建線程鎖
if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
return NGX_ERROR;
}
//創(chuàng)建線程條件變量
if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
(void) ngx_thread_mutex_destroy(&tp->mtx, log);
return NGX_ERROR;
}
......
for (n = 0; n < tp->threads; n++) {
//創(chuàng)建線程池中的每個(gè)線程
err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
if (err) {
ngx_log_error(NGX_LOG_ALERT, log, err,
"pthread_create() failed");
return NGX_ERROR;
}
}
......
}
//線程池中線程處理主函數(shù)
static void *ngx_thread_pool_cycle(void *data)
{
......
for ( ;; ) {
//阻塞的方式獲取線程鎖
if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
return NULL;
}
/* the number may become negative */
tp->waiting--;
//如果任務(wù)隊(duì)列為空,就cond_wait阻塞等待有新任務(wù)時(shí)調(diào)用cond_signal/broadcast觸發(fā)
while (tp->queue.first == NULL) {
if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
!= NGX_OK)
{
(void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
return NULL;
}
}
//從任務(wù)隊(duì)列中獲取task,并將其從隊(duì)列中移除
task = tp->queue.first;
tp->queue.first = task->next;
if (tp->queue.first == NULL) {
tp->queue.last = &tp->queue.first;
}
if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
return NULL;
}
......
//task的處理函數(shù)
task->handler(task->ctx, tp->log);
.....
ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
//將經(jīng)過(guò)預(yù)處理的任務(wù)添加到done隊(duì)列中等待調(diào)用event的回調(diào)函數(shù)繼續(xù)處理
*ngx_thread_pool_done.last = task;
ngx_thread_pool_done.last = &task->next;
//防止編譯器優(yōu)化,保證解鎖操作是在上述語(yǔ)句執(zhí)行完畢后再去執(zhí)行的
ngx_memory_barrier();
ngx_unlock(&ngx_thread_pool_done_lock);
(void) ngx_notify(ngx_thread_pool_handler);
}
}
//處理pool_done隊(duì)列上task中包含的每個(gè)event事件
static void ngx_thread_pool_handler(ngx_event_t *ev)
{
.....
ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
//獲取任務(wù)鏈表的頭部
task = ngx_thread_pool_done.first;
ngx_thread_pool_done.first = NULL;
ngx_thread_pool_done.last = &ngx_thread_pool_done.first;
ngx_memory_barrier();
ngx_unlock(&ngx_thread_pool_done_lock);
while (task) {
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
"run completion handler for task #%ui", task->id);
//遍歷隊(duì)列中的所有任務(wù)事件
event = &task->event;
task = task->next;
event->complete = 1;
event->active = 0;
//調(diào)用event對(duì)應(yīng)的處理函數(shù)有針對(duì)性的進(jìn)行處理
event->handler(event);
}
}
三、thread_pool線程池使用示例
根據(jù)之前所講到的,nginx中的線程池主要是用于操作文件的IO操作。所以,在nginx中自帶的模塊ngx_http_file_cache.c文件中看到了線程池的使用。
/*********************** nginx/src/os/unix/ngx_files.c **********************/
//file_cache模塊的處理函數(shù)(涉及到了線程池)
static ssize_t ngx_http_file_cache_aio_read(ngx_http_request_t *r, ngx_http_cache_t *c)
{
.......
#if (NGX_THREADS)
if (clcf->aio == NGX_HTTP_AIO_THREADS) {
c->file.thread_task = c->thread_task;
//這里注冊(cè)的函數(shù)在下面語(yǔ)句中的ngx_thread_read函數(shù)中被調(diào)用
c->file.thread_handler = ngx_http_cache_thread_handler;
c->file.thread_ctx = r;
//根據(jù)任務(wù)的屬性,選擇正確的線程池,并初始化task結(jié)構(gòu)體中的各個(gè)成員
n = ngx_thread_read(&c->file, c->buf->pos, c->body_start, 0, r->pool);
c->thread_task = c->file.thread_task;
c->reading = (n == NGX_AGAIN);
return n;
}
#endif
return ngx_read_file(&c->file, c->buf->pos, c->body_start, 0);
}
//task任務(wù)的處理函數(shù)
static ngx_int_t ngx_http_cache_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
{
.......
tp = clcf->thread_pool;
.......
task->event.data = r;
//注冊(cè)thread_event_handler函數(shù),該函數(shù)在處理pool_done隊(duì)列中event事件時(shí)被調(diào)用
task->event.handler = ngx_http_cache_thread_event_handler;
//將任務(wù)放到線程池的任務(wù)隊(duì)列中
if (ngx_thread_task_post(tp, task) != NGX_OK) {
return NGX_ERROR;
}
......
}
/*********************** nginx/src/core/ngx_thread_pool.c **********************/
//添加任務(wù)到隊(duì)列中
ngx_int_t ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
{
//如果當(dāng)前的任務(wù)正在處理就退出
if (task->event.active) {
ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
"task #%ui already active", task->id);
return NGX_ERROR;
}
if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
return NGX_ERROR;
}
//判斷當(dāng)前線程池等待的任務(wù)數(shù)量與最大隊(duì)列長(zhǎng)度的關(guān)系
if (tp->waiting >= tp->max_queue) {
(void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
ngx_log_error(NGX_LOG_ERR, tp->log, 0,
"thread pool \"%V\" queue overflow: %i tasks waiting",
&tp->name, tp->waiting);
return NGX_ERROR;
}
//激活任務(wù)
task->event.active = 1;
task->id = ngx_thread_pool_task_id++;
task->next = NULL;
//通知阻塞的線程有新事件加入,可以解除阻塞
if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
(void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
return NGX_ERROR;
}
*tp->queue.last = task;
tp->queue.last = &task->next;
tp->waiting++;
(void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
"task #%ui added to thread pool \"%V\"",
task->id, &tp->name);
return NGX_OK;
}
上面示例基本展示了nginx目前對(duì)線程池的使用方法,采用線程池來(lái)處理IO這類慢速操作可以提升worker的主線程的執(zhí)行效率。當(dāng)然,用戶自己在開(kāi)發(fā)模塊時(shí),也可以參照f(shuō)ile_cache模塊中使用線程池的方法來(lái)調(diào)用多線程提升程序性能。(歡迎大家多多批評(píng)指正)
感謝閱讀,希望能幫助到大家,謝謝大家對(duì)本站的支持!
相關(guān)文章
Nginx轉(zhuǎn)發(fā)丟失cookie表現(xiàn)形式及解決方案
本文主要介紹了Nginx轉(zhuǎn)發(fā)丟失cookie表現(xiàn)形式及解決方案,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01
Nginx下WordPress鏈接(url偽靜態(tài))301永久重定向?qū)崿F(xiàn)方法
在幾個(gè)blog程序中折騰的結(jié)果,導(dǎo)致url連續(xù)二次變化。這是第三次了。 nginx 通過(guò)rewrite 使用 permanent; 參數(shù) 成301永久url重定向2012-09-09
nginx的請(qǐng)求轉(zhuǎn)發(fā)配置過(guò)程
Nginx在Windows和Linux環(huán)境下的安裝、啟動(dòng)、停止、配置和請(qǐng)求轉(zhuǎn)發(fā)過(guò)程,配置文件語(yǔ)法檢測(cè)、優(yōu)雅關(guān)閉、熱部署和日志文件重新打開(kāi),配置多個(gè)服務(wù)的請(qǐng)求轉(zhuǎn)發(fā)規(guī)則,修改前端API地址,設(shè)置最大上傳文件大小2024-12-12
實(shí)現(xiàn)nginx&php服務(wù)器配置的非主流配置方法
這種方法并非以前所流行的apache 加 php_module 的方式運(yùn)行,我是采用nginx 作為web服務(wù)器,以fastcgi的方式運(yùn)行php2011-05-05
Nginx服務(wù)器添加Systemd自定義服務(wù)過(guò)程解析
這篇文章主要介紹了Nginx服務(wù)器添加Systemd自定義服務(wù)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11
keepalived+nginx高可用實(shí)現(xiàn)方法示例
這篇文章主要介紹了keepalived+nginx高可用實(shí)現(xiàn)方法示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-05-05
nginx部署vue項(xiàng)目,給訪問(wèn)路徑加前綴的實(shí)現(xiàn)
這篇文章主要介紹了nginx部署vue項(xiàng)目,給訪問(wèn)路徑加前綴的實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12
ConfigMap掛載與Subpath在Nginx容器中的應(yīng)用小結(jié)
configmap可以通過(guò)ENV環(huán)境變量和文件兩種方式掛載到容器中,修改configmap后容器中對(duì)應(yīng)的ENV環(huán)境變量不會(huì)更新,將配置文件nginx.conf以configmap文件的方式掛載到容器中,本文介紹ConfigMap掛載與Subpath在Nginx容器中的應(yīng)用小結(jié),感興趣的朋友一起看看吧2024-03-03

