SOCKET编程与复用

-目录-

SOCKET编程

TCP

流程图

TCP通信过程

关键函数

int fd = socket(AF_INET, SOCK_STREAM, 0);

int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));

bind(fd, local_addr, sizeof(local_addr));

conn_fd = accept(fd, backlog);

recv(conn_fd, buff, sizeof(buff), 0);
send(conn_fd, buff, sizeof(buff), 0);

UDP

流程图

IO多路复用

关键函数

int fd = socket(AF_INET, SOCK_DGRAM, 0);

bind(fd, local_addr, sizeof(local_addr));

//和recv的区别是后两个参数是出参,能够获得IP地址,供sendto使用
recvfrom(fd, buff, sizeof(buff), 0, client_addr, sizeof(client_addr));
sendto(fd, buff, sizeof(buff), 0, client_addr, sizeof(client_addr));

IO复用

一般来说,一个进程只处理一个连接。但这样显然不合理,所以需要一个进程通过某种方式处理多个连接。就是说一个进程同时管控多个连接socket,同时处理多个连接的业务。Linux系统下可以通过以下几种方式实现。

fork

关键函数

pid会在子进程中为0,父进程中不为0。

pid_t pid = fork();

原理浅析

就是对每个连接新建子进程,相当于golang的go func{}(),每次accept后进行fork,让子进程处理连接。

代码示例

//接收到连接...
pid = fork();

if(pid !=0)
{
    continue;
}
//对accept_socket处理...

select

关键函数

#include <sys/select.h>

int select(int nfds, fd_set *restrict readfds,
           fd_set *restrict writefds, fd_set *restrict exceptfds,
           struct timeval *restrict timeout);

void FD_CLR(int fd, fd_set *set);
int  FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

原理浅析

文档select(2) - Linux manual page (man7.org),函数原型:

 int select(int nfds, 				//三个fd_set里最大的数量
            fd_set *readfds, 		 //判断读是否阻塞的fd
            fd_set *writefds,		 //判断写是否阻塞的fd
            fd_set *exceptfds,  	 //判断是否有异常的fd
            struct timeval *timeout	 //阻塞超时
           );

传入的socket应该是非阻塞的,可使用fcntl文件控制进行非阻塞处理。

fd_set结构如下,是一个有32个ULONG(32bit)的数组,是一个连续的3232位=1024位的空间,可以把它看作一个1024×1的超长向量。每一位都能与一打开的文件句柄(socket、文件、管道、设备等)建立联系(因此8sizeof(fd_set)是最大支持的设备数),建立联系由FD_SET完成,当调用select()时,由内核根据IO状态修改fd_set的内容,程序员根据此变化来判断哪个句柄可读。

#define __FD_SETSIZE	1024

typedef struct {
	unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];
} __kernel_fd_set;

为说明方便,取fd_set长度为1字节8bit,fd_set中的每一bit可以对应一个文件描述符fd。则1字节长的fd_set最大可以对应8个fd。

  1. 执行fd_set set; FD_ZERO(&set); 则set用位表示是0000,0000。
  2. 若fd=5,执行FD_SET(fd,&set); 后set变为0001,0000(第5位置为1)
  3. 若再加入fd=2,fd=1,则set变为0001,0011
  4. 执行 select(6,&set,0,0,0) 阻塞等待
  5. 若fd=1,fd=2上都发生可读事件,则select返回,此时set变为0000,0011。注意:没有事件发生的fd=5被清空。

select会循环遍历它所监测的fd_set(一组文件描述符(fd)的集合)内的所有文件描述符对应的驱动程序的poll函数。驱动程序提供的poll函数首先会将调用select的用户进程插入到该设备驱动对应资源的等待队列(如读/写等待队列),然后返回一个bitmask告诉select当前资源哪些可用。当select循环遍历完所有fd_set内指定的文件描述符对应的poll函数后,如果没有一个资源可用(即没有一个文件可供操作),则select让该进程睡眠,一直等到有资源可用为止,进程被唤醒(或者timeout)继续往下执行。

所谓poll函数就是把current(当前进程)挂到设备的等待队列,不同设备有不同等待队列,如tcp_poll的等待队列是sk->sk_sleep(把进程挂到等待队列中并不代表进程已睡眠)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒。

