本文基于linux 3.10.104 x86_64内核,对select相关系统调用的源码进行分析,对应源码位于fs/select.c。
用户态下select系统调用相关函数原型如下:
#include <sys/select.h> int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); void FD_CLR(int fd, fd_set *set); int FD_ISSET(int fd, fd_set *set); void FD_SET(int fd, fd_set *set); void FD_ZERO(fd_set *set);
其中以大写的FD_为前缀的函数并非系统调用,而是几个对fd_set进行相关位操作的宏,对应原型定义如下(/usr/include/sys/select.h和/usr/include/bits/select.h):
typedef long int __fd_mask; #define __NFDBITS (8 * (int) sizeof (__fd_mask)) #define __FD_ELT(d) ((d) / __NFDBITS) #define __FD_MASK(d) ((__fd_mask) 1 << ((d) % __NFDBITS)) /* fd_set for select and pselect. */ typedef struct { /* XPG4.2 requires this member name. Otherwise avoid the name from the global namespace. */ #ifdef __USE_XOPEN __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]; # define __FDS_BITS(set) ((set)->fds_bits) #else __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS]; # define __FDS_BITS(set) ((set)->__fds_bits) #endif } fd_set; /* Maximum number of file descriptors in `fd_set'. */ #define FD_SETSIZE __FD_SETSIZE // sys/select.h #define FD_SET(fd, fdsetp) __FD_SET (fd, fdsetp) #define FD_CLR(fd, fdsetp) __FD_CLR (fd, fdsetp) #define FD_ISSET(fd, fdsetp) __FD_ISSET (fd, fdsetp) #define FD_ZERO(fdsetp) __FD_ZERO (fdsetp) // bits/select.h # define __FD_ZERO(fdsp) \ do { \ int __d0, __d1; \ __asm__ __volatile__ ("cld; rep; " __FD_ZERO_STOS \ : "=c" (__d0), "=D" (__d1) \ : "a" (0), "0" (sizeof (fd_set) \ / sizeof (__fd_mask)), \ "1" (&__FDS_BITS (fdsp)[0]) \ : "memory"); \ } while (0) #define __FD_SET(d, set) \ ((void) (__FDS_BITS (set)[__FD_ELT (d)] |= __FD_MASK (d))) #define __FD_CLR(d, set) \ ((void) (__FDS_BITS (set)[__FD_ELT (d)] &= ~__FD_MASK (d))) #define __FD_ISSET(d, set) \ ((__FDS_BITS (set)[__FD_ELT (d)] & __FD_MASK (d)) != 0)
上面可以看到fd_set结构体的定义实际包含的是fds_bits位数组,其大小固定,由FD_SETSIZE指定(/usr/include/bits/typesizes.h中),在当前内核中数值为1024,可见每次select系统调用可监听处理的文件描述符最大数量为1024。
其中宏定义体中如__FD_SET对应的(void),作用为消除编译器对类型不一致相关的警告,并无其它附加意义。
言归正传,找到select系统调用在内核中的入口为:
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp, fd_set __user *, exp, struct timeval __user *, tvp) { struct timespec end_time, *to = NULL; struct timeval tv; int ret; if (tvp) { if (copy_from_user(&tv, tvp, sizeof(tv))) return -EFAULT; to = &end_time; if (poll_select_set_timeout(to, tv.tv_sec + (tv.tv_usec / USEC_PER_SEC), (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC)) return -EINVAL; } ret = core_sys_select(n, inp, outp, exp, to); ret = poll_select_copy_remaining(&end_time, tvp, 1, ret); return ret; }
首先在指定超时的情况下会使用copy_from_user将用户空间下的tvp相对超时拷贝到内核空间下的tv(timeval微秒精度),并通过poll_select_set_timeout设定成绝对的超时(timespec纳秒精度)。
copy_from_user如何高效地拷贝?这类用户态/内核态拷贝函数的实现挺有意思。除去一些必要的检查,其核心逻辑可参考asm-generic下的通用实现(下面以方向相反、逻辑对称的__copy_to_user为例,x86_64下copy_from_user另有体系结构专属的优化实现):
// include/asm-generic/uaccess.h static inline __must_check long __copy_to_user(void __user *to, const void *from, unsigned long n) { if (__builtin_constant_p(n)) { switch(n) { case 1: *(u8 __force *)to = *(u8 *)from; return 0; case 2: *(u16 __force *)to = *(u16 *)from; return 0; case 4: *(u32 __force *)to = *(u32 *)from; return 0; #ifdef CONFIG_64BIT case 8: *(u64 __force *)to = *(u64 *)from; return 0; #endif default: break; } } memcpy((void __force *)to, from, n); return 0; }
对于待拷贝的字节数,若为编译期常量(1/2/4/8字节)则直接O(1)强转赋值完成拷贝,否则退化为memcpy。
poll_select_copy_remaining则用于拷贝更新此次调用完成剩余的时间差值。
下面看core_sys_select,这个真正的执行入口:
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timespec *end_time) { fd_set_bits fds; void *bits; int ret, max_fds; unsigned int size; struct fdtable *fdt; /* Allocate small arguments on the stack to save memory and be faster */ long stack_fds[SELECT_STACK_ALLOC/sizeof(long)]; ret = -EINVAL; if (n < 0) goto out_nofds; /* max_fds can increase, so grab it once to avoid race */ rcu_read_lock(); fdt = files_fdtable(current->files); max_fds = fdt->max_fds; rcu_read_unlock(); if (n > max_fds) n = max_fds; /* * We need 6 bitmaps (in/out/ex for both incoming and outgoing), * since we used fdset we need to allocate memory in units of * long-words. */ size = FDS_BYTES(n); bits = stack_fds; if (size > sizeof(stack_fds) / 6) { /* Not enough space in on-stack array; must use kmalloc */ ret = -ENOMEM; bits = kmalloc(6 * size, GFP_KERNEL); if (!bits) goto out_nofds; } fds.in = bits; fds.out = bits + size; fds.ex = bits + 2*size; fds.res_in = bits + 3*size; fds.res_out = bits + 4*size; fds.res_ex = bits + 5*size; if ((ret = get_fd_set(n, inp, fds.in)) || (ret = get_fd_set(n, outp, fds.out)) || (ret = get_fd_set(n, exp, fds.ex))) goto out; zero_fd_set(n, fds.res_in); zero_fd_set(n, fds.res_out); zero_fd_set(n, fds.res_ex); ret = do_select(n, &fds, end_time); if (ret < 0) goto out; if (!ret) { ret = -ERESTARTNOHAND; if (signal_pending(current)) goto out; ret = 0; } if (set_fd_set(n, inp, fds.res_in) || set_fd_set(n, outp, fds.res_out) || set_fd_set(n, exp, fds.res_ex)) ret = -EFAULT; out: if (bits != stack_fds) kfree(bits); out_nofds: return ret; }
主要做些很重要的准备工作,尽可能实现高效:
1> 定义一个SELECT_STACK_ALLOC(256字节)大小的栈上数组用于高效处理传入以及待传出的可读、可写及异常文件描述符集合,空间可能不够使用。
2> 基于current宏检查传入的最大fd对应参数n是否超出当前进程打开的文件描述符表内所示位图容量的max_fds数值(位数),基于使用位图结构也就很容易理解为何select调用的第一个参数是传入的待监听fd的最大值加1。
3> 栈上数组空间不足以存放本次select要处理的fd集合所需总计内存,则使用kmalloc(基于slab)从内核空间分配所需的连续物理内存。
4> 依次使用get_fd_set拷贝待监听的可读、可写及异常事件对应的文件描述符集合,可见每次select调用都需要从用户空间拷贝传入的文件描述符集合到内核空间。
5> 清空(zero)待传出的处理结果对应的文件描述符集合,用来存放本次select调用的结果。
提前看下收尾工作,select真正执行完成后:
1> 依次使用set_fd_set拷贝各事件处理结果集合到对应传入的三个事件集合,可见每次select调用还要将处理结果从内核空间拷贝回用户空间下。
2> 因select返回时最终结果事件集合会拷贝到传入的各事件初始集合(实际在调用前的准备工作也对待传出结果集合进行了清空,原始待监听各集合不可作为下次调用时复用),所以每次select调用前都需要清空(FD_ZERO)传入的fd事件集合。
3> 如果传入的文件描述符比较大,超出栈上分配的内存导致从内核空间分配了所需内存,则释放该内核对应内存。
到了本文最关键的部分,select调用最核心的函数do_select实现:
/*
 * do_select - the select(2) core loop.
 * Polls every watched fd via its f_op->poll; on the first pass the
 * __pollwait callback (installed via wait->_qproc) adds the current
 * task to each fd's wait queue.  Once any event is found (or timeout /
 * signal), _qproc is cleared so later passes only test readiness.
 */
