Linux内核select源码剖析

本文基于linux 3.10.104 x86_64内核,对select相关系统调用的源码进行分析,对应源码位于fs/select.c。

用户态下select系统调用相关函数原型如下:

#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, 
           fd_set *exceptfds, struct timeval *timeout);
void FD_CLR(int fd, fd_set *set);
int  FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

其中以大写的FD_为前缀的函数并非系统调用,而是几个对fd_set进行相关位操作的宏,对应原型定义如下(/usr/include/sys/select.h和/usr/include/bits/select.h):

typedef long int __fd_mask;

#define __NFDBITS       (8 * (int) sizeof (__fd_mask))
#define __FD_ELT(d)     ((d) / __NFDBITS)
#define __FD_MASK(d)    ((__fd_mask) 1 << ((d) % __NFDBITS))

/* fd_set for select and pselect.  */
 typedef struct
   {
     /* XPG4.2 requires this member name.  Otherwise avoid the name
        from the global namespace.  */
 #ifdef __USE_XOPEN
     __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
 # define __FDS_BITS(set) ((set)->fds_bits)
 #else
     __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
 # define __FDS_BITS(set) ((set)->__fds_bits)
 #endif
   } fd_set;

/* Maximum number of file descriptors in `fd_set'.  */
#define FD_SETSIZE              __FD_SETSIZE

// sys/select.h
#define FD_SET(fd, fdsetp)      __FD_SET (fd, fdsetp)
#define FD_CLR(fd, fdsetp)      __FD_CLR (fd, fdsetp)
#define FD_ISSET(fd, fdsetp)    __FD_ISSET (fd, fdsetp)
#define FD_ZERO(fdsetp)         __FD_ZERO (fdsetp)

// bits/select.h
# define __FD_ZERO(fdsp) \
  do {                                                                        \
    int __d0, __d1;                                                           \
    __asm__ __volatile__ ("cld; rep; " __FD_ZERO_STOS                         \
                          : "=c" (__d0), "=D" (__d1)                          \
                          : "a" (0), "0" (sizeof (fd_set)                     \
                                          / sizeof (__fd_mask)),              \
                            "1" (&__FDS_BITS (fdsp)[0])                       \
                          : "memory");                                        \
  } while (0)

#define __FD_SET(d, set) \
   ((void) (__FDS_BITS (set)[__FD_ELT (d)] |= __FD_MASK (d)))
#define __FD_CLR(d, set) \
   ((void) (__FDS_BITS (set)[__FD_ELT (d)] &= ~__FD_MASK (d)))
#define __FD_ISSET(d, set) \
   ((__FDS_BITS (set)[__FD_ELT (d)] & __FD_MASK (d)) != 0)

上面可以看到fd_set结构体的定义实际包含的是fds_bits位数组,其大小固定,由FD_SETSIZE指定(/usr/include/bits/typesizes.h中),在当前内核中数值为1024,可见每次select系统调用可监听处理的文件描述符最大数量为1024
其中宏定义体中如__FD_SET对应的(void),作用为消除编译器对类型不一致相关的警告,并无其它附加意义。

言归正传,找到用户态下select系统调用入口为为:

SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
		fd_set __user *, exp, struct timeval __user *, tvp)
{
	struct timespec end_time, *to = NULL;
	struct timeval tv;
	int ret;

	if (tvp) {
		if (copy_from_user(&tv, tvp, sizeof(tv)))
			return -EFAULT;

		to = &end_time;
		if (poll_select_set_timeout(to,
				tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
				(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
			return -EINVAL;
	}

	ret = core_sys_select(n, inp, outp, exp, to);
	ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);

	return ret;
}

首先在指定超时的情况下会使用copy_from_user将用户空间下的tvpx相对超时拷贝到内核空间下的tv(timeval微秒精度),并通过poll_select_set_timeout设定成绝对的超时(timespec纳秒精度)。
copy_from_user,如何高效的拷贝,这个函数实现也挺有意思,除了一些必要的检查看核心代码:

// include/asm-generic/uaccess.h
static inline __must_check long __copy_to_user(void __user *to,
		const void *from, unsigned long n)
{
	if (__builtin_constant_p(n)) {
		switch(n) {
		case 1:
			*(u8 __force *)to = *(u8 *)from;
			return 0;
		case 2:
			*(u16 __force *)to = *(u16 *)from;
			return 0;
		case 4:
			*(u32 __force *)to = *(u32 *)from;
			return 0;
#ifdef CONFIG_64BIT
		case 8:
			*(u64 __force *)to = *(u64 *)from;
			return 0;
#endif
		default:
			break;
		}
	}

	memcpy((void __force *)to, from, n);
	return 0;
}

对于待拷贝的数值若为编译期常量直接o(1)常数强转赋值,否则直接memcpy。

poll_select_copy_remaining则用于拷贝更新此次调用完成剩余的时间差值。

下面看core_sys_select,这个真正的执行入口:

int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
			   fd_set __user *exp, struct timespec *end_time)
{
	fd_set_bits fds;
	void *bits;
	int ret, max_fds;
	unsigned int size;
	struct fdtable *fdt;
	/* Allocate small arguments on the stack to save memory and be faster */
	long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];

	ret = -EINVAL;
	if (n < 0)
		goto out_nofds;

	/* max_fds can increase, so grab it once to avoid race */
	rcu_read_lock();
	fdt = files_fdtable(current->files);
	max_fds = fdt->max_fds;
	rcu_read_unlock();
	if (n > max_fds)
		n = max_fds;

	/*
	 * We need 6 bitmaps (in/out/ex for both incoming and outgoing),
	 * since we used fdset we need to allocate memory in units of
	 * long-words. 
	 */
	size = FDS_BYTES(n);
	bits = stack_fds;
	if (size > sizeof(stack_fds) / 6) {
		/* Not enough space in on-stack array; must use kmalloc */
		ret = -ENOMEM;
		bits = kmalloc(6 * size, GFP_KERNEL);
		if (!bits)
			goto out_nofds;
	}
	fds.in      = bits;
	fds.out     = bits +   size;
	fds.ex      = bits + 2*size;
	fds.res_in  = bits + 3*size;
	fds.res_out = bits + 4*size;
	fds.res_ex  = bits + 5*size;

	if ((ret = get_fd_set(n, inp, fds.in)) ||
	    (ret = get_fd_set(n, outp, fds.out)) ||
	    (ret = get_fd_set(n, exp, fds.ex)))
		goto out;
	zero_fd_set(n, fds.res_in);
	zero_fd_set(n, fds.res_out);
	zero_fd_set(n, fds.res_ex);

	ret = do_select(n, &fds, end_time);

	if (ret < 0)
		goto out;
	if (!ret) {
		ret = -ERESTARTNOHAND;
		if (signal_pending(current))
			goto out;
		ret = 0;
	}

	if (set_fd_set(n, inp, fds.res_in) ||
	    set_fd_set(n, outp, fds.res_out) ||
	    set_fd_set(n, exp, fds.res_ex))
		ret = -EFAULT;

out:
	if (bits != stack_fds)
		kfree(bits);
out_nofds:
	return ret;
}

主要做些很重要的准备工作,尽可能实现高效:
1> 定义一个SELECT_STACK_ALLOC(256字节)大小的栈上数组用于高效处理传入以及待传出的可读、可写及异常文件描述符集合,空间可能不够使用。
2> 基于current宏检查传入的最大fd对应参数n是否超出当前进程打开的文件描述符表内所示位图容量的max_fds数值(位数),基于使用位图结构也就很容易理解为何select调用的第一个参数是传入的待监听fd的最大值加1。
3> 栈上数组空间不足以存放本次select要处理的fd集合所需总计内存,则使用kmalloc(基于slab)从内核空间分配所需的连续物理内存。
4> 依次使用get_fd_set拷贝待监听的可读、可写及异常事件对应的文件描述符集合,可见每次select调用都需要从用户空间拷贝传入的文件描述符集合到内核空间
5> 清空(zero)待传出的处理结果对应的文件描述符集合,用来存放本次select调用的结果。

提前看下收尾工作,select真正执行完成后:
1> 依次使用set_fd_set拷贝各事件处理结果集合到对应传入的三个事件集合,可见每次select调用还要将处理结果从内核空间拷贝回用户空间下
2> 因select返回时最终结果事件集合会拷贝到传入的各事件初始集合(实际在调用前的准备工作也对待传出结果集合进行了清空,原始待监听各集合不可作为下次调用时复用),所以每次select调用前都需要清空(FD_ZERO)传入的fd事件集合
3> 如果传入的文件描述符比较大,超出栈上分配的内存导致从内核空间分配了所需内存,则释放该内核对应内存。

