ps.(本文档默认你了解Linux文件句柄操作和基础的socket编程)

为什么需要使用 I/O 多路复用技术?

我们知道,read()是阻塞的,假设我们有多个文件描述符需要监听,应该怎么办呢?
如果一个个的调用read(),假设某一个read()的过程中一直没有读取到数据,那就会阻塞在这里,
后面的数据就无法即使读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int main() {
char buffer[1024] = {0}; // 缓冲区用于存储读取到的数据
std::cout << "Please enter some text: ";

// 从标准输入(文件描述符0)读取最多1024字节数据,阻塞式I/O
ssize_t bytes_read = read(0, buffer, sizeof(buffer) - 1); // FD 0 表示标准输入

if (bytes_read > 0) {
buffer[bytes_read] = '\0'; // 确保缓冲区以null字符结束
std::cout << "You entered: " << buffer << std::endl;
} else if (bytes_read == 0) {
std::cout << "No input received" << std::endl;
} else {
std::cerr << "Error reading input" << std::endl;
}

return 0;
}

像这个例子,假设我一直不按回车,那程序就会阻塞在第四行代码,后面的代码不会执行

有的人这时候会想到使用多线程处理每个文件描述符,但是创建线程和切换线程其实很费时间,
而且线程的数量是有限的,会导致线程资源的浪费。所以我们需要使用I/O多路复用技术。

那么什么是I/O多路复用技术呢?

首先我们要清楚,我们read()的句柄不是每时每刻都有数据。
举个简单的例子,read(0, buffer, sizeof(buffer) - 1)在这段代码中,用户按下回车键之前,read()的缓冲区里面是没有数据的。也就是说,在用户输入这一段数据之前,整个线程是阻塞在这里啥事都没有做的,
为了提升性能,我们可以设一个状态,当read()的缓冲区里面没有数据的时候, 我们将这个文件描述符设置为等待状态,
反之则设置为就绪状态,然后在一个线程里监听多个文件描述符的状态,哪个先就绪就处理哪个。这样就节省了很多等待的时间。
以防你们还是不懂,我简单画个图区分一下一个个调用和上面我说的方法:

一个个调用read():

I/O多路复用技术:

ps.以上的操作是内核管理的,实际实现可能更复杂,我这只是讲一个大概的思路用于理解。

那么我们该如何使用I/O多路复用技术呢?

接下来我们就需要了解内核提供的接口了:select poll epoll

select

fd_set

我们先了解一下fd_set
fd_set 可以理解为文件描述符集合,他提供了一些宏函数,常用的有:

  • FD_ZERO(fd_set *set):清空集合。

  • FD_SET(int fd, fd_set *set):将文件描述符 fd 加入集合。

  • FD_CLR(int fd, fd_set *set):从集合中移除文件描述符 fd。

  • FD_ISSET(int fd, fd_set *set):检查文件描述符 fd 是否在集合中(是否就绪)。

select 函数

再看select函数原型int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
参数说明:

  • nfds:文件描述符集合中最大文件描述符的值加 1。这个值通常是所有文件描述符中最大的那个文件描述符的值(如 sockfd),加 1。

  • readfds:监视可读的文件描述符集合。如果某个文件描述符可以读取数据,select() 将在返回时设置该文件描述符。

  • writefds:监视可写的文件描述符集合。如果某个文件描述符可以写入数据,select() 将在返回时设置该文件描述符。

  • exceptfds:监视异常条件(如带外数据、错误)的文件描述符集合。

  • timeout:指定超时时间。如果超时,select() 将返回。timeout 可以是以下三种情况:

NULL:select() 将无限期阻塞,直到某个文件描述符就绪。
非 NULL:指定超时时间,超时时间由 struct timeval 结构指定。
零超时值:不阻塞,立即返回并检查文件描述符。

可能看到这你有点懵,其实很简单,nfds是指的监听的文件描述符数,举个例子,如果你要监听0,1,2,3,4这五个文件描述符,那nfds就是5。
fd_set *readfds就是,监听这个文件描述符的状态是否已经可以读取数据了,举个例子,假设你要监听0,1,2,3,4这五个文件描述符,其中,012这三个文件描述符已经可以读取数据了,那select返回时就会将012设置为就绪。
另外两个也差不多是这样,至于timeout,就是设置超时时间,如果超时了,select就会返回。

注意nfds监听的文件描述符数是从0到nfds,而每次创建新的客户端的套接字时难免会比当前的nfds大,例如当前的nfds是5,而新创建的套接字是6,那么nfds就要更新为6。所以每次创建新的套接字时,都要更新nfds