int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
	ktime_t expire, *to = NULL;
	struct poll_wqueues table;
	poll_table *wait;
	int retval, i, timed_out = 0;
	unsigned long slack = 0;

	/* Trim n down to the highest fd actually open and watched. */
	rcu_read_lock();
	retval = max_select_fd(n, fds);
	rcu_read_unlock();

	if (retval < 0)
		return retval;
	n = retval;

	poll_initwait(&table);
	wait = &table.pt;
	/* Zero timeout: pure non-blocking poll, no wait-queue registration. */
	if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
		wait->_qproc = NULL;
		timed_out = 1;
	}

	if (end_time && !timed_out)
		slack = select_estimate_accuracy(end_time);

	retval = 0;
	for (;;) {
		unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;

		inp = fds->in; outp = fds->out; exp = fds->ex;
		rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

		/* Walk the bitmaps one long-word (BITS_PER_LONG fds) at a time. */
		for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
			unsigned long in, out, ex, all_bits, bit = 1, mask, j;
			unsigned long res_in = 0, res_out = 0, res_ex = 0;

			in = *inp++; out = *outp++; ex = *exp++;
			all_bits = in | out | ex;
			/* No fd watched in this word: skip it wholesale. */
			if (all_bits == 0) {
				i += BITS_PER_LONG;
				continue;
			}

			for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
				struct fd f;
				if (i >= n)
					break;
				if (!(bit & all_bits))
					continue;
				f = fdget(i);
				if (f.file) {
					const struct file_operations *f_op;
					f_op = f.file->f_op;
					mask = DEFAULT_POLLMASK;
					if (f_op && f_op->poll) {
						/* Restrict the key to the events watched on this fd,
						 * then call the driver's poll (e.g. sock_poll). */
						wait_key_set(wait, in, out, bit);
						mask = (*f_op->poll)(f.file, wait);
					}
					fdput(f);
					/* Record ready events; once anything is ready, stop
					 * registering on further wait queues (_qproc = NULL). */
					if ((mask & POLLIN_SET) && (in & bit)) {
						res_in |= bit;
						retval++;
						wait->_qproc = NULL;
					}
					if ((mask & POLLOUT_SET) && (out & bit)) {
						res_out |= bit;
						retval++;
						wait->_qproc = NULL;
					}
					if ((mask & POLLEX_SET) && (ex & bit)) {
						res_ex |= bit;
						retval++;
						wait->_qproc = NULL;
					}
				}
			}
			if (res_in)
				*rinp = res_in;
			if (res_out)
				*routp = res_out;
			if (res_ex)
				*rexp = res_ex;
			/* Be preemption-friendly between words. */
			cond_resched();
		}
		/* After the first full pass every queue is registered;
		 * subsequent passes only test readiness. */
		wait->_qproc = NULL;
		if (retval || timed_out || signal_pending(current))
			break;
		if (table.error) {
			retval = table.error;
			break;
		}

		/*
		 * If this is the first loop and we have a timeout
		 * given, then we convert to ktime_t and set the to
		 * pointer to the expiry value.
		 */
		if (end_time && !to) {
			expire = timespec_to_ktime(*end_time);
			to = &expire;
		}

		/* Sleep until woken by pollwake() or the timeout expires. */
		if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
					   to, slack))
			timed_out = 1;
	}
	poll_freewait(&table);

	return retval;
}
先来看第9-15行,max_select_fd借助当前进程已打开的文件描述符表检查传入且合法的已打开最大fd,并修正传入的n。
第17行很重要的poll_initwait以及几个关键的数据结构,这部分实际与设备驱动相关挂载等待队列以及就绪唤醒:
/*
 * Wait-queue plumbing shared by select()/poll(): poll_wqueues holds the
 * per-call state; __pollwait is the callback drivers invoke (via
 * poll_wait) to link the polling task onto each fd's wait queue.
 */
