linux內核select/poll,epoll實現與區(qū)別
下面文章在這段時間內研究 select/poll/epoll的內核實現的一點心得體會:
select,poll,epoll都是多路復用IO的函數,簡單說就是在一個線程里,可以同時處理多個文件描述符的讀寫。
select/poll的實現很類似,epoll是從select/poll擴展而來,主要是為了解決select/poll天生的缺陷。
epoll在內核版本2.6以上才出現的新的函數,而他們在linux內核中的實現都是十分相似。
這三種函數都需要設備驅動提供poll回調函數,對于套接字而言,他們是 tcp_poll,udp_poll和datagram_poll;
對于自己開發(fā)的設備驅動而言,是自己實現的poll接口函數。
select實現(2.6的內核,其他版本的內核,應該都相差不多)
應用程序調用select,進入內核調用sys_select,做些簡單初始化工作,接著進入 core_sys_select,
此函數主要工作是把描述符集合從用戶空間復制到內核空間, 最終進入do_select,完成其主要的功能。
do_select里,調用 poll_initwait,主要工作是注冊poll_wait的回調函數為__pollwait,
當在設備驅動的poll回調函數里調用poll_wait,其實就是調用__pollwait,
__pollwait的主要工作是把當前進程掛載到等待隊列里,當等待的事件到來就會喚醒此進程。
接著執(zhí)行for循環(huán),循環(huán)里首先遍歷每個文件描述符,調用對應描述符的poll回調函數,檢測是否就緒,
遍歷完所有描述符之后,只要有描述符處于就緒狀態(tài),信號中斷,出錯或者超時,就退出循環(huán),
否則會調用schedule_xxx函數,讓當前進程睡眠,一直到超時或者有描述符就緒被喚醒。
接著又會再次遍歷每個描述符,調用poll再次檢測。
如此循環(huán),直到符合條件才會退出。
以下是 2.6.31內核的有關select函數的部分片段:
他們調用關系:
select --> sys_select --> core_sys_select --> do_select
int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
ktime_t expire, *to = NULL;
struct poll_wqueues table;
poll_table *wait;
int retval, i, timed_out = 0;
unsigned long slack = 0;
///這里為了獲得集合中的最大描述符,這樣可減少循環(huán)中遍歷的次數。
///也就是為什么linux中select第一個參數為何如此重要了
rcu_read_lock();
retval = max_select_fd(n, fds);
rcu_read_unlock();
if (retval < 0)
return retval;
n = retval;
////初始化 poll_table結構,其中一個重要任務是把 __pollwait函數地址賦值給它,
poll_initwait(&table);
wait = &table.pt;
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
wait = NULL;
timed_out = 1;
}
if (end_time && !timed_out)
slack = estimate_accuracy(end_time);
retval = 0;
///主循環(huán),將會在這里完成描述符的狀態(tài)輪訓
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, mask, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
const struct file_operations *f_op = NULL;
struct file *file = NULL;
///select中 fd_set 以及 do_select 中的 fd_set_bits 參數,都是按照位來保存描述符,意思是比如申請一個1024位的內存,
///如果第 28位置1,說明此集合有 描述符 28,
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex; // 檢測讀寫異常3個集合中有無描述符
if (all_bits == 0) {
i += __NFDBITS;
continue;
}
for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
int fput_needed;
if (i >= n)
break;
if (!(bit & all_bits))
continue;
file = fget_light(i, &fput_needed); ///通過 描述符 index 獲得 struct file結構指針,
if (file) {
f_op = file->f_op; //通過 struct file 獲得 file_operations,這是操作文件的回調函數集合。
mask = DEFAULT_POLLMASK;
if (f_op && f_op->poll) {
wait_key_set(wait, in, out, bit);
mask = (*f_op->poll)(file, wait); //調用我們的設備中實現的 poll函數,
//因此,為了能讓select正常工作,在我們設備驅動中,必須要提供poll的實現,
}
fput_light(file, fput_needed);
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
wait = NULL; /// 此處包括以下的,把wait設置為NULL,是因為檢測到mask = (*f_op->poll)(file, wait); 描述符已經就緒
/// 無需再把當前進程添加到等待隊列里,do_select 遍歷完所有描述符之后就會退出。
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
wait = NULL;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
wait = NULL;
}
}
}
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
cond_resched();
}
wait = NULL; //已經遍歷完一遍,該加到等待隊列的,都已經加了,無需再加,因此設置為NULL
if (retval || timed_out || signal_pending(current)) //描述符就緒,超時,或者信號中斷就退出循環(huán)
break;
if (table.error) {//出錯退出循環(huán)
retval = table.error;
break;
}
/*
* If this is the first loop and we have a timeout
* given, then we convert to ktime_t and set the to
* pointer to the expiry value.
*/
if (end_time && !to) {
expire = timespec_to_ktime(*end_time);
to = &expire;
}
/////讓進程休眠,直到超時,或者被就緒的描述符喚醒,
if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
to, slack))
timed_out = 1;
}
poll_freewait(&table);
return retval;
}
void poll_initwait(struct poll_wqueues *pwq)
{
init_poll_funcptr(&pwq->pt, __pollwait); //設置poll_table的回調函數為 __pollwait,這樣當我們在驅動中調用poll_wait 就會調用到 __pollwait
........
}
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
poll_table *p)
{
...................
init_waitqueue_func_entry(&entry->wait, pollwake); // 設置喚醒進程調用的回調函數,當在驅動中調用 wake_up喚醒隊列時候,
// pollwake會被調用,這里其實就是調用隊列的默認函數 default_wake_function
// 用來喚醒睡眠的進程。
add_wait_queue(wait_address, &entry->wait); //加入到等待隊列
}
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct timespec *end_time)
{
........
//把描述符集合從用戶空間復制到內核空間
if ((ret = get_fd_set(n, inp, fds.in)) ||
(ret = get_fd_set(n, outp, fds.out)) ||
(ret = get_fd_set(n, exp, fds.ex)))
.........
ret = do_select(n, &fds, end_time);
.............
////把do_select返回集合,從內核空間復制到用戶空間
if (set_fd_set(n, inp, fds.res_in) ||
set_fd_set(n, outp, fds.res_out) ||
set_fd_set(n, exp, fds.res_ex))
ret = -EFAULT;
............
}
poll的實現跟select基本差不多,按照
poll --> do_sys_poll --> do_poll --> do_pollfd 的調用序列
其中do_pollfd是對每個描述符調用 其回調poll狀態(tài)輪訓。
poll比select的好處就是沒有描述多少限制,select 有1024 的限制,描述符不能超過此值,poll不受限制。
我們從上面代碼分析,可以總結出select/poll天生的缺陷:
1)每次調用select/poll都需要要把描述符集合從用戶空間copy到內核空間,檢測完成之后,又要把檢測的結果集合從內核空間copy到用戶空間
當描述符很多,而且select經常被喚醒,這種開銷會比較大
2)如果說描述符集合來回復制不算什么,那么多次的全部描述符遍歷就比較恐怖了,
我們在應用程序中,每次調用select/poll 都必須首先遍歷描述符,把他們加到fd_set集合里,這是應用層的第一次遍歷,
接著進入內核空間,至少進行一次遍歷和調用每個描述符的poll回調檢測,一般可能是2次遍歷,第一次沒發(fā)現就緒描述符,
加入等待隊列,第二次是被喚醒,接著再遍歷一遍。再回到應用層,我們還必須再次遍歷所有描述符,用 FD_ISSET檢測結果集。
如果描述符很多,這種遍歷就很消耗CPU資源了。
3)描述符多少限制,當然poll沒有限制,select卻有1024的硬性限制,除了修改內核增加1024限制外沒別的辦法。
既然有這么些缺點 ,那不是 select/poll變得一無是處了,那就大錯特錯了。
他們依然是代碼移植的最好函數,因為幾乎所有平臺都有對它們的實現提供接口。
在描述符不是太多,他們依然十分出色的完成多路復用IO,
而且如果每個連接上的描述符都處于活躍狀態(tài),他們的效率其實跟epoll也差不了多少。
曾經使用多個線程+每個線程采用poll的辦法開發(fā)TCP服務器,處理文件收發(fā),連接達到幾千個,
當時的瓶頸已經不在網絡IO,而在磁盤IO了。
我們再來看epoll為了解決select/poll天生的缺陷,是如何實現的。
epoll只是select/poll的擴展,他不是在linux內核中另起爐灶,做顛覆性的設計的,他只是在select的基礎上來解決他們的缺陷。
他的底層依然需要設備驅動提供poll回調來作為狀態(tài)檢測基礎。
epoll分為三個函數 epoll_create,epoll_ctl, epoll_wait 。
他們的實現在 eventpoll.c代碼里。
epoll_create創(chuàng)建epoll設備,用來管理所有添加進去的描述符,epoll_ctl 用來添加新的描述符,修改或者刪除描述符。
epoll_wait等待描述符事件。
epoll_wait的等待已經不再是輪訓方式的等待了,epoll內部有個描述符就緒隊列,epoll_wait只檢測這個隊列即可,
他采用睡眠一會檢測一下的方式,如果發(fā)現描述符就緒隊列不為空,就把此隊列中的描述符copy到用戶空間,然后返回。
描述符就緒隊列里的數據又是從何而來的?
原來使用 epoll_ctl添加新描述符時候,epoll_ctl內核實現里會修改兩個回調函數,
一個是 poll_table結構里的qproc回調函數指針,
在 select中是 __pollwait函數,在epoll中換成 ep_ptable_queue_proc,
當在epoll_ctl中調用新添加的描述符的poll回調時候,底層驅動就會調用 poll_wait添加等待隊列,
底層驅動調用poll_wait時候,
其實就是調用ep_ptable_queue_proc,此函數會修改等待隊列的回調函數為 ep_poll_callback, 并加入到等待隊列頭里;
一旦底層驅動發(fā)現數據就緒,就會調用wake_up喚醒等待隊列,從而 ep_poll_callback將被調用,
在ep_poll_callback中 會把這個就緒的描述符添加到 epoll的描述符就緒隊列里,并同時喚醒 epoll_wait 所在的進程。
如此這般,就是epoll的內核實現的精髓。
看他是如何解決 select/poll的缺陷的, 首先他通過 epoll_ctl的EPOLL_CTL_ADD命令把描述符添加進epoll內部管理器里,
只需添加一次即可,直到用 epoll_ctl的EPOLL_CTL_DEL命令刪除此描述符為止,
而不像select/poll是每次執(zhí)行都必須添加,很顯然大量減少了描述符在內核和用戶空間不斷的來回copy的開銷。
其次雖然 epoll_wait內部也是循環(huán)檢測,但是它只需檢測描述符就緒隊列是否為空即可,
比起select/poll必須輪訓每個描述符的poll,其開銷簡直可以忽略不計。
他同時也沒描述符多少的限制,只要你機器的內存夠大,就能容納非常多的描述符。
以下是 epoll相關部分內核代碼片段:
struct epitem {
/* RB tree node used to link this structure to the eventpoll RB tree */
struct rb_node rbn; // 紅黑樹節(jié)點,
struct epoll_filefd ffd; // 存儲此變量對應的描述符
struct epoll_event event; //用戶定義的結構
/*其他成員*/
};
struct eventpoll {
/*其他成員*/
.......
/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
struct list_head rdllist; ///描述符就緒隊列,掛載的是 epitem結構
/* RB tree root used to store monitored fd structs */
struct rb_root rbr; /// 存儲 新添加的 描述符的紅黑樹根, 此成員用來存儲添加進來的所有描述符。掛載的是epitem結構
.........
};
//epoll_create
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error;
struct eventpoll *ep = NULL;
/*其他代碼*/
......
//分配 eventpoll結構,這個結構是epoll的靈魂,他包含了所有需要處理得數據。
error = ep_alloc(&ep);
if (error < 0)
return error;
error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
flags & O_CLOEXEC); ///打開 eventpoll 的描述符,并把 ep存儲到 file->private_data變量里。
if (error < 0)
ep_free(ep);
return error;
}
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
/*其他代碼*/
.....
ep = file->private_data;
......
epi = ep_find(ep, tfile, fd); ///從 eventpoll的 rbr里查找描述符是 fd 的 epitem,
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tfile, fd); // 在這個函數里添加新描述符,同時修改重要的回調函數。
//同時還調用描述符的poll,查看就緒狀態(tài)
} else
error = -EEXIST;
break;
/*其他代碼*/
........
}
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
..... /*其他代碼*/
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);//設置 poll_tabe回調函數為 ep_ptable_queue_proc
//ep_ptable_queue_proc會設置等待隊列的回調指針為 ep_epoll_callback,同時添加等待隊列。
........ /*其他代碼*/
revents = tfile->f_op->poll(tfile, &epq.pt); //調用描述符的poll回調,在此函數里 ep_ptable_queue_proc會被調用
....... /*其他代碼*/
ep_rbtree_insert(ep, epi); //把新生成關于epitem添加到紅黑樹里
...... /*其他代碼*/
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist); //如果 上邊的poll調用,檢測到描述符就緒,添加本描述符到就緒隊列里。
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
...... /*其他代碼*/
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait); // 如果描述符就緒隊列不為空,則喚醒 epoll_wait所在的進程。
......... /*其他代碼*/
}
//這個函數設置等待隊列回調函數為 ep_poll_callback,
//這樣到底層有數據喚醒等待隊列時候,ep_poll_callback就會被調用,從而把就緒的描述符加到就緒隊列。
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
......... /*其他代碼*/
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist); // 把當前就緒的描述epitem結構添加到就緒隊列里
......... /*其他代碼*/
if (pwake)
ep_poll_safewake(&ep->poll_wait); //如果隊列不為空,喚醒 epoll_wait所在進程
......... /*其他代碼*/
}
epoll_wait內核代碼里主要是調用ep_poll,列出ep_poll部分代碼片段:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;
......... /*其他代碼*/
if (list_empty(&ep->rdllist)) {
init_waitqueue_entry(&wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
__add_wait_queue(&ep->wq, &wait);
// 如果檢測到就緒隊列為空,添加當前進程到等待隊列,并執(zhí)行否循環(huán)
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
if (!list_empty(&ep->rdllist) || !jtimeout) //如果就緒隊列不為空,或者超時則退出循環(huán)
break;
if (signal_pending(current)) { //如果信號中斷,退出循環(huán)
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
jtimeout = schedule_timeout(jtimeout);//睡眠,知道被喚醒或者超時為止。
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
......... /*其他代碼*/
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
// ep_send_events主要任務是把就緒隊列的就緒描述符copy到用戶空間的 epoll_event數組里,
return res;
}
可以看到 ep_poll既epoll_wait的循環(huán)是相當輕松的循環(huán),他只是簡單檢測就緒隊列而已,因此他的開銷很小。
我們最后看看描述符就緒時候,是如何通知給select/poll/epoll的,以網絡套接字的TCP協(xié)議來進行說明。
tcp協(xié)議對應的 poll回調是tcp_poll, 對應的等待隊列頭是 struct sock結構里 sk_sleep成員,
在tcp_poll中會把 sk_sleep加入到等待隊列,等待數據就緒。
當物理網卡接收到數據包,引發(fā)硬件中斷,驅動在中斷ISR例程里,構建skb包,把數據copy進skb,接著調用netif_rx
把skb掛載到CPU相關的 input_pkt_queue隊列,同時引發(fā)軟中斷,在軟中斷的net_rx_action回調函數里從input_pkt_queue里取出
skb數據包,通過分析,調用協(xié)議相關的回調函數,這樣層層傳遞,一直到struct sock,此結構里的 sk_data_ready回調指針被調用
sk_data_ready指向 sock_def_readable 函數,sock_def_readable函數其實就是 wake_up 喚醒 sock結構里 的 sk_sleep。
以上機制,對 select/poll/epoll都是一樣的,接下來喚醒 sk_sleep方式就不一樣了,因為他們指向了不同的回調函數。
在 select/poll實現中,等待隊列回調函數是 pollwake其實就是調用default_wake_function,喚醒被select阻塞住的進程。
epoll實現中,等待回調函數是 ep_poll_callback, 此回調函數只是把就緒描述符加入到epoll的就緒隊列里。
所以呢 select/poll/epoll其實他們在內核實現中,差別也不是太大,其實都差不多。
epoll雖然效率不錯,可以跟windows平臺中的完成端口比美,但是移植性太差,
目前幾乎就只有l(wèi)inux平臺才實現了epoll而且必須是2.6以上的內核版本。