下面我们来看一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#define PORT 8080

int main() {
int server_fd;
struct sockaddr_in address;
int addrlen = sizeof(address);
std::vector<int> client_sockets;

// 创建服务器套接字
server_fd = socket(AF_INET, SOCK_STREAM, 0);
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);

bind(server_fd, (struct sockaddr *)&address, sizeof(address));
listen(server_fd, 3);

// 主循环:监视服务器和客户端套接字
while (true) {
fd_set readfds;
FD_ZERO(&readfds);

// 添加服务器套接字
FD_SET(server_fd, &readfds);
int max_sd = server_fd;

// 添加所有客户端套接字
for (int client_fd : client_sockets) {
FD_SET(client_fd, &readfds);
if (client_fd > max_sd) max_sd = client_fd;
}

// 使用 select 检查 I/O 事件
int activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);

if (FD_ISSET(server_fd, &readfds)) {
int new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen);
std::cout << "新客户端已连接" << std::endl;
client_sockets.push_back(new_socket);
}

// 检查哪些客户端套接字可读
for (auto it = client_sockets.begin(); it != client_sockets.end(); ) {
int client_fd = *it;
if (FD_ISSET(client_fd, &readfds)) {
char buffer[1024] = {0};
int valread = read(client_fd, buffer, sizeof(buffer));
if (valread == 0) {
// 客户端断开连接
close(client_fd);
it = client_sockets.erase(it);
std::cout << "客户端断开连接" << std::endl;
} else {
std::cout << "接收到数据: " << buffer << std::endl;
++it;
}
} else {
++it;
}
}
}

close(server_fd);
return 0;
}

代码说明
初始化 fd_set 集合:使用 FD_ZERO 清空集合,并使用 FD_SET 添加服务器和客户端套接字。

调用 select:监视所有文件描述符集合中的 I/O 事件。

检查可读性:
使用 FD_ISSET 检查 server_fd 是否在 readfds 中,若在,则表示有新客户端连接。
使用 FD_ISSET 检查客户端套接字是否在 readfds 中,若在,则表示有数据可以读取。

处理断开连接:当 read 返回值为 0 时,表示客户端断开连接,从 client_sockets 中移除该客户端。

通过 select 之后,fd_set 会只保留那些满足条件的文件描述符,因此需要在每次调用前重新初始化 fd_set 集合。这样可以确保 select 每次都只处理最新的状态。

还需要注意的就是,select是最古老的I/O多路复用技术,它的效率不高,因为每次调用select都要遍历所有的文件描述符,所以当文件描述符很多时,效率会很低,而且select的文件描述符数量是有限的,一般是1024,所以当文件描述符超过1024时,select就会失效。

poll

pollselect 类似,但克服了 select 函数在文件描述符数量上限制的缺点。poll 允许通过 pollfd 结构体数组来监视任意数量的文件描述符。

pollfd

我们先来看看pollfd结构体:

1
2
3
4
5
struct pollfd {
int fd; // 文件描述符
short events; // 要监视的事件
short revents; // 实际发生的事件
};

events:指定要监视的事件,如 POLLIN(可读)、POLLOUT(可写)、POLLERR(错误)等。这里的POLLIN就类似于select中的readfds,以此类推。

revents:实际发生的事件类型,在 poll 返回后检查。这里就是表示events中的事件是否发生了。

poll 函数

poll 函数原型:int poll(struct pollfd *fds, nfds_t nfds, int timeout);

返回值:

  • poll 函数的返回值表示发生事件的文件描述符数量。
  • 超时处理:如果超时,poll 返回 0,表示没有事件发生。
  • 错误处理:如果 poll 返回 -1,表示出现错误。

参数说明:

  • fds:就是上面讲的pollfd,每个代表一个需要监视的文件描述符。
  • nfdsfds 数组中的元素数量。
  • timeout:超时时间,单位为毫秒。如果设置为 -1,则 poll 将无限期阻塞,直到有事件发生。

下面我们来看一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#define PORT 8080
#define MAX_CLIENTS 10
#define BUFFER_SIZE 1024

// 用于存储客户端的线程
std::vector<std::thread> clientThreads;

// 客户端处理函数
void handleClient(int clientSocket) {
char buffer[BUFFER_SIZE];
while (true) {
// 读取客户端发送的数据
int bytesReceived = read(clientSocket, buffer, BUFFER_SIZE);
if (bytesReceived <= 0) {
std::cout << "Client disconnected." << std::endl;
close(clientSocket);
return;
}
buffer[bytesReceived] = '\0';
std::cout << "Received: " << buffer << std::endl;

// 回传数据给客户端
std::string response = "Echo: " + std::string(buffer);
send(clientSocket, response.c_str(), response.size(), 0);
}
}