void poll_initwait(struct poll_wqueues *pwq)
{
	/* Register __pollwait as the queueing callback used from f_op->poll. */
	init_poll_funcptr(&pwq->pt, __pollwait);
	pwq->polling_task = current;
	pwq->triggered = 0;
	pwq->error = 0;
	pwq->table = NULL;
	pwq->inline_index = 0;
}

/* Add a new entry */
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
				poll_table *p)
{
	/* Recover the enclosing poll_wqueues from its embedded poll_table. */
	struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
	/* Grab an entry from inline_entries, or a kernel page once those run out. */
	struct poll_table_entry *entry = poll_get_entry(pwq);
	if (!entry)
		return;
	entry->filp = get_file(filp);
	entry->wait_address = wait_address;
	entry->key = p->_key;
	/* pollwake() will run when the driver wakes this queue. */
	init_waitqueue_func_entry(&entry->wait, pollwake);
	entry->wait.private = pwq;
	add_wait_queue(wait_address, &entry->wait);
}

/* Overflow storage for poll_table_entry beyond the inline array. */
struct poll_table_page {
	struct poll_table_page * next;
	struct poll_table_entry * entry;
	struct poll_table_entry entries[0];
};

// include/linux/kernel.h
/**
 * container_of - cast a member of a structure out to the containing structure
 * @ptr:	the pointer to the member.
 * @type:	the type of the container struct this is embedded in.
 * @member:	the name of the member within the struct.
 */