基于上面的讨论,可以得出select模型的特点:

  1. 可监控的文件描述符个数取决与sizeof(fd_set)的值,Linux上是1024。
  2. 将fd加入select监控集的同时,还要再使用一个数据结构array保存放到select监控集中的fd。因为select后没有事件发生的socket对应的bit会设为0,需要从array里拿之前存的socket的值 通过FD_ISSET判断是否有事件发生(那一位是否为1)。
  3. 可见select模型必须在select前循环array(加fd,取maxfd),select返回后循环array(FD_ISSET判断是否有事件发生)。

代码示例

一个使用select的TCP server简化部分代码后如下:

img

int main() {
	//变量定义,部分省略
	int ret = -1;

	int accept_fd = -1;
	int socket_fd = -1;
    //用于保存所有fd
	int accept_fds[FD_SIZE] = { -1, };

	int curpos = -1;
	int maxpos = 0;
	int backlog = 10;

	int max_fd = -1;
    //用于select
	fd_set fd_sets;
	int events = 0;

	//创建socket
	socket_fd = socket(AF_INET, SOCK_STREAM, 0);

	//设置NON_BLOCK和bind地址,省略...
    //skip...

    //listen
	ret = listen(socket_fd, backlog);

	max_fd = socket_fd;
	for (int i = 0; i < FD_SIZE; i++) {
		accept_fds[i] = -1;
	}

	for (;;) {

		FD_ZERO(&fd_sets); //清空sets
		FD_SET(socket_fd, &fd_sets); //将socket_fd 添加到sets

		for (int k = 0; k < maxpos; k++) {
			if (accept_fds[k] != -1) {
				if (accept_fds[k] > max_fd) {
					max_fd = accept_fds[k];
				}
				FD_SET(accept_fds[k], &fd_sets); //继续向sets添加fd
			}
		}

		//通过select找到是否有可写的
		events = select(max_fd + 1, &fd_sets, NULL, NULL, NULL);
        //经过select,不满足监控条件(可读可写等)的socket对应的那个bit位设为0,否则不变为1(通过此操作与array里存的所有socket进行对比得到是否可用)
		if (events < 0) {
			//error
			break;
		}
		else if (events == 0) {
			//timeout
			continue;
		}
		else if (events) {
			if (FD_ISSET(socket_fd, &fd_sets)) { // 如果来的是新连接
                 //分配一个新的,并存入array结构
				int a = 0;
				for (; a < FD_SIZE; a++) {
					if (accept_fds[a] == -1) {
						curpos = a;
						break;
					}
				}

				if (a == FD_SIZE) {
					printf("the connection is full!\n");
					continue;
				}

				int addr_len = sizeof(struct sockaddr_in);
				accept_fd = accept(socket_fd, (struct sockaddr*)&remote_addr, &addr_len); //创建一个新连接的fd

				int flags = fcntl(accept_fd, F_GETFL, 0); //取出新连接的 fd 的相关选项
				fcntl(accept_fd, F_SETFL, flags | O_NONBLOCK); //设置为非阻塞

                 //存入array结构
				accept_fds[curpos] = accept_fd;

				//修改maxpos和max_fd,省略...
			}

			for (int j = 0; j < maxpos; j++) {
				if ((accept_fds[j] != -1) && FD_ISSET(accept_fds[j], &fd_sets)) { //有事件时
					printf("accept event :%d, accept_fd: %d\n", j, accept_fds[j]);
                      //逻辑处理部分,例如发送与接收,部分省略...
					char in_buf[MESSAGE_SIZE];
					int ret = recv(accept_fds[j], &in_buf, MESSAGE_SIZE, 0);
					send(accept_fds[j], (void*)in_buf, MESSAGE_SIZE, 0);
				}
			}//end for
		}//end if events != 0
	}//end for

	//关闭socket,省略...

	return 0;
}

epoll

关键函数

#include <sys/epoll.h>

int epoll_create(int size);
 int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
//op 操作模式
EPOLL_CTL_ADD
EPOLL_CTL_MOD
EPOLL_CTL_DEL
    