int main() {
int serverSocket, clientSocket;
struct sockaddr_in serverAddr, clientAddr;
socklen_t addrLen = sizeof(clientAddr);

// 创建服务器套接字
serverSocket = socket(AF_INET, SOCK_STREAM, 0);
if (serverSocket == 0) {
std::cerr << "Socket creation failed." << std::endl;
return -1;
}

// 服务器地址配置
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = INADDR_ANY;
serverAddr.sin_port = htons(PORT);

// 绑定服务器套接字
if (bind(serverSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) {
std::cerr << "Bind failed." << std::endl;
close(serverSocket);
return -1;
}

// 开始监听
if (listen(serverSocket, MAX_CLIENTS) < 0) {
std::cerr << "Listen failed." << std::endl;
close(serverSocket);
return -1;
}
std::cout << "Server listening on port " << PORT << std::endl;

// 设置 poll 文件描述符结构体
struct pollfd pollFds[MAX_CLIENTS + 1];
pollFds[0].fd = serverSocket;
pollFds[0].events = POLLIN; // 监听服务器套接字的读事件(即新客户端连接)

int clientCount = 1;

while (true) {
// 调用 poll 监听所有文件描述符
int pollCount = poll(pollFds, clientCount, -1);
if (pollCount < 0) {
std::cerr << "Poll error." << std::endl;
break;
}

// 检查服务器套接字是否有新客户端连接
if (pollFds[0].revents & POLLIN) {
clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &addrLen);
if (clientSocket < 0) {
std::cerr << "Client accept failed." << std::endl;
continue;
}

std::cout << "New client connected." << std::endl;

// 将新客户端加入 poll 监听列表
if (clientCount < MAX_CLIENTS + 1) {
pollFds[clientCount].fd = clientSocket;
pollFds[clientCount].events = POLLIN;
clientCount++;

// 为每个客户端启动一个新线程进行处理
clientThreads.emplace_back(std::thread(handleClient, clientSocket));
} else {
std::cerr << "Maximum clients reached." << std::endl;
close(clientSocket);
}
}

// 处理现有客户端
for (int i = 1; i < clientCount; i++) {
if (pollFds[i].revents & POLLIN) {
char buffer[BUFFER_SIZE];
int bytesReceived = read(pollFds[i].fd, buffer, BUFFER_SIZE);
if (bytesReceived <= 0) {
// 客户端断开连接
std::cout << "Client disconnected." << std::endl;
close(pollFds[i].fd);
pollFds[i] = pollFds[clientCount - 1];
clientCount--;
i--; // 防止跳过一个客户端
} else {
// 处理客户端消息
buffer[bytesReceived] = '\0';
std::cout << "Received from client: " << buffer << std::endl;
std::string response = "Echo from server: " + std::string(buffer);
send(pollFds[i].fd, response.c_str(), response.size(), 0);
}
}
}
}

// 等待所有客户端线程结束
for (auto& t : clientThreads) {
if (t.joinable()) {
t.join();
}
}

close(serverSocket);
return 0;
}

代码说明:

  • 创建 pollfd 数组:pollfd fds[1],并监听标准输入和关心可读事件。
  • 调用 poll 函数:等待事件发生。
  • 检查 revents:检查 revents 中的事件类型,如 POLLIN(可读)。
  • 读取数据:如果有数据可读,调用 read 函数读取数据。
  • select一样,每连接一个客户端,就要更新pollfd数组,所以每次连接新的客户端时,都要更新clientCount

epoll

poll不同的是,epoll在监控大量文件描述符时具有更好的性能,因为它不需要每次都重新传递所有文件描述符,而是通过事件表的方式进行管理。在高并发服务器中,epoll十分高效。

epoll 有三个主要的系统调用:

  • epoll_create / epoll_create1:创建 epoll 实例。
  • epoll_ctl:向 epoll 实例中添加、修改或删除文件描述符。
  • epoll_wait:等待文件描述符上的事件发生。

epoll_event

我们先了解一下epoll_event结构体:

epoll_event 结构体用于描述事件,定义如下:

1
2
3
4
5
6
7
8
9
10
11
struct epoll_event {
uint32_t events; // 事件类型
epoll_data_t data; // 用户数据
};

typedef union epoll_data {
void *ptr;
int fd; // 文件描述符
uint32_t u32;
uint64_t u64;
} epoll_data_t;