#define container_of(ptr, type, member) ({			\
	const typeof( ((type *)0)->member ) *__mptr = (ptr);	\
	(type *)( (char *)__mptr - offsetof(type,member) );})

// include/linux/stddef.h
#undef offsetof
#ifdef __compiler_offsetof
#define offsetof(TYPE,MEMBER) __compiler_offsetof(TYPE,MEMBER)
#else
#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)
#endif

// include/linux/poll.h
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
	pt->_qproc = qproc;
	pt->_key   = ~0UL; /* all events enabled */
}

/* Called by drivers' poll methods; no-op once _qproc has been cleared. */
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
	if (p && p->_qproc && wait_address)
		p->_qproc(filp, wait_address, p);
}

typedef struct poll_table_struct {
	poll_queue_proc _qproc;
	unsigned long _key;
} poll_table;

struct poll_table_entry {
	struct file *filp;
	unsigned long key;
	wait_queue_t wait;
	wait_queue_head_t *wait_address;
};

/*
 * Structures and helpers for select/poll syscall
 */
struct poll_wqueues {
	poll_table pt;
	struct poll_table_page *table;
	struct task_struct *polling_task;
	int triggered;
	int error;
	int inline_index;
	struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};

// include/linux/wait.h
typedef struct __wait_queue wait_queue_t;
struct __wait_queue {
	unsigned int flags;
#define WQ_FLAG_EXCLUSIVE	0x01
	void *private;
	wait_queue_func_t func;
	struct list_head task_list;
};
上面第15行container_of宏用于根据成员地址获取结构体的地址,其实现参见第41行,其中借助了offsetof宏,用来判断成员在结构体中的偏移位置,其实现参见第45行。这两个宏在内核代码中很常见,很有用,实现也很巧妙。
poll_initwait初始化poll_wqueues结构体table(do_select第4行),这一结构体用于本次select调用对所有传入的待监听fd进行轮询工作,每个fd对应一个poll_table_entry。
这个核心结构体poll_wqueues的定义见上面第81行,分配了N_INLINE_POLL_ENTRIES((832-256)/64=9)个poll_table_entry,传入fd超过该数值时后续会在注册的__pollwait函数中在内核页上扩充。
上面第54行的init_poll_funcptr则用于注册初始化poll函数指针__pollwait,在__pollwait中完成poll_table_entry的初始化及栈上空间不够用时扩充poll_table。
这里的实现看起来有些绕,到底是如何串联起来呢?
为方便理解将当前进程(current)挂在各个文件描述符的等待队列进行监听轮询,来看下几个fd相关的关键数据结构:
// include/linux/fs.h
/*
 * The per-file-type operations table (include/linux/fs.h).  Each kind of
 * file (socket, pipe, device, ...) registers its own implementations at
 * creation time; NULL members mean "not supported".  select() relies on
 * the poll member below.
 */