到了本文最关键的部分,select调用最核心的函数do_select实现:

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
	ktime_t expire, *to = NULL;
	struct poll_wqueues table;
	poll_table *wait;
	int retval, i, timed_out = 0;
	unsigned long slack = 0;

	rcu_read_lock();
	retval = max_select_fd(n, fds);
	rcu_read_unlock();

	if (retval < 0)
		return retval;
	n = retval;

	poll_initwait(&table);
	wait = &table.pt;
	if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
		wait->_qproc = NULL;
		timed_out = 1;
	}

	if (end_time && !timed_out)
		slack = select_estimate_accuracy(end_time);

	retval = 0;
	for (;;) {
		unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;

		inp = fds->in; outp = fds->out; exp = fds->ex;
		rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

		for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
			unsigned long in, out, ex, all_bits, bit = 1, mask, j;
			unsigned long res_in = 0, res_out = 0, res_ex = 0;

			in = *inp++; out = *outp++; ex = *exp++;
			all_bits = in | out | ex;
			if (all_bits == 0) {
				i += BITS_PER_LONG;
				continue;
			}

			for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
				struct fd f;
				if (i >= n)
					break;
				if (!(bit & all_bits))
					continue;
				f = fdget(i);
				if (f.file) {
					const struct file_operations *f_op;
					f_op = f.file->f_op;
					mask = DEFAULT_POLLMASK;
					if (f_op && f_op->poll) {
						wait_key_set(wait, in, out, bit);
						mask = (*f_op->poll)(f.file, wait);
					}
					fdput(f);
					if ((mask & POLLIN_SET) && (in & bit)) {
						res_in |= bit;
						retval++;
						wait->_qproc = NULL;
					}
					if ((mask & POLLOUT_SET) && (out & bit)) {
						res_out |= bit;
						retval++;
						wait->_qproc = NULL;
					}
					if ((mask & POLLEX_SET) && (ex & bit)) {
						res_ex |= bit;
						retval++;
						wait->_qproc = NULL;
					}
				}
			}
			if (res_in)
				*rinp = res_in;
			if (res_out)
				*routp = res_out;
			if (res_ex)
				*rexp = res_ex;
			cond_resched();
		}
		wait->_qproc = NULL;
		if (retval || timed_out || signal_pending(current))
			break;
		if (table.error) {
			retval = table.error;
			break;
		}

		/*
		 * If this is the first loop and we have a timeout
		 * given, then we convert to ktime_t and set the to
		 * pointer to the expiry value.
		 */
		if (end_time && !to) {
			expire = timespec_to_ktime(*end_time);
			to = &expire;
		}

		if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
					   to, slack))
			timed_out = 1;
	}

	poll_freewait(&table);

	return retval;
}

先来看第9-15行,max_select_fd借助当前进程已打开的文件描述符表检查传入且合法的已打开最大fd,并修正传入的n。

第17行很重要的poll_initwait以及几个关键的数据结构,这部分实际与设备驱动相关挂载等待队列以及就绪唤醒:

void poll_initwait(struct poll_wqueues *pwq)
{
	init_poll_funcptr(&pwq->pt, __pollwait);
	pwq->polling_task = current;
	pwq->triggered = 0;
	pwq->error = 0;
	pwq->table = NULL;
	pwq->inline_index = 0;
}

/* Add a new entry */
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
				poll_table *p)
{
	struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
	struct poll_table_entry *entry = poll_get_entry(pwq);
	if (!entry)
		return;
	entry->filp = get_file(filp);
	entry->wait_address = wait_address;
	entry->key = p->_key;
	init_waitqueue_func_entry(&entry->wait, pollwake);
	entry->wait.private = pwq;
	add_wait_queue(wait_address, &entry->wait);
}

struct poll_table_page {
	struct poll_table_page * next;
	struct poll_table_entry * entry;
	struct poll_table_entry entries[0];
};

// include/linux/kernel.h
/**
 * container_of - cast a member of a structure out to the containing structure
 * @ptr:	the pointer to the member.
 * @type:	the type of the container struct this is embedded in.
 * @member:	the name of the member within the struct.
 *
 */
#define container_of(ptr, type, member) ({			\
	const typeof( ((type *)0)->member ) *__mptr = (ptr);	\
	(type *)( (char *)__mptr - offsetof(type,member) );})

// include/linux/stddef.h
#undef offsetof
#ifdef __compiler_offsetof
#define offsetof(TYPE,MEMBER) __compiler_offsetof(TYPE,MEMBER)
#else
#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)
#endif

