深入理解select 模型 (源码分析 )
本篇主要参照 [深入select多路复用内核源码加驱动实现][1] 和 [Linux内核select源码剖析][2]
select
下面这些是用户态下可调用的API函数
# include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
其中以大写的FD_为前缀的函数并非系统调用,而是几个对fd_set进行相关位操作的宏,对应原型定义如下(/usr/include/sys/select.h和/usr/include/bits/select.h):
typedef long int __fd_mask;
# define __NFDBITS (8 * (int) sizeof (__fd_mask))
# define __FD_ELT(d) ((d) / __NFDBITS)
# define __FD_MASK(d) ((__fd_mask) 1 << ((d) % __NFDBITS))
/* fd_set for select and pselect. */
typedef struct
{
/* XPG4.2 requires this member name. Otherwise avoid the name
from the global namespace. */
# ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
# else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
# endif
} fd_set;
/* Maximum number of file descriptors in `fd_set'. */
# define FD_SETSIZE __FD_SETSIZE //这是指定 最大数量的文件描述符
// sys/select.h
# define FD_SET(fd, fdsetp) __FD_SET (fd, fdsetp)
# define FD_CLR(fd, fdsetp) __FD_CLR (fd, fdsetp)
# define FD_ISSET(fd, fdsetp) __FD_ISSET (fd, fdsetp)
# define FD_ZERO(fdsetp) __FD_ZERO (fdsetp)
// bits/select.h
# define __FD_ZERO(fdsp) \
do { \
int __d0, __d1; \
__asm__ __volatile__ ("cld; rep; " __FD_ZERO_STOS \
: "=c" (__d0), "=D" (__d1) \
: "a" (0), "0" (sizeof (fd_set) \
/ sizeof (__fd_mask)), \
"1" (&__FDS_BITS (fdsp)[0]) \
: "memory"); \
} while (0)
//实际调用的宏
# define __FD_SET(d, set) \
((void) (__FDS_BITS (set)[__FD_ELT (d)] |= __FD_MASK (d)))
# define __FD_CLR(d, set) \
((void) (__FDS_BITS (set)[__FD_ELT (d)] &= ~__FD_MASK (d)))
# define __FD_ISSET(d, set) \
((__FDS_BITS (set)[__FD_ELT (d)] & __FD_MASK (d)) != 0)
上面可以看到fd_set结构体的定义实际包含的是fds_bits位数组,其大小固定,由FD_SETSIZE指定(/usr/include/bits/typesizes.h中),在当前内核中数值为1024,可见每次select系统调用可监听处理的文件描述符最大数量为1024。
其中宏定义体中如__FD_SET对应的(void),作用为消除编译器对类型不一致相关的警告,并无其它附加意义。
言归正传,找到用户态下select系统调用入口为为:
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
fd_set __user *, exp, struct timeval __user *, tvp)
{
struct timespec end_time, *to = NULL;
struct timeval tv;
int ret;
if (tvp) {
if (copy_from_user(&tv, tvp, sizeof(tv)))
return -EFAULT;
to = &end_time;
if (poll_select_set_timeout(to,
tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
return -EINVAL;
}
ret = core_sys_select(n, inp, outp, exp, to);
ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);
return ret;
}
sys_select
言归正传,找到用户态下select系统调用入口为为:
这里主要做的事情:
- 将用户设置的超时时间从用户态拷贝到内核态
- 对时间进行转换
- 调用core_sys_select
- 将剩余时间拷贝到用户进程时间
# undef __NFDBITS # define __NFDBITS (8 * sizeof(unsigned long)) # undef __FD_SETSIZE # define __FD_SETSIZE 1024 # undef __FDSET_LONGS # define __FDSET_LONGS (__FD_SETSIZE/__NFDBITS) typedef struct { unsigned longfds_bits [__FDSET_LONGS]; //1024个bit。可以看到可以支持1024个描述符 } __kernel_fd_set; //系统调用(内核态) //参数为 maxfd, r_fds, w_fds, e_fds, timeout。 asmlinkage long sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timeval __user *tvp) { s64 timeout = -1; struct timeval tv; int ret; //将超时时间换成jiffies if (tvp) { if (copy_from_user(&tv, tvp, sizeof(tv))) //将用户态参数拷贝到内核态 return -EFAULT; if (tv.tv_sec < 0 || tv.tv_usec < 0) return -EINVAL; /* Cast to u64 to make GCC stop complaining */ if ((u64)tv.tv_sec >= (u64)MAX_INT64_SECONDS) timeout = -1; /* infinite */ else { timeout = ROUND_UP(tv.tv_usec, USEC_PER_SEC/HZ); timeout += tv.tv_sec * HZ; } } // (***) 调用 core_sys_select ret = core_sys_select(n, inp, outp, exp, &timeout); //将剩余时间拷贝回用户空间进程 if (tvp) { struct timeval rtv; if (current->personality & STICKY_TIMEOUTS) //判断当前环境是否支持修改超时时间(不确定) goto sticky; rtv.tv_usec = jiffies_to_usecs(do_div((*(u64*)&timeout), HZ)); rtv.tv_sec = timeout; if (timeval_compare(&rtv, &tv) >= 0) rtv = tv; if (copy_to_user(tvp, &rtv, sizeof(rtv))) { sticky: /* * 如果应用程序将timeval值放在只读存储中, * 我们不希望在成功完成select后引发错误(修改timeval) * 但是,因为没修改timeval,所以我们不能重启这个系统调用。 */ if (ret == -ERESTARTNOHAND) ret = -EINTR; } } return ret; }
core_sys_select
上面的都是时间方面的准备工作 , 对于需要监听的集合的操作是通过core_sys_select 来进行的
//主要的工作在这个函数中完成
staticint core_sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, s64 *timeout)
{
fd_set_bits fds;
/* fd_set_bits 结构如下:
typedef struct {
unsigned long *in, *out, *ex;
unsigned long *res_in, *res_out, *res_ex;
} fd_set_bits;
这个结构体中定义的全是指针,这些指针都是用来指向描述符集合的。
*/
void *bits;
int ret, max_fds;
unsigned int size;
struct fdtable *fdt;
/* Allocate small arguments on the stack to save memory and be faster 先尝试使用栈(因为栈省内存且快速)*/
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)]; // SELECT_STACK_ALLOC=256
//执行后stack_fds= 32 (sizeof(long)=8)
ret = -EINVAL;
if (n < 0) //传入的描述符无效
goto out_nofds;
/* max_fds can increase, so grab it once to avoid race */
//最大描述符是会改变的,加锁避免竞争
rcu_read_lock(); //rcu锁
fdt = files_fdtable(current->files); //读取文件描述符表
/* struct fdtable 结构如下:
struct fdtable {
unsigned int max_fds;
struct file **fd;
...
};
*/
max_fds = fdt->max_fds; //从files结构中获取最大值(当前进程能够处理的最大文件数目)
rcu_read_unlock();
if (n > max_fds)
// 如果传入的n大于当前进程最大的文件描述符,给予修正
n = max_fds;
/* 我们需要使用6倍于最大描述符的描述符个数,
* 分别是in/out/exception(参见fd_set_bits结构体),
* 并且每份有一个输入和一个输出(用于结果返回) */
size = FDS_BYTES(n);// 以一个文件描述符占一bit来计算,传递进来的这些fd_set需要用掉多少个字
bits = stack_fds;
if (size > sizeof(stack_fds) / 6) { // 除以6,因为每个文件描述符需要6个bitmaps上的位。
//栈不能满足,先前的尝试失败,只能使用kmalloc方式
/* Not enough space in on-stack array; must use kmalloc */
ret = -ENOMEM;
bits = kmalloc(6 * size, GFP_KERNEL);
if (!bits)
goto out_nofds;
}
//设置fds
fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2*size;
fds.res_in = bits + 3*size;
fds.res_out = bits + 4*size;
fds.res_ex = bits + 5*size;
// get_fd_set仅仅调用copy_from_user从用户空间拷贝了fd_se
if ((ret = get_fd_set(n, inp, fds.in)) ||
(ret = get_fd_set(n, outp, fds.out)) ||
(ret = get_fd_set(n, exp, fds.ex)))
goto out;
// 对这些存放返回状态的字段清0
zero_fd_set(n, fds.res_in);
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);
// 执行do_select,完成监控功能
ret = do_select(n, &fds, timeout);
if (ret < 0) // 有错误
goto out;
if (!ret) { // 超时返回,无设备就绪
ret = -ERESTARTNOHAND;
if (signal_pending(current))
goto out;
ret = 0;
}
if (set_fd_set(n, inp, fds.res_in) ||
set_fd_set(n, outp, fds.res_out) ||
set_fd_set(n, exp, fds.res_ex))
ret = -EFAULT;
out:
if (bits != stack_fds)
kfree(bits);
out_nofds:
return ret;
}
![core_select.png][3]
do_select
到目前为止,之前所有的准备工作都已经做完了,现在我们需要到真正运行的do_select 中一探究竟.但是在此之前,我们需要知道源码中比较重要的四个结构体:
struct poll_wqueues
struct poll_table_page
struct poll_table_entry
struct poll_table_struct。
每一个调用select()系统调用的应用进程都会存在一个struct poll_wqueues结构体,用来统一辅佐实现这个进程中所有待监测的fd的轮询工作,后面所有的工作和都这个结构体有关,所以它非常重要。
struct poll_wqueues {
poll_table pt;
struct poll_table_page *table;
struct task_struct *polling_task; //保存当前调用select的用户进程struct task_struct结构体
int triggered; // 当前用户进程被唤醒后置成1,以免该进程接着进睡眠
int error; // 错误码
int inline_index; // 数组inline_entries的引用下标
struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};
实际上结构体poll_wqueues内嵌的poll_table_entry数组inline_entries[] 的大小是有限的,如果空间不够用,后续会动态申请物理内存页以链表的形式挂载poll_wqueues.table上统一管理。接下来的两个结构体就和这项内容密切相关:
struct poll_table_page { // 申请的物理页都会将起始地址强制转换成该结构体指针
struct poll_table_page *next; // 指向下一个申请的物理页
struct poll_table_entry *entry; // 指向entries[]中首个待分配(空的) poll_table_entry地址
struct poll_table_entry entries[0]; // 该page页后面剩余的空间都是待分配的poll_table_entry结构体
};
对每一个fd调用fop->poll() => poll_wait() => __pollwait()都会先从poll_wqueues.inline_entries[]中分配一个poll_table_entry结构体,直到该数组用完才会分配物理页挂在链表指针poll_wqueues.table上然后才会分配一个poll_table_entry结构体(poll_get_entry函数)。
poll_table_entry具体用处:函数__pollwait声明如下:
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p);
该函数调用时需要3个参数,第一个是特定fd对应的file结构体指针,第二个就是特定fd对应的硬件驱动程序中的等待队列头指针,第3个是调用select()的应用进程中poll_wqueues结构体的poll_table项(该进程监测的所有fd调用fop->poll函数都用这一个poll_table结构体)。
struct poll_table_entry {
struct file *filp; // 指向特定fd对应的file结构体;
unsigned long key; // 等待特定fd对应硬件设备的事件掩码,如POLLIN、 POLLOUT、POLLERR;
wait_queue_t wait; // 代表调用select()的应用进程,等待在fd对应设备的特定事件 (读或者写)的等待队列头上的等待队列项;
wait_queue_head_t *wait_address; // 设备驱动程序中特定事件的等待队列头(该fd执行fop->poll,需要等待时在哪等,所以叫等待地址);
};
# define POLLIN_SET (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR)
# define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR)
# define POLLEX_SET (POLLPRI)
int do_select(int n, fd_set_bits *fds, s64 *timeout)
{
struct poll_wqueues table;
/*
struct poll_wqueues {
poll_table pt;
struct poll_table_page *table;
struct task_struct *polling_task; //保存当前调用select的用户进程struct task_struct结构体
int triggered; // 当前用户进程被唤醒后置成1,以免该进程接着进睡眠
int error; // 错误码
int inline_index; // 数组inline_entries的引用下标
struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};
*/
poll_table *wait;
int retval, i;
rcu_read_lock();
//根据已经设置好的fd位图检查用户打开的fd, 要求对应fd必须打开, 并且返回最大的fd。
retval = max_select_fd(n, fds);
rcu_read_unlock();
if (retval < 0)
return retval;
n = retval;
/* 一些重要的初始化:
poll_wqueues.poll_table.qproc函数指针初始化,
该函数是驱动程序中poll函数(fop->poll)实现中必须要调用的poll_wait()中使用的函数。 */
poll_initwait(&table);
/*
void poll_initwait(struct poll_wqueues *pwq)
{
// 设置回调函数 __pollwait
init_poll_funcptr(&pwq->pt, __pollwait);
pwq->error = 0;
pwq->table = NULL;
pwq->inline_index = 0;
}
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
pt->qproc = qproc;
}
这个函数实现很关键,这里 init_poll_funcptr 初始化回调函数为 __pollwait, 后面轮询会回调这个函数,然后通过这个函数把进程添加到对应的监听文件等待队列,当有事件到来时,就会唤醒这个进程。
*/
wait = &table.pt; //这里把 wait 设置为这个回调函数的指针
if (!*timeout)//当 timeout=0 的时候把wait设置为 0,这个可以实现非阻塞的功能
wait = NULL;
retval = 0;
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
long __timeout;
set_current_state(TASK_INTERRUPTIBLE);
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
// 所有n个fd的循环
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, mask, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
const struct file_operations *f_op = NULL;
struct file *file = NULL;
// 先取出当前循环周期中的32(设long占32位)个文件描述符对应的bitmaps
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;// 组合一下,有的fd可能只监测读,或者写,或者err,或者同时都监测
if (all_bits == 0) {
i += __NFDBITS; //如果这个字没有待查找的描述符, 跳过这个长字(32位,__NFDBITS=32),取下一个32个fd的循环中
continue;
}
// 本次32个fd的循环中有需要监测的状态存在
for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
int fput_needed;
if (i >= n)
break;
if (!(bit & all_bits)) // bit每次循环后左移一位的作用在这里,用来跳过没有状态监测的fd
continue;
file = fget_light(i, &fput_needed);//得到file结构指针,并增加引用计数字段f_count
if (file) {// 如果file存在(这个文件描述符对应的文件确实打开了)
f_op = file->f_op;
mask = DEFAULT_POLLMASK;
// 这里会调用 struct file*实现的poll函数进行轮询
if (f_op && f_op->poll) //这个文件对应的驱动程序提供了poll函数(fop->poll)。
mask = (*f_op->poll)(file, retval ? NULL : wait);//调用驱动程序中的poll函数。
/* 调用驱动程序中的poll函数,以evdev驱动中的evdev_poll()为例
* 该函数会调用函数poll_wait(file, &evdev->wait, wait),
* 继续调用__pollwait()回调来分配一个poll_table_entry结构体,
* 该结构体有一个内嵌的等待队列项,
* 设置好wake时调用的回调函数后将其添加到驱动程序中的等待队列头中。 */
fput_light(file, fput_needed); // 释放file结构指针,实际就是减小他的一个引用计数字段f_count。
//记录结果。poll函数返回的mask是设备的状态掩码。
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit; //如果是这个描述符可读, 将这个位置位
retval++; //返回描述符个数加1
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
}
}
/*
* cond_resched()将判断是否有进程需要抢占当前进程,
* 如果是将立即发生调度,这只是为了增加强占点。
* (给其他紧急进程一个机会去执行,增加了实时性)
* 在支持抢占式调度的内核中(定义了CONFIG_PREEMPT),
* cond_resched是空操作。
*/
cond_resched();
}
//返回结果
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
}
wait = NULL;
if (retval || !*timeout || signal_pending(current)) // signal_pending(current)检查当前进程是否有信号要处理
break;
if(table.error) {
retval = table.error;
break;
}
if (*timeout < 0) {
/* Wait indefinitely 无限期等待*/
__timeout = MAX_SCHEDULE_TIMEOUT;
} elseif (unlikely(*timeout >= (s64)MAX_SCHEDULE_TIMEOUT - 1)) {
/* Wait for longer than MAX_SCHEDULE_TIMEOUT. Do it in a loop */
__timeout = MAX_SCHEDULE_TIMEOUT - 1;
*timeout -= __timeout;
} else {
__timeout = *timeout;
*timeout = 0;
}
/* schedule_timeout 用来让出CPU;
* 在指定的时间用完以后或者其它事件到达并唤醒进程(比如接收了一个信号量)时,
* 该进程才可以继续运行 */
__timeout = schedule_timeout(__timeout);
if (*timeout >= 0)
*timeout += __timeout;
}
__set_current_state(TASK_RUNNING);
poll_freewait(&table);
return retval;
}
![do_select.png][4]
引用自论[select的实现][5]
上面的实现看起来一上来就是轮询,假设所有的文件描述符都没有数据可以读写,会怎么样呢? 理论上应该需要监听所有的文件描述符,当有其中一个有数据到来时就唤醒进程。那么哪个地方把进程添加到文件描述符对应的监听列表呢?
对于 tcp 的文件描述符, 他的 poll 函数实现(跟我们上面的poll不是一个东西),调用了sock_poll, 里面又调用了 tcp_poll, tcp_poll 里面会执行下面步骤:
调用 poll_wait
查询当前文件描述符的状态,是否可读写。
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
...
poll_wait(file, sk->sk_sleep, wait);
...
}
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && wait_address)
p->qproc(filp, wait_address, p);
}
回到上面的问题,就是通过 poll_wait 函数把当前进程挂到 sk->sk_sleep 这个队列里面来,当有读写事件到来时,就会唤醒这个队列里面的进程,让他们重新运行,来轮询读写数据。
而这个 qproc 函数就是上面 poll_initwait 函数初始化的回调函数 __pollwait。
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
poll_table *p)
{
struct poll_table_entry *entry = poll_get_entry(p);
if (!entry)
return;
get_file(filp);
entry->filp = filp;
entry->wait_address = wait_address;
init_waitqueue_entry(&entry->wait, current);
// 新添加的进程添加到设备事件触发等待队列
add_wait_queue(wait_address, &entry->wait);
}
这个函数实现就是把当前进程挂到 sk->sleep 队列。 上面还可以看到调用回调函数的条件必须是 wait_address 和 p 不为空,也就是说如果不想把文件描述挂到监听列表,只需要把 p 这个参数设置为 NULL.
因为进程挂到某一个文件的监听列表,只要挂一次即可。所以第一次循环之后把所有的文件描述符挂一遍之后就会把这个参数设置为 NULL。
还有两种情况也会把这个参数设置为 NULL:
1、 已经有文件描述符有读写事件,不需要再挂,因为一遍轮询就会退出,取消所有监听。
if (f_op && f_op->poll)
mask = (*f_op->poll)(file, retval ? NULL : wait);
2、 最上面提到的, timeout = 0, 会把 wait 参数为 NULL, 即不会有挂载行为,轮询一遍立即返回,不等待事件到来,类似 non-blocking 的效果。
总结
通过上面的叙述,我们能看到每一次轮询调用select都需要将 集合 从用户态拷贝到内核态中,当有时间发生的时候又需要再拷贝回来.当监听的数量多的时候这样效率极低,同时也需要重新将进程挂载到监听的文件描述符中.
同时返回回来的时候并不知道有哪些事件有响应需要再重新轮询一遍.再次降低了效率.
EPOLL 的改进
进程挂载到监听的文件描述符只会挂一次。
用专门的链表来存储有事件到来的文件描述符,返回给用户进程,只需要拷贝这个链表。
使用红黑树来管理文件描述符,可以做到快速添加监听的文件描述符。
惊群效应
简言之,惊群现象就是多进程(多线程)在同时阻塞等待同一个事件的时候(休眠状态),如果等待的这个事件发生,那么他就会唤醒等待的所有进程(或者线程),但是最终却只可能有一个进程(线程)获得这个时间的“控制权”,对该事件进行处理,而其他进程(线程)获取“控制权”失败,只能重新进入休眠状态,这种现象和性能浪费就叫做惊群。
这是因为这些进程都处在同一个监听等待队列中,当有事件响应的时候,内核会通知这个队列并唤醒队列中的所有进程。
惊群效应到底消耗了什么?
- 系统对用户进程/线程频繁地做无效的调度,上下文切换系统性能大打折扣。
- 为了确保只有一个进程得到资源,用户必须对资源操作进行加锁保护,进一步加大了系统开销。
是不是还是觉得不够深入,概念化?看下面:
1、上下文切换(context switch)过高会导致cpu像个搬运工,频繁地在寄存器和运行队列之间奔波,更多的时间花在了进程(线程)切换,而不是在真正工作的进程(线程)上面。直接的消耗包括cpu寄存器要保存和加载(例如程序计数器)、系统调度器的代码需要执行。间接的消耗在于多核cache之间的共享数据。
看一下:wiki上下文切换
2、通过锁机制解决惊群效应是一种方法,在任意时刻只让一个进程(线程)处理等待的事件。但是锁机制也会造成cpu等资源的消耗和性能损耗。目前一些常见的服务器软件有的是通过锁机制解决的,比如nginx(它的锁机制是默认开启的,可以关闭);还有些认为惊群对系统性能影响不大,没有去处理,比如lighttpd。
[1]: https://my.oschina.net/fileoptions/blog/911091?tdsourcetag=s_pctim_aiomsg# h1_1
[2]: http://www.pandademo.com/2016/11/linux-kernel-select-source-dissect/?tdsourcetag=s_pctim_aiomsg
[3]: https://banthink.com/usr/uploads/2020/02/2227498139.png
[4]: https://banthink.com/usr/uploads/2020/02/1227669105.png
[5]: http://www.hulkdev.com/posts/select-io