struct file_operations {
struct module *owner;
loff_t (*llseek) (struct file *, loff_t, int);
ssize_t (*read) (struct file *, char __user *, size_t, loff_t *);
ssize_t (*write) (struct file *, const char __user *, size_t, loff_t *);
ssize_t (*aio_read) (struct kiocb *, const struct iovec *, unsigned long, loff_t);
ssize_t (*aio_write) (struct kiocb *, const struct iovec *, unsigned long, loff_t);
int (*readdir) (struct file *, void *, filldir_t);
/* Called by do_select()/poll(); returns an event mask and registers
 * the caller on the file's wait queue via the poll_table. */
unsigned int (*poll) (struct file *, struct poll_table_struct *);
long (*unlocked_ioctl) (struct file *, unsigned int, unsigned long);
long (*compat_ioctl) (struct file *, unsigned int, unsigned long);
int (*mmap) (struct file *, struct vm_area_struct *);
int (*open) (struct inode *, struct file *);
int (*flush) (struct file *, fl_owner_t id);
int (*release) (struct inode *, struct file *);
int (*fsync) (struct file *, loff_t, loff_t, int datasync);
int (*aio_fsync) (struct kiocb *, int datasync);
int (*fasync) (int, struct file *, int);
int (*lock) (struct file *, int, struct file_lock *);
ssize_t (*sendpage) (struct file *, struct page *, int, size_t, loff_t *, int);
unsigned long (*get_unmapped_area)(struct file *, unsigned long, unsigned long, unsigned long, unsigned long);
int (*check_flags)(int);
int (*flock) (struct file *, int, struct file_lock *);
ssize_t (*splice_write)(struct pipe_inode_info *, struct file *, loff_t *, size_t, unsigned int);
ssize_t (*splice_read)(struct file *, loff_t *, struct pipe_inode_info *, size_t, unsigned int);
int (*setlease)(struct file *, long, struct file_lock **);
long (*fallocate)(struct file *file, int mode, loff_t offset,
loff_t len);
int (*show_fdinfo)(struct seq_file *m, struct file *f);
};
/*
 * The kernel's open-file object (include/linux/fs.h): one per open(2),
 * shared by all fds duplicating it.  "Everything is a file" — f_op
 * points at the type-specific operations table registered at creation.
 */
struct file {
union {
struct llist_node fu_llist;
struct rcu_head fu_rcuhead;
} f_u;
struct path f_path;
#define f_dentry f_path.dentry
struct inode *f_inode; /* cached value */
/* Type-specific operations; do_select() calls f_op->poll through this. */
const struct file_operations *f_op;
/*
* Protects f_ep_links, f_flags, f_pos vs i_size in lseek SEEK_CUR.
* Must not be taken from IRQ context.
*/
spinlock_t f_lock;
atomic_long_t f_count;
unsigned int f_flags;
fmode_t f_mode;
loff_t f_pos;
struct fown_struct f_owner;
const struct cred *f_cred;
struct file_ra_state f_ra;
u64 f_version;
#ifdef CONFIG_SECURITY
void *f_security;
#endif
/* needed for tty driver, and maybe others */
void *private_data;
#ifdef CONFIG_EPOLL
/* Used by fs/eventpoll.c to link all the hooks to this file */
struct list_head f_ep_links;
struct list_head f_tfile_llink;
#endif /* #ifdef CONFIG_EPOLL */
struct address_space *f_mapping;
#ifdef CONFIG_DEBUG_WRITECOUNT
unsigned long f_mnt_write_state;
#endif
};
// include/linux/file.h
/*
 * Result of fdget(): the resolved file plus a flag telling fdput()
 * whether a reference was taken and must be dropped.
 */