// include/linux/poll.h
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
    pt->_qproc = qproc;
    pt->_key   = ~0UL; /* all events enabled */
}

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
	if (p && p->_qproc && wait_address)
		p->_qproc(filp, wait_address, p);
}

typedef struct poll_table_struct {
	poll_queue_proc _qproc;
	unsigned long _key;
} poll_table;

struct poll_table_entry {
	struct file *filp;
	unsigned long key;
	wait_queue_t wait;
	wait_queue_head_t *wait_address;
};

/*
 * Structures and helpers for select/poll syscall
 */
struct poll_wqueues {
	poll_table pt;
	struct poll_table_page *table;
	struct task_struct *polling_task;
	int triggered;
	int error;
	int inline_index;
	struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};

// include/linux/wait.h
typedef struct __wait_queue wait_queue_t;
struct __wait_queue {
	unsigned int flags;
#define WQ_FLAG_EXCLUSIVE	0x01
	void *private;
	wait_queue_func_t func;
	struct list_head task_list;
};

上面第15行container_of宏用于根据成员地址获取结构体的地址,其实现参见第41行,其中借助了offsetof宏,用来判断成员在结构体中的偏移位置,其实现参加第45行。这两个宏在内核代码中很常见,很有用,实现也很巧妙。

poll_initwait初始化poll_wqueues结构体table(do_select第4行),这一结构体用于本次select调用对所有传入的待监听fd进行轮询工作,每个fd对应一个poll_table_entry。
这个核心结构体poll_wqueues的定义见第上面81行,分配了N_INLINE_POLL_ENTRIES((832-256)/64=9)个poll_table_entry,传入fd超过该数值时后续会在注册的__pollwait函数中在内核页上扩充。
上面第54行的init_poll_funcptr则用于注册初始化poll函数指针__pollwait,在__pollwait中完成poll_table_entry的初始化及栈上空间不够用时扩充poll_table。

这里的实现看起来有些绕,到底是如何串联起来呢?

为方便理解将当前进程(current)挂在各个文件描述符的等待队列进行监听轮询,来看下几个fd相关的关键数据结构:

// include/linux/fs.h
struct file_operations {
	struct module *owner;
	loff_t (*llseek) (struct file *, loff_t, int);
	ssize_t (*read) (struct file *, char __user *, size_t, loff_t *);
	ssize_t (*write) (struct file *, const char __user *, size_t, loff_t *);
	ssize_t (*aio_read) (struct kiocb *, const struct iovec *, unsigned long, loff_t);
	ssize_t (*aio_write) (struct kiocb *, const struct iovec *, unsigned long, loff_t);
	int (*readdir) (struct file *, void *, filldir_t);
	unsigned int (*poll) (struct file *, struct poll_table_struct *);
	long (*unlocked_ioctl) (struct file *, unsigned int, unsigned long);
	long (*compat_ioctl) (struct file *, unsigned int, unsigned long);
	int (*mmap) (struct file *, struct vm_area_struct *);
	int (*open) (struct inode *, struct file *);
	int (*flush) (struct file *, fl_owner_t id);
	int (*release) (struct inode *, struct file *);
	int (*fsync) (struct file *, loff_t, loff_t, int datasync);
	int (*aio_fsync) (struct kiocb *, int datasync);
	int (*fasync) (int, struct file *, int);
	int (*lock) (struct file *, int, struct file_lock *);
	ssize_t (*sendpage) (struct file *, struct page *, int, size_t, loff_t *, int);
	unsigned long (*get_unmapped_area)(struct file *, unsigned long, unsigned long, unsigned long, unsigned long);
	int (*check_flags)(int);
	int (*flock) (struct file *, int, struct file_lock *);
	ssize_t (*splice_write)(struct pipe_inode_info *, struct file *, loff_t *, size_t, unsigned int);
	ssize_t (*splice_read)(struct file *, loff_t *, struct pipe_inode_info *, size_t, unsigned int);
	int (*setlease)(struct file *, long, struct file_lock **);
	long (*fallocate)(struct file *file, int mode, loff_t offset,
			  loff_t len);
	int (*show_fdinfo)(struct seq_file *m, struct file *f);
}; 

struct file {
	union {
		struct llist_node	fu_llist;
		struct rcu_head 	fu_rcuhead;
	} f_u;
	struct path		f_path;
#define f_dentry	f_path.dentry
	struct inode		*f_inode;	/* cached value */
	const struct file_operations	*f_op;

