SOCKET编程与复用
SOCKET编程
TCP
流程图
关键函数
int fd = socket(AF_INET, SOCK_STREAM, 0);
int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
bind(fd, local_addr, sizeof(local_addr));
conn_fd = accept(fd, backlog);
recv(conn_fd, buff, sizeof(buff), 0);
send(conn_fd, buff, sizeof(buff), 0);
UDP
流程图
关键函数
int fd = socket(AF_INET, SOCK_DGRAM, 0);
bind(fd, local_addr, sizeof(local_addr));
//和recv的区别是后两个参数是出参,能够获得IP地址,供sendto使用
recvfrom(fd, buff, sizeof(buff), 0, client_addr, sizeof(client_addr));
sendto(fd, buff, sizeof(buff), 0, client_addr, sizeof(client_addr));
IO复用
一般来说,一个进程只处理一个连接。但这样显然不合理,所以需要一个进程通过某种方式处理多个连接。就是说一个进程同时管控多个连接socket,同时处理多个连接的业务。Linux系统下可以通过以下几种方式实现。
fork
关键函数
pid会在子进程中为0,父进程中不为0。
pid_t pid = fork();
原理浅析
就是对每个连接新建子进程,相当于golang的go func{}(),每次accept后进行fork,让子进程处理连接。
代码示例
//接收到连接...
pid = fork();
if(pid !=0)
{
continue;
}
//对accept_socket处理...
select
关键函数
#include <sys/select.h>
int select(int nfds, fd_set *restrict readfds,
fd_set *restrict writefds, fd_set *restrict exceptfds,
struct timeval *restrict timeout);
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
原理浅析
文档select(2) - Linux manual page (man7.org),函数原型:
int select(int nfds, //三个fd_set里最大的数量
fd_set *readfds, //判断读是否阻塞的fd
fd_set *writefds, //判断写是否阻塞的fd
fd_set *exceptfds, //判断是否有异常的fd
struct timeval *timeout //阻塞超时
);
传入的socket应该是非阻塞的,可使用fcntl
文件控制进行非阻塞处理。
fd_set结构如下,是一个有32个ULONG(32bit)的数组,是一个连续的3232位=1024位的空间,可以把它看作一个1024×1的超长向量。每一位都能与一打开的文件句柄(socket、文件、管道、设备等)建立联系(因此8sizeof(fd_set)是最大支持的设备数),建立联系由FD_SET
完成,当调用select()
时,由内核根据IO状态修改fd_set的内容,程序员根据此变化来判断哪个句柄可读。
#define __FD_SETSIZE 1024
typedef struct {
unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];
} __kernel_fd_set;
为说明方便,取fd_set长度为1字节8bit,fd_set中的每一bit可以对应一个文件描述符fd。则1字节长的fd_set最大可以对应8个fd。
- 执行fd_set set; FD_ZERO(&set); 则set用位表示是0000,0000。
- 若fd=5,执行FD_SET(fd,&set); 后set变为0001,0000(第5位置为1)
- 若再加入fd=2,fd=1,则set变为0001,0011
- 执行 select(6,&set,0,0,0) 阻塞等待
- 若fd=1,fd=2上都发生可读事件,则select返回,此时set变为0000,0011。注意:没有事件发生的fd=5被清空。
select会循环遍历它所监测的fd_set(一组文件描述符(fd)的集合)内的所有文件描述符对应的驱动程序的poll函数。驱动程序提供的poll函数首先会将调用select的用户进程插入到该设备驱动对应资源的等待队列(如读/写等待队列),然后返回一个bitmask告诉select当前资源哪些可用。当select循环遍历完所有fd_set内指定的文件描述符对应的poll函数后,如果没有一个资源可用(即没有一个文件可供操作),则select让该进程睡眠,一直等到有资源可用为止,进程被唤醒(或者timeout)继续往下执行。
所谓poll函数就是把current(当前进程)挂到设备的等待队列,不同设备有不同等待队列,如tcp_poll的等待队列是sk->sk_sleep(把进程挂到等待队列中并不代表进程已睡眠)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒。
基于上面的讨论,可以得出select模型的特点:
- 可监控的文件描述符个数取决与
sizeof(fd_set)
的值,Linux上是1024。 - 将fd加入select监控集的同时,还要再使用一个数据结构array保存放到select监控集中的fd。因为select后没有事件发生的socket对应的bit会设为0,需要从array里拿之前存的socket的值 通过
FD_ISSET
判断是否有事件发生(那一位是否为1)。 - 可见select模型必须在select前循环array(加fd,取maxfd),select返回后循环array(
FD_ISSET
判断是否有事件发生)。
代码示例
一个使用select的TCP server简化部分代码后如下:
int main() {
//变量定义,部分省略
int ret = -1;
int accept_fd = -1;
int socket_fd = -1;
//用于保存所有fd
int accept_fds[FD_SIZE] = { -1, };
int curpos = -1;
int maxpos = 0;
int backlog = 10;
int max_fd = -1;
//用于select
fd_set fd_sets;
int events = 0;
//创建socket
socket_fd = socket(AF_INET, SOCK_STREAM, 0);
//设置NON_BLOCK和bind地址,省略...
//skip...
//listen
ret = listen(socket_fd, backlog);
max_fd = socket_fd;
for (int i = 0; i < FD_SIZE; i++) {
accept_fds[i] = -1;
}
for (;;) {
FD_ZERO(&fd_sets); //清空sets
FD_SET(socket_fd, &fd_sets); //将socket_fd 添加到sets
for (int k = 0; k < maxpos; k++) {
if (accept_fds[k] != -1) {
if (accept_fds[k] > max_fd) {
max_fd = accept_fds[k];
}
FD_SET(accept_fds[k], &fd_sets); //继续向sets添加fd
}
}
//通过select找到是否有可写的
events = select(max_fd + 1, &fd_sets, NULL, NULL, NULL);
//经过select,不满足监控条件(可读可写等)的socket对应的那个bit位设为0,否则不变为1(通过此操作与array里存的所有socket进行对比得到是否可用)
if (events < 0) {
//error
break;
}
else if (events == 0) {
//timeout
continue;
}
else if (events) {
if (FD_ISSET(socket_fd, &fd_sets)) { // 如果来的是新连接
//分配一个新的,并存入array结构
int a = 0;
for (; a < FD_SIZE; a++) {
if (accept_fds[a] == -1) {
curpos = a;
break;
}
}
if (a == FD_SIZE) {
printf("the connection is full!\n");
continue;
}
int addr_len = sizeof(struct sockaddr_in);
accept_fd = accept(socket_fd, (struct sockaddr*)&remote_addr, &addr_len); //创建一个新连接的fd
int flags = fcntl(accept_fd, F_GETFL, 0); //取出新连接的 fd 的相关选项
fcntl(accept_fd, F_SETFL, flags | O_NONBLOCK); //设置为非阻塞
//存入array结构
accept_fds[curpos] = accept_fd;
//修改maxpos和max_fd,省略...
}
for (int j = 0; j < maxpos; j++) {
if ((accept_fds[j] != -1) && FD_ISSET(accept_fds[j], &fd_sets)) { //有事件时
printf("accept event :%d, accept_fd: %d\n", j, accept_fds[j]);
//逻辑处理部分,例如发送与接收,部分省略...
char in_buf[MESSAGE_SIZE];
int ret = recv(accept_fds[j], &in_buf, MESSAGE_SIZE, 0);
send(accept_fds[j], (void*)in_buf, MESSAGE_SIZE, 0);
}
}//end for
}//end if events != 0
}//end for
//关闭socket,省略...
return 0;
}
epoll
关键函数
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
//op 操作模式
EPOLL_CTL_ADD
EPOLL_CTL_MOD
EPOLL_CTL_DEL
//epoll_event 事件
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
//Epoll events
EPOLLIN //read
EPOLLOUT //write
EPOLLRDHUP
EPOLLPRI
EPOLLERR
EPOLLHUP
EPOLLET
EPOLLONESHOT
EPOLLWAKEUP
EPOLLEXCLUSIVE
原理浅析
两个重要成员
当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用密切相关:
struct eventpoll {
...
/*红黑树的根节点,这棵树中存储着所有添加到epoll中的事件,
也就是这个epoll监控的事件*/
struct rb_root_cached rbr;
/*双向链表rdllist保存着将要通过epoll_wait返回给用户的、满足条件的事件*/
struct list_head rdllist;
...
};
在调用epoll_create
时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个rdllist双向链表,用于存储准备就绪的事件。当epoll_wait调用时,仅仅观察这个rdllist双向链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。这是epoll_wait
高效的原因。
所有添加到epoll中的事件都会与设备(如网卡)驱动程序建立回调关系,也就是说相应事件的发生时会调用这里的回调方法。这个回调方法在内核中叫做ep_poll_callback
,它会把这样的事件放到上面的rdllist双向链表中。
当调用epoll_wait
检查是否有发生事件的连接时,只是检查eventpoll对象中的rdllist双向链表是否有epitem元素而已,如果rdllist链表不为空,则这里的事件复制到用户态内存(使用共享内存mmap提高效率)中,同时将事件数量返回给用户。因此epoll_wait
效率非常高。epoll_ctl
在向epoll对象中添加、修改、删除事件时,从rbr红黑树中查找事件也非常快,也就是说epoll是非常高效的。
过程总结
- epoll主要使用一个红黑树,一个准备就绪句柄的链表,就帮我们解决了大并发下的socket处理问题。
- 执行
epoll_create()
时,创建了红黑树和就绪链表; - 执行
epoll_ctl()
时,如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表中插入数据; - 执行
epoll_wait()
时立刻返回准备就绪链表里的数据即可。
触发模式
- ET模式(边缘触发)只有数据到来才触发,不管缓存区中是否还有数据,缓冲区剩余未读尽的数据不会导致epoll_wait返回,相当于是select的加强版
- LT 模式(水平触发,默认)只要有数据都会触发,缓冲区剩余未读尽的数据会再次导致epoll_wait返回
EPOLLET触发模式的好处
如果采用EPOLLLT模式的话,系统中一旦有大量的不需要读写的就绪文件描述符,它们每次调用epoll_wait都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率.。而采用EPOLLET这种边缘触发模式的话,当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知一次,直到该文件描述符上出现第二次可读写事件才会通知。这种模式比水平触发效率高,系统不会充斥大量的不关心的就绪文件描述符。
代码示例
int main() {
int ret = -1;
int socket_fd = -1;
int accept_fd = -1;
int curpos = 0;
int backlog = 10;
int flag = 1;
int flags;
char in_buf[MESSAGE_SIZE] = { 0, };
struct sockaddr_in local_addr, remote_addr;
int epoll_fd;
struct epoll_event ev, events[MAX_EVENTS];
int event_number;
//创建socket,设置NON_BLOCK,设置opt,bind IP, 开启listen 已省略
epoll_fd = epoll_create(256);
//添加侦听socket,水平触发确保能接收到
ev.events = EPOLLIN;
ev.data.fd = socket_fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_fd, &ev);
//loop
for (;;) {
event_number = epoll_wait(epoll_fd, events, MAX_EVENTS, TIMEOUT);
for (int i = 0; i < event_number; i++) {
//该空闲socket是侦听socket
if (events[i].data.fd == socket_fd) {
//侦听socket空闲,连接新的
socklen_t addr_len = sizeof(struct sockaddr_in);
accept_fd = accept(socket_fd, (struct sockaddr*)&remote_addr, &addr_len);
//设置非阻塞
flags = fcntl(accept_fd, F_GETFL, 0);
fcntl(accept_fd, F_SETLK, flags | O_NONBLOCK);
//添加新的socket到epoll里
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = accept_fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, accept_fd, &ev);
}
//判断事件类型
else if (events[i].events & EPOLLIN) {
//该空闲socket是侦听socket
//逻辑处理
ret = recv(events[i].data.fd, (void*)in_buf, MESSAGE_SIZE, 0);
}
}
}
printf("quit server...\n");
close(socket_fd);
return 0;
}
libevent
是一个跨平台的异步事件处理库。对于Linux,其底层仍使用的是select、poll、epoll。
对比
- select需要把bitmask从用户态和内核态之间来回拷贝,耗费资源多。epoll使用mmap避免了这一过程,
- select无差别遍历所有文件描述符,而epoll使用回调函数让文件描述符主动通知。