ps.(本文档默认你了解Linux文件句柄操作和基础的socket编程)
为什么需要使用 I/O 多路复用技术?
我们知道,read()
是阻塞的,假设我们有多个文件描述符需要监听,应该怎么办呢?
如果一个个的调用read()
,假设某一个read()
的过程中一直没有读取到数据,那就会阻塞在这里,
后面的数据就无法即使读取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| int main() { char buffer[1024] = {0}; std::cout << "Please enter some text: ";
ssize_t bytes_read = read(0, buffer, sizeof(buffer) - 1);
if (bytes_read > 0) { buffer[bytes_read] = '\0'; std::cout << "You entered: " << buffer << std::endl; } else if (bytes_read == 0) { std::cout << "No input received" << std::endl; } else { std::cerr << "Error reading input" << std::endl; }
return 0; }
|
像这个例子,假设我一直不按回车,那程序就会阻塞在第四行代码,后面的代码不会执行
有的人这时候会想到使用多线程处理每个文件描述符,但是创建线程和切换线程其实很费时间,
而且线程的数量是有限的,会导致线程资源的浪费。所以我们需要使用I/O多路复用技术。
那么什么是I/O多路复用技术呢?
首先我们要清楚,我们read()
的句柄不是每时每刻都有数据。
举个简单的例子,read(0, buffer, sizeof(buffer) - 1)
在这段代码中,用户按下回车键之前,read()
的缓冲区里面是没有数据的。也就是说,在用户输入这一段数据之前,整个线程是阻塞在这里啥事都没有做的,
为了提升性能,我们可以设一个状态,当read()
的缓冲区里面没有数据的时候, 我们将这个文件描述符设置为等待状态,
反之则设置为就绪状态,然后在一个线程里监听多个文件描述符的状态,哪个先就绪就处理哪个。这样就节省了很多等待的时间。
以防你们还是不懂,我简单画个图区分一下一个个调用和上面我说的方法:
一个个调用read()
:
I/O多路复用技术:
ps.以上的操作是内核管理的,实际实现可能更复杂,我这只是讲一个大概的思路用于理解。
那么我们该如何使用I/O多路复用技术呢?
接下来我们就需要了解内核提供的接口了:select poll epoll
select
fd_set
我们先了解一下fd_set
fd_set 可以理解为文件描述符集合,他提供了一些宏函数,常用的有:
FD_ZERO(fd_set *set)
:清空集合。
FD_SET(int fd, fd_set *set)
:将文件描述符 fd 加入集合。
FD_CLR(int fd, fd_set *set)
:从集合中移除文件描述符 fd。
FD_ISSET(int fd, fd_set *set)
:检查文件描述符 fd 是否在集合中(是否就绪)。
select 函数
再看select函数原型int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
参数说明:
nfds:文件描述符集合中最大文件描述符的值加 1。这个值通常是所有文件描述符中最大的那个文件描述符的值(如 sockfd),加 1。
readfds:监视可读的文件描述符集合。如果某个文件描述符可以读取数据,select() 将在返回时设置该文件描述符。
writefds:监视可写的文件描述符集合。如果某个文件描述符可以写入数据,select() 将在返回时设置该文件描述符。
exceptfds:监视异常条件(如带外数据、错误)的文件描述符集合。
timeout:指定超时时间。如果超时,select() 将返回。timeout 可以是以下三种情况:
NULL:select() 将无限期阻塞,直到某个文件描述符就绪。
非 NULL:指定超时时间,超时时间由 struct timeval 结构指定。
零超时值:不阻塞,立即返回并检查文件描述符。
可能看到这你有点懵,其实很简单,nfds
是指的监听的文件描述符数,举个例子,如果你要监听0,1,2,3,4这五个文件描述符,那nfds
就是5。
而fd_set *readfds
就是,监听这个文件描述符的状态是否已经可以读取数据了,举个例子,假设你要监听0,1,2,3,4这五个文件描述符,其中,012这三个文件描述符已经可以读取数据了,那select返回时就会将012设置为就绪。
另外两个也差不多是这样,至于timeout
,就是设置超时时间,如果超时了,select就会返回。
注意nfds
监听的文件描述符数是从0到nfds
,而每次创建新的客户端的套接字时难免会比当前的nfds
大,例如当前的nfds
是5,而新创建的套接字是6,那么nfds
就要更新为6。所以每次创建新的套接字时,都要更新nfds
。
下面我们来看一个简单的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| #define PORT 8080
int main() { int server_fd; struct sockaddr_in address; int addrlen = sizeof(address); std::vector<int> client_sockets;
server_fd = socket(AF_INET, SOCK_STREAM, 0); address.sin_family = AF_INET; address.sin_addr.s_addr = INADDR_ANY; address.sin_port = htons(PORT);
bind(server_fd, (struct sockaddr *)&address, sizeof(address)); listen(server_fd, 3);
while (true) { fd_set readfds; FD_ZERO(&readfds);
FD_SET(server_fd, &readfds); int max_sd = server_fd;
for (int client_fd : client_sockets) { FD_SET(client_fd, &readfds); if (client_fd > max_sd) max_sd = client_fd; }
int activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);
if (FD_ISSET(server_fd, &readfds)) { int new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen); std::cout << "新客户端已连接" << std::endl; client_sockets.push_back(new_socket); }
for (auto it = client_sockets.begin(); it != client_sockets.end(); ) { int client_fd = *it; if (FD_ISSET(client_fd, &readfds)) { char buffer[1024] = {0}; int valread = read(client_fd, buffer, sizeof(buffer)); if (valread == 0) { close(client_fd); it = client_sockets.erase(it); std::cout << "客户端断开连接" << std::endl; } else { std::cout << "接收到数据: " << buffer << std::endl; ++it; } } else { ++it; } } }
close(server_fd); return 0; }
|
代码说明
初始化 fd_set
集合:使用 FD_ZERO
清空集合,并使用 FD_SET
添加服务器和客户端套接字。
调用 select
:监视所有文件描述符集合中的 I/O 事件。
检查可读性:
使用 FD_ISSET
检查 server_fd
是否在 readfds
中,若在,则表示有新客户端连接。
使用 FD_ISSET
检查客户端套接字是否在 readfds
中,若在,则表示有数据可以读取。
处理断开连接:当 read
返回值为 0 时,表示客户端断开连接,从 client_sockets
中移除该客户端。
通过 select
之后,fd_set
会只保留那些满足条件的文件描述符,因此需要在每次调用前重新初始化 fd_set 集合。这样可以确保 select 每次都只处理最新的状态。
还需要注意的就是,select
是最古老的I/O多路复用技术,它的效率不高,因为每次调用select
都要遍历所有的文件描述符,所以当文件描述符很多时,效率会很低,而且select的文件描述符数量是有限的,一般是1024,所以当文件描述符超过1024时,select就会失效。
poll
poll
和 select
类似,但克服了 select
函数在文件描述符数量上限制的缺点。poll
允许通过 pollfd
结构体数组来监视任意数量的文件描述符。
pollfd
我们先来看看pollfd结构体:
1 2 3 4 5
| struct pollfd { int fd; short events; short revents; };
|
events
:指定要监视的事件,如 POLLIN
(可读)、POLLOUT
(可写)、POLLERR
(错误)等。这里的POLLIN
就类似于select
中的readfds
,以此类推。
revents
:实际发生的事件类型,在 poll
返回后检查。这里就是表示events
中的事件是否发生了。
poll 函数
poll
函数原型:int poll(struct pollfd *fds, nfds_t nfds, int timeout);
返回值:
poll
函数的返回值表示发生事件的文件描述符数量。
- 超时处理:如果超时,
poll
返回 0,表示没有事件发生。
- 错误处理:如果
poll
返回 -1,表示出现错误。
参数说明:
fds
:就是上面讲的pollfd
,每个代表一个需要监视的文件描述符。
nfds
:fds
数组中的元素数量。
timeout
:超时时间,单位为毫秒。如果设置为 -1,则 poll
将无限期阻塞,直到有事件发生。
下面我们来看一个简单的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
| #define PORT 8080 #define MAX_CLIENTS 10 #define BUFFER_SIZE 1024
std::vector<std::thread> clientThreads;
void handleClient(int clientSocket) { char buffer[BUFFER_SIZE]; while (true) { int bytesReceived = read(clientSocket, buffer, BUFFER_SIZE); if (bytesReceived <= 0) { std::cout << "Client disconnected." << std::endl; close(clientSocket); return; } buffer[bytesReceived] = '\0'; std::cout << "Received: " << buffer << std::endl;
std::string response = "Echo: " + std::string(buffer); send(clientSocket, response.c_str(), response.size(), 0); } }
int main() { int serverSocket, clientSocket; struct sockaddr_in serverAddr, clientAddr; socklen_t addrLen = sizeof(clientAddr);
serverSocket = socket(AF_INET, SOCK_STREAM, 0); if (serverSocket == 0) { std::cerr << "Socket creation failed." << std::endl; return -1; }
serverAddr.sin_family = AF_INET; serverAddr.sin_addr.s_addr = INADDR_ANY; serverAddr.sin_port = htons(PORT);
if (bind(serverSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) { std::cerr << "Bind failed." << std::endl; close(serverSocket); return -1; }
if (listen(serverSocket, MAX_CLIENTS) < 0) { std::cerr << "Listen failed." << std::endl; close(serverSocket); return -1; } std::cout << "Server listening on port " << PORT << std::endl;
struct pollfd pollFds[MAX_CLIENTS + 1]; pollFds[0].fd = serverSocket; pollFds[0].events = POLLIN;
int clientCount = 1;
while (true) { int pollCount = poll(pollFds, clientCount, -1); if (pollCount < 0) { std::cerr << "Poll error." << std::endl; break; }
if (pollFds[0].revents & POLLIN) { clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &addrLen); if (clientSocket < 0) { std::cerr << "Client accept failed." << std::endl; continue; }
std::cout << "New client connected." << std::endl;
if (clientCount < MAX_CLIENTS + 1) { pollFds[clientCount].fd = clientSocket; pollFds[clientCount].events = POLLIN; clientCount++;
clientThreads.emplace_back(std::thread(handleClient, clientSocket)); } else { std::cerr << "Maximum clients reached." << std::endl; close(clientSocket); } }
for (int i = 1; i < clientCount; i++) { if (pollFds[i].revents & POLLIN) { char buffer[BUFFER_SIZE]; int bytesReceived = read(pollFds[i].fd, buffer, BUFFER_SIZE); if (bytesReceived <= 0) { std::cout << "Client disconnected." << std::endl; close(pollFds[i].fd); pollFds[i] = pollFds[clientCount - 1]; clientCount--; i--; } else { buffer[bytesReceived] = '\0'; std::cout << "Received from client: " << buffer << std::endl; std::string response = "Echo from server: " + std::string(buffer); send(pollFds[i].fd, response.c_str(), response.size(), 0); } } } }
for (auto& t : clientThreads) { if (t.joinable()) { t.join(); } }
close(serverSocket); return 0; }
|
代码说明:
- 创建
pollfd
数组:pollfd fds[1]
,并监听标准输入和关心可读事件。
- 调用
poll
函数:等待事件发生。
- 检查
revents
:检查 revents
中的事件类型,如 POLLIN
(可读)。
- 读取数据:如果有数据可读,调用
read
函数读取数据。
- 像
select
一样,每连接一个客户端,就要更新pollfd
数组,所以每次连接新的客户端时,都要更新clientCount
。
epoll
与poll
不同的是,epoll
在监控大量文件描述符时具有更好的性能,因为它不需要每次都重新传递所有文件描述符,而是通过事件表的方式进行管理。在高并发服务器中,epoll
十分高效。
epoll 有三个主要的系统调用:
epoll_create
/ epoll_create1
:创建 epoll
实例。
epoll_ctl
:向 epoll
实例中添加、修改或删除文件描述符。
epoll_wait
:等待文件描述符上的事件发生。
epoll_event
我们先了解一下epoll_event
结构体:
epoll_event
结构体用于描述事件,定义如下:
1 2 3 4 5 6 7 8 9 10 11
| struct epoll_event { uint32_t events; epoll_data_t data; };
typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t;
|
events
:表示事件类型,与poll的events
类似:
EPOLLIN
:有数据可读。
EPOLLOUT
:可以写入数据。
EPOLLERR
:发生错误。
EPOLLHUP
:连接被挂起。
EPOLLET
:边沿触发模式(相比默认的水平触发模式)。
epoll_create / epoll_create1
- 用于创建一个
epoll
实例,该实例将用于管理文件描述符。
- 返回值是一个
epoll
实例的文件描述符,后续的 epoll_ctl
和 epoll_wait
操作都依赖于这个描述符。
函数原型:
int epoll_create(int size);
int epoll_create1(int flags);
参数:
size
:epoll
实例的大小,已经不再使用,可以忽略。
flags
:可以设置为 EPOLL_CLOEXEC
,用于在 fork()
子进程中关闭文件描述符。
返回值:
- 返回一个
epoll
实例的文件描述符,用于后续的 epoll_ctl
和 epoll_wait
操作。
- 如果出现错误,返回
-1
。
epoll_ctl
- 用于向
epoll
实例中添加、修改或删除文件描述符,即将epoll_event
写进事件表,类似poll
中的pollfd
。
函数原型
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数:
epfd
:epoll
实例的文件描述符,由 epoll_create
或 epoll_create1
返回。
op
:操作类型,可以是以下值之一:
EPOLL_CTL_ADD
:添加文件描述符到 epoll
实例中。
EPOLL_CTL_MOD
:修改已存在的文件描述符的事件。
EPOLL_CTL_DEL
:从 epoll
实例中删除文件描述符。
fd
:要监视的文件描述符。
event
:指向 epoll_event
结构的指针,用于指定感兴趣的事件。
返回值:
示例
1 2 3 4 5 6 7 8
| struct epoll_event event; event.events = EPOLLIN; event.data.fd = sockfd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event) == -1) { perror("epoll_ctl: EPOLL_CTL_ADD failed"); exit(EXIT_FAILURE); }
|
epoll_wait
- 用于等待文件描述符上的事件发生,类似
poll
中的poll
函数。
函数原型
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
参数
epfd
:epoll
实例的文件描述符。
events
:用于接收发生事件的 epoll_event
数组。
maxevents
:events
数组的最大长度,通常设置为监听的文件描述符的数量。
timeout
:超时时间,单位为毫秒,类似于 poll
的 timeout
参数。
timeout
> 0:阻塞等待指定毫秒数。
timeout
= 0:立即返回,不管是否有事件发生(非阻塞)。
timeout
= -1:无限等待,直到有事件发生。
- 返回值:
- 成功时,返回实际触发事件的文件描述符数量。
- 失败时,返回
-1
,并设置 errno
。
ps:(这里的events
是会被写入实际发生的事件的,而不是单纯告诉事件表我要监听这个事件)
示例
假设已经绑定好了服务器套接字server_fd
,也已经创建了epoll_fd
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| #define MAX_EVENTS 10
int main() { struct epoll_event event, events[MAX_EVENTS]; event.events = EPOLLIN; event.data.fd = server_fd; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) == -1) { perror("epoll_ctl: EPOLL_CTL_ADD failed"); exit(EXIT_FAILURE); } while (true) { int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); if (nfds == -1) { perror("epoll_wait failed"); exit(EXIT_FAILURE); }
for (int i = 0; i < nfds; i++) { if (events[i].data.fd == server_fd) { new_socket = accept(server_fd, NULL, NULL); event.events = EPOLLIN; event.data.fd = new_socket; epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &event); } else if(events[i].events & EPOLLIN) { char buffer[1024] = {0}; ssize_t count = read(events[i].data.fd, buffer, sizeof(buffer)); if (count == -1) { perror("read failed"); close(events[i].data.fd); epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL); } else if (count == 0) { close(events[i].data.fd); epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL); } else { std::cout << "Received: " << buffer << std::endl; } } } }
close(server_fd); return 0; }
|
这里有两个要注意的点:
- 一个是在处理新连接时,
accept
需要设置为非阻塞的,不然会卡着一直不往下面走
- 另一个是要记得判断事件输出是否正确,”
else if(events[i].events & EPOLLIN)
“,否则会影响性能