	/*
	 * Protects f_ep_links, f_flags, f_pos vs i_size in lseek SEEK_CUR.
	 * Must not be taken from IRQ context.
	 */
	spinlock_t		f_lock;
	atomic_long_t		f_count;
	unsigned int 		f_flags;
	fmode_t			f_mode;
	loff_t			f_pos;
	struct fown_struct	f_owner;
	const struct cred	*f_cred;
	struct file_ra_state	f_ra;

	u64			f_version;
#ifdef CONFIG_SECURITY
	void			*f_security;
#endif
	/* needed for tty driver, and maybe others */
	void			*private_data;

#ifdef CONFIG_EPOLL
	/* Used by fs/eventpoll.c to link all the hooks to this file */
	struct list_head	f_ep_links;
	struct list_head	f_tfile_llink;
#endif /* #ifdef CONFIG_EPOLL */
	struct address_space	*f_mapping;
#ifdef CONFIG_DEBUG_WRITECOUNT
	unsigned long f_mnt_write_state;
#endif
};

// include/linux/file.h
struct fd {
	struct file *file;
	int need_put;
};

回来看do_select函数源码中第58行调用了poll函数,实际对应本段上面第10行加粗的file_operations结构体的函数指针成员poll,不同的fd在创建时会将该成员进行注册初始化(以C99语法里不要求顺序的结构体指定初始化方式),例如socket对应的注册为sock_poll(net/socket.c),pipe对应注册为pipe_poll(fs/pipe.c),普通file则并未注册(例如ext4或xfs),这里实际很像面向对象里的多态行为,linux下一切皆文件,抽象为结构体file,如上第41行提供了file_operations成员指针,fd提供了file_operations统一的操作接口(函数指针),而创建时进行注册调用的真正操作,非常简单清晰。

以socket为例,我们看这个当前进程如何挂在等待队列并对fd轮询检测。
上面已分析过poll_initwait完成了初始化工作,在do_select第28行的第一层无穷循环开始轮询事件监测,第34行开始对传入的n个描述符,以BITS_PER_LONG个为一组依次挂载到等待队列,并对事件进行检测,如果没有事件到来,仅有第一次循环完成挂载,后续循环只监测事件。核心调用见第58行,上面段落分析过了poll函数指针,在这里会根据fd的不同创建类别调用真正的poll函数,socket下对应是sock_poll,如ipv4/tcp下会继续调用tcp_poll,在这里完成调用poll_table注册的函数指针__poll_wait挂载等待队列操作(实际借助poll_wait封装调用),之后完成检测操作获取事件mask结果。

do_select第61-75行,则把获取的结果mask依次写到待传出的fd事件集合。

do_select第64,69,74,86行,保证仅在第一次循环时,完成本次fd对应挂载等待队列,不论是否收到设备事件通知,本次调用仅挂载一次,因此置空poll_table注册的poll。

do_select第109行,释放poll_table。

由上述可见,每次select调用都要轮询完成所有fd的挂载等待队列及事件监测

当待监听的fd数量比较少时,select还是比较高效的,性能未见明显差异。当待处理的fd数量非常大,十万级甚至百万级时,下篇中的epoll就体现出优势来了。

refer:
1. http://www.chongh.wiki/blog/2016/04/08/linux-syscalls/
2. http://blog.csdn.net/qq_33921804/article/details/53454203
3. http://www.cnblogs.com/wuchanming/p/3780058.html
4. http://blog.csdn.net/lizhiguo0532/article/details/6568968
5. http://www.embeddedlinux.org.cn/html/yingjianqudong/201405/11-2860.html
6. https://www.kancloud.cn/kancloud/ldd3/60979
7. http://www.hulkdev.com/posts/select-io

Linux内核select源码剖析》有5个想法

  1. leemeans

    select 选择符大小定义同__NFDBITS有关系:__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
    其中__FD_SETSIZE在内核中定义为1024,但是__NFDBITS在x86_64架构下通过如下程序可以验证其值为64,因此select选择符的最大数目为16.
    typedef long int __fd_mask;

    #define __NFDBITS (8 * (int) sizeof (__fd_mask))
    #define __FD_ELT(d) ((d) / __NFDBITS)
    #define __FD_MASK(d) ((__fd_mask) 1 << ((d) % __NFDBITS))

    int main(){
    printf("size of __NFDBITS: %d\n", __NFDBITS);
    return 0;
    }

    回复
    1. pandademo 文章作者

      __NFDBITS 在 x86_64 下确实是64(__fd_mask 为 long int),但该数组的使用是以 bit 来统计监听的文件描述符。
      你再想想?

      回复

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注