struct fd {
struct file *file;
int need_put;
};
回来看do_select函数源码中第58行调用了poll函数,实际对应本段上面第10行加粗的file_operations结构体的函数指针成员poll,不同的fd在创建时会将该成员进行注册初始化(以C99语法里不要求顺序的结构体指定初始化方式),例如socket对应的注册为sock_poll(net/socket.c),pipe对应注册为pipe_poll(fs/pipe.c),普通file则并未注册(例如ext4或xfs),这里实际很像面向对象里的多态行为,linux下一切皆文件,抽象为结构体file,如上第41行提供了file_operations成员指针,fd提供了file_operations统一的操作接口(函数指针),而创建时进行注册调用的真正操作,非常简单清晰。
以socket为例,我们看这个当前进程如何挂在等待队列并对fd轮询检测。
上面已分析过poll_initwait完成了初始化工作,在do_select第28行的第一层无穷循环开始轮询事件监测,第34行开始对传入的n个描述符,以BITS_PER_LONG个为一组依次挂载到等待队列,并对事件进行检测,如果没有事件到来,仅有第一次循环完成挂载,后续循环只监测事件。核心调用见第58行,上面段落分析过了poll函数指针,在这里会根据fd的不同创建类别调用真正的poll函数,socket下对应是sock_poll,如ipv4/tcp下会继续调用tcp_poll,在这里完成调用poll_table注册的函数指针__pollwait挂载等待队列操作(实际借助poll_wait封装调用),之后完成检测操作获取事件mask结果。
do_select第61-75行,则把获取的结果mask依次写到待传出的fd事件集合。
do_select第64,69,74,86行,保证仅在第一次循环时,完成本次fd对应挂载等待队列,不论是否收到设备事件通知,本次调用仅挂载一次,因此置空poll_table注册的poll。
do_select第109行,释放poll_table。
由上述可见,每次select调用都要轮询完成所有fd的挂载等待队列及事件监测。
当待监听的fd数量比较少时,select还是比较高效的,性能未见明显差异。当待处理的fd数量非常大,十万级甚至百万级时,下篇中的epoll就体现出优势来了。
refer:
1. http://www.chongh.wiki/blog/2016/04/08/linux-syscalls/
2. http://blog.csdn.net/qq_33921804/article/details/53454203
3. http://www.cnblogs.com/wuchanming/p/3780058.html
4. http://blog.csdn.net/lizhiguo0532/article/details/6568968
5. http://www.embeddedlinux.org.cn/html/yingjianqudong/201405/11-2860.html
6. https://www.kancloud.cn/kancloud/ldd3/60979
7. http://www.hulkdev.com/posts/select-io
select 选择符大小定义同__NFDBITS有关系:__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
其中__FD_SETSIZE在内核中定义为1024,而__NFDBITS在x86_64架构下通过如下程序可以验证其值为64,因此该位数组包含 1024/64 = 16 个 long 元素;但每个元素以位记录 64 个描述符,所以select可监听的描述符最大数目仍为1024,而非16。
/*
 * Verification snippet (from a reader comment): prints the value of
 * __NFDBITS on the current platform — 64 on x86_64/LP64.
 * Fixes vs. the original: missing #include <stdio.h> (printf was
 * implicitly declared), int main(void) prototype, and the output label
 * said "size of" although __NFDBITS is a bit count, not a size.
 */
#include <stdio.h>

typedef long int __fd_mask;

/* Bits per __fd_mask word: 8 * sizeof(long). */
#define __NFDBITS (8 * (int) sizeof (__fd_mask))
/* Word index and in-word mask for descriptor d. */
#define __FD_ELT(d) ((d) / __NFDBITS)
#define __FD_MASK(d) ((__fd_mask) 1 << ((d) % __NFDBITS))

int main(void)
{
	printf("value of __NFDBITS: %d\n", __NFDBITS);
	return 0;
}
__NFDBITS 在 x86_64 下确实是64(__fd_mask 为 long int),但该数组的使用是以 bit 来统计监听的文件描述符。
你再想想?
wait_key_set(wait, in, out, bit) 博主你好像没有放这个函数
wait_key_set(wait, in, out, bit) 博主 你没放这个函数
wait_key_set(wait, in, out, bit)