//epoll_event 事件
typedef union epoll_data {
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;

struct epoll_event {
    uint32_t     events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */
};

//Epoll events
EPOLLIN     //read
EPOLLOUT    //write
EPOLLRDHUP  
EPOLLPRI
EPOLLERR
EPOLLHUP
EPOLLET
EPOLLONESHOT
EPOLLWAKEUP
EPOLLEXCLUSIVE 

原理浅析

两个重要成员

当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用密切相关:

struct eventpoll {
  ...
  /*红黑树的根节点,这棵树中存储着所有添加到epoll中的事件,
  也就是这个epoll监控的事件*/
  struct rb_root_cached rbr;
  /*双向链表rdllist保存着将要通过epoll_wait返回给用户的、满足条件的事件*/
  struct list_head rdllist;
  ...
};

在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个rdllist双向链表,用于存储准备就绪的事件。当epoll_wait调用时,仅仅观察这个rdllist双向链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。这是epoll_wait高效的原因。

所有添加到epoll中的事件都会与设备(如网卡)驱动程序建立回调关系,也就是说相应事件的发生时会调用这里的回调方法。这个回调方法在内核中叫做ep_poll_callback,它会把这样的事件放到上面的rdllist双向链表中。

当调用epoll_wait检查是否有发生事件的连接时,只是检查eventpoll对象中的rdllist双向链表是否有epitem元素而已,如果rdllist链表不为空,则这里的事件复制到用户态内存(使用共享内存mmap提高效率)中,同时将事件数量返回给用户。因此epoll_wait效率非常高。epoll_ctl在向epoll对象中添加、修改、删除事件时,从rbr红黑树中查找事件也非常快,也就是说epoll是非常高效的。

epoll原理

过程总结

过程

触发模式

EPOLLET触发模式的好处

  如果采用EPOLLLT模式的话,系统中一旦有大量的不需要读写的就绪文件描述符,它们每次调用epoll_wait都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率.。而采用EPOLLET这种边缘触发模式的话,当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知一次,直到该文件描述符上出现第二次可读写事件才会通知。这种模式比水平触发效率高,系统不会充斥大量的不关心的就绪文件描述符。

代码示例

int main() {

	int ret = -1;

	int socket_fd = -1;
	int accept_fd = -1;

	int curpos = 0;
	int backlog = 10;
	int flag = 1;
	int flags;

	char in_buf[MESSAGE_SIZE] = { 0, };
	struct sockaddr_in local_addr, remote_addr;

	int epoll_fd;
	struct epoll_event ev, events[MAX_EVENTS];
	int event_number;

	//创建socket,设置NON_BLOCK,设置opt,bind IP, 开启listen 已省略

	epoll_fd = epoll_create(256);
	//添加侦听socket,水平触发确保能接收到
	ev.events = EPOLLIN;
	ev.data.fd = socket_fd;
	epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_fd, &ev);

	//loop
	for (;;) {
		event_number = epoll_wait(epoll_fd, events, MAX_EVENTS, TIMEOUT);
		for (int i = 0; i < event_number; i++) {
			//该空闲socket是侦听socket
			if (events[i].data.fd == socket_fd) {
				//侦听socket空闲,连接新的
				socklen_t addr_len = sizeof(struct sockaddr_in);
				accept_fd = accept(socket_fd, (struct sockaddr*)&remote_addr, &addr_len);
				//设置非阻塞
				flags = fcntl(accept_fd, F_GETFL, 0);
				fcntl(accept_fd, F_SETLK, flags | O_NONBLOCK);
				//添加新的socket到epoll里
				ev.events = EPOLLIN | EPOLLET;
				ev.data.fd = accept_fd;
				epoll_ctl(epoll_fd, EPOLL_CTL_ADD, accept_fd, &ev);
			}
             //判断事件类型
			else if (events[i].events & EPOLLIN) {
				//该空闲socket是侦听socket
                  //逻辑处理
                  ret = recv(events[i].data.fd, (void*)in_buf, MESSAGE_SIZE, 0);
			}
		}

	}

	printf("quit server...\n");
	close(socket_fd);

	return 0;
}

libevent

是一个跨平台的异步事件处理库。对于Linux,其底层仍使用的是select、poll、epoll。

对比

  1. select需要把bitmask从用户态和内核态之间来回拷贝,耗费资源多。epoll使用mmap避免了这一过程,
  2. select无差别遍历所有文件描述符,而epoll使用回调函数让文件描述符主动通知。
© 2019 - 2023 · YuYoung's Blog