events:表示事件类型,与poll的events类似:

  • EPOLLIN:有数据可读。
  • EPOLLOUT:可以写入数据。
  • EPOLLERR:发生错误。
  • EPOLLHUP:连接被挂起。
  • EPOLLET:边沿触发模式(相比默认的水平触发模式)。

epoll_create / epoll_create1

  • 用于创建一个 epoll 实例,该实例将用于管理文件描述符。
  • 返回值是一个 epoll 实例的文件描述符,后续的 epoll_ctlepoll_wait 操作都依赖于这个描述符。
    函数原型:
  • int epoll_create(int size);
  • int epoll_create1(int flags);

参数:

  • sizeepoll 实例的大小,已经不再使用,可以忽略。
  • flags:可以设置为 EPOLL_CLOEXEC ,用于在 fork() 子进程中关闭文件描述符。

返回值:

  • 返回一个 epoll 实例的文件描述符,用于后续的 epoll_ctlepoll_wait 操作。
  • 如果出现错误,返回 -1

epoll_ctl

  • 用于向 epoll 实例中添加、修改或删除文件描述符,即将epoll_event写进事件表,类似poll中的pollfd

函数原型

  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

参数:

  • epfdepoll 实例的文件描述符,由 epoll_createepoll_create1 返回。
  • op:操作类型,可以是以下值之一:
    • EPOLL_CTL_ADD:添加文件描述符到 epoll 实例中。
    • EPOLL_CTL_MOD:修改已存在的文件描述符的事件。
    • EPOLL_CTL_DEL:从 epoll 实例中删除文件描述符。
  • fd:要监视的文件描述符。
  • event:指向 epoll_event 结构的指针,用于指定感兴趣的事件。

返回值:

  • 成功时返回 0,失败时返回 -1

示例

1
2
3
4
5
6
7
8
struct epoll_event event;
event.events = EPOLLIN; // 监听读事件
event.data.fd = sockfd; // 关联文件描述符(socket)

if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event) == -1) {
perror("epoll_ctl: EPOLL_CTL_ADD failed");
exit(EXIT_FAILURE);
}

epoll_wait

  • 用于等待文件描述符上的事件发生,类似poll中的poll函数。

函数原型

  • int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

参数

  • epfdepoll 实例的文件描述符。
  • events:用于接收发生事件的 epoll_event 数组。
  • maxeventsevents 数组的最大长度,通常设置为监听的文件描述符的数量。
  • timeout:超时时间,单位为毫秒,类似于 polltimeout 参数。
    • timeout > 0:阻塞等待指定毫秒数。
    • timeout = 0:立即返回,不管是否有事件发生(非阻塞)。
    • timeout = -1:无限等待,直到有事件发生。
  • 返回值:
    • 成功时,返回实际触发事件的文件描述符数量。
    • 失败时,返回 -1,并设置 errno

ps:(这里的events是会被写入实际发生的事件的,而不是单纯告诉事件表我要监听这个事件)

示例

假设已经绑定好了服务器套接字server_fd,也已经创建了epoll_fd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#define MAX_EVENTS 10

int main() {
struct epoll_event event, events[MAX_EVENTS];
event.events = EPOLLIN;
event.data.fd = server_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) == -1) {
perror("epoll_ctl: EPOLL_CTL_ADD failed");
exit(EXIT_FAILURE);
}
while (true) {
// 等待事件发生
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait failed");
exit(EXIT_FAILURE);
}

// 处理每个发生的事件
for (int i = 0; i < nfds; i++) {
if (events[i].data.fd == server_fd) {
// 有新连接请求
new_socket = accept(server_fd, NULL, NULL);//accept注意要设置成非阻塞的,这里没有设置,是错误示范
event.events = EPOLLIN;
event.data.fd = new_socket;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &event);
} else if(events[i].events & EPOLLIN) {
// 有数据可读
char buffer[1024] = {0};
ssize_t count = read(events[i].data.fd, buffer, sizeof(buffer));
if (count == -1) {
perror("read failed");
close(events[i].data.fd);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
} else if (count == 0) {
// 客户端关闭连接
close(events[i].data.fd);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
} else {
// 输出接收到的数据
std::cout << "Received: " << buffer << std::endl;
}
}
}
}

close(server_fd);
return 0;
}

这里有两个要注意的点:

  • 一个是在处理新连接时,accept需要设置为非阻塞的,不然会卡着一直不往下面走
  • 另一个是要记得判断事件输出是否正确,”else if(events[i].events & EPOLLIN)“,否则会影响性能