1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
#ifndef PROCESSPOOL_H
#define PROCESSPOOL_H

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/stat.h>

class process {
public:
process() : m_pid(-1) {}

public:
pid_t m_pid;
int m_pipefd[2]; // 父进程和子进程通信用管道
};

template <typename T> // 模板参数是处理逻辑任务的类
class processpool {
private:
// 定义为私有,只能通过create静态函数来创建processpool实例
processpool(int listenfd, int process_number = 8);

public:
// 单例模式,是程序正确处理信号的必要条件
static processpool<T> *create(int listenfd, int process_number = 8) {
if (!m_instance) {
m_instance = new processpool<T>(listenfd, process_number);
}
return m_instance;
}
~processpool() {
delete[] m_sub_process;
}
void run();

private:
void setup_sig_pipe();
void run_parent();
void run_child();

private:
static const int MAX_PROCESS_NUMBER = 16; // 最大子进程数量
static const int USER_PER_PROCESS = 65536; // 每个子进程处理最大客户数量
static const int MAX_EVENT_NUMBER = 10000; // epoll最多能处理的事件数
int m_process_number; // 进程总数
int m_idx; // 子进程在池中序号
int m_epollfd; // 每个进程有个epoll内核事件表
int m_listenfd; // 监听socket
int m_stop; // 子进程通过m_stop判断是否停止运行
process *m_sub_process; // 保存所有子进程描述信息
static processpool<T> *m_instance; // 进程池静态实例
};
template <typename T>
processpool<T> *processpool<T>::m_instance = NULL;

static int sig_pipefd[2]; // 信号管道 统一事件源

static int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

static void addfd(int epollfd, int fd) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}

static void removefd(int epollfd, int fd) { // 从epollfd表示的内核事件表中删除fd的 所有事件
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0);
close(fd);
}

static void sig_handler(int sig) {
int save_errno = errno;
int msg = sig;
send(sig_pipefd[1], (char *)&msg, 1, 0);
errno = save_errno;
}

static void addsig(int sig, void(handler)(int), bool restart = true) {
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = handler;
if (restart)
{
sa.sa_flags |= SA_RESTART;
}
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}

template <typename T>
processpool<T>::processpool(int listenfd, int process_number) // 进程池构造函数,参数listenfd是监听的socket,必须在创建进程池之前被创建
: m_listenfd(listenfd), m_process_number(process_number), m_idx(-1), m_stop(false) {
assert((process_number > 0) && (process_number <= MAX_PROCESS_NUMBER));

m_sub_process = new process[process_number];
assert(m_sub_process);

for (int i = 0; i < process_number; ++i) {
int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_sub_process[i].m_pipefd);
assert(ret == 0);

m_sub_process[i].m_pid = fork();
assert(m_sub_process[i].m_pid >= 0);
if (m_sub_process[i].m_pid > 0) { // 父进程
close(m_sub_process[i].m_pipefd[1]); // 只向子进程写
continue;
}
else { // 子进程
close(m_sub_process[i].m_pipefd[0]); // 只从父进程读
m_idx = i;
break;
}
}
}

template <typename T>
void processpool<T>::setup_sig_pipe() { // 统一事件源
m_epollfd = epoll_create(5); // 创建epoll事件监听表和信号管道
assert(m_epollfd != -1);

int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
assert(ret != -1);

setnonblocking(sig_pipefd[1]);
addfd(m_epollfd, sig_pipefd[0]);

addsig(SIGCHLD, sig_handler); // 设置信号处理函数
addsig(SIGTERM, sig_handler);
addsig(SIGINT, sig_handler);
addsig(SIGPIPE, SIG_IGN);
}

template <typename T>
void processpool<T>::run() { // 父进程中m_idx值为 -1, 子进程中m_idx值 >= 0,判断接下来运行的是父还是子进程的代码
if (m_idx != -1)
{
run_child();
return;
}
run_parent();
}

template <typename T>
void processpool<T>::run_child() {
setup_sig_pipe();

int pipefd = m_sub_process[m_idx].m_pipefd[1]; // 每个子进程都通过其在进程池中的序号值m_idx找到与父进程通信的管道
addfd(m_epollfd, pipefd); // 子进程需要监听管道文件描述符pipefd 因为父进程将通过它通知子进程accept新连接

epoll_event events[MAX_EVENT_NUMBER];
T *users = new T[USER_PER_PROCESS];
assert(users);
int number = 0; // epoll事件数量
int ret = -1;

while (!m_stop) {
number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
if ((number < 0) && (errno != EINTR)) {
printf("epoll failure\n");
break;
}

for (int i = 0; i < number; i++) { // 处理每个事件
int sockfd = events[i].data.fd;
if ((sockfd == pipefd) && (events[i].events & EPOLLIN)) { // 父进程有数据给子进程
int client = 0;
ret = recv(sockfd, (char *)&client, sizeof(client), 0); // 从父子进程之间管道读取数据,结果保存client中读取成功有新客户
if (((ret < 0) && (errno != EAGAIN)) || ret == 0) {
continue;
} else {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0) {
printf("errno is: %d\n", errno);
continue;
}
addfd(m_epollfd, connfd); // 模板类T必须实现init方法,初始化一个客户连接。我们用connfd来索引处理的逻辑对象 提高效率
users[connfd].init(m_epollfd, connfd, client_address); // 实现在cgi_server.cpp中
}
} else if ((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)) { // 有信号,处理信号
int sig;
char signals[1024];
ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
if (ret <= 0) {
continue;
} else {
for (int i = 0; i < ret; ++i) {
switch (signals[i]) {
case SIGCHLD: { // 回收子进程资源
pid_t pid;
int stat;
while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
continue;
}
break;
}
case SIGTERM:
case SIGINT: {
m_stop = true;
break;
}
default: {
break;
}
}
}
}
} else if (events[i].events & EPOLLIN) { // 其他数据可读 必然是客户请求到来 调用逻辑处理对象的process方法处理
users[sockfd].process(); // 还没实现
} else {
continue;
}
}
}

delete[] users;
users = NULL;
close(pipefd);
//close( m_listenfd ); // 应该由m_listenfd的创建者来关闭,所谓的对象由哪个函数创建,就该由那个函数销毁
close(m_epollfd);
}

template <typename T>
void processpool<T>::run_parent() {
setup_sig_pipe();

addfd(m_epollfd, m_listenfd);

epoll_event events[MAX_EVENT_NUMBER];
int sub_process_counter = 0;
int new_conn = 1;
int number = 0;
int ret = -1;

while (!m_stop) {
number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
if ((number < 0) && (errno != EINTR)) {
printf("epoll failure\n");
break;
}

for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if (sockfd == m_listenfd) {
int i = sub_process_counter; // 有新连接到来,就采用round robin方式将其分给一个子进程
do {
if (m_sub_process[i].m_pid != -1) { // 选出空闲进程的进程数组索引
break;
}
i = (i + 1) % m_process_number;
} while (i != sub_process_counter);

if (m_sub_process[i].m_pid == -1) {
m_stop = true;
break;
}
sub_process_counter = (i + 1) % m_process_number;
//send( m_sub_process[sub_process_counter++].m_pipefd[0], ( char* )&new_conn, sizeof( new_conn ), 0 );
send(m_sub_process[i].m_pipefd[0], (char *)&new_conn, sizeof(new_conn), 0);
printf("send request to child %d\n", i);
//sub_process_counter %= m_process_number;
} else if ((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)) { // 处理父进程收到的信号
int sig;
char signals[1024];
ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
if (ret <= 0) {
continue;
} else {
for (int i = 0; i < ret; ++i) {
switch (signals[i]) {
case SIGCHLD: {
pid_t pid;
int stat;
while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
for (int i = 0; i < m_process_number; ++i) {
// 如果进程池中第i个子进程退出了,则主进程关闭对应通信管道,并设置m_pid为-1,标记子进程已经退出
if (m_sub_process[i].m_pid == pid) {
printf("child %d join\n", i);
close(m_sub_process[i].m_pipefd[0]);
m_sub_process[i].m_pid = -1;
}
}
}
m_stop = true;
for (int i = 0; i < m_process_number; ++i) {
if (m_sub_process[i].m_pid != -1) {
m_stop = false;
}
}
break;
}
case SIGTERM:
case SIGINT: {
printf("kill all the clild now\n"); // 杀了所有子进程
for (int i = 0; i < m_process_number; ++i) {
int pid = m_sub_process[i].m_pid;
if (pid != -1) {
kill(pid, SIGTERM);
}
}
break;
}
default: {
break;
}
}
}
}
} else {
continue;
}
}
}

//close( m_listenfd ); // 由创建者关闭 见后文
close(m_epollfd);
}

#endif

此头文件需要传入cgi模板类以进行实例化线程池。模板代码见下一节。


reference:
linux高性能服务器编程——游双

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#ifndef LOCKER_H
#define LOCKER_H

#include <exception>
#include <pthread.h>
#include <semaphore.h>

class sem {
public:
sem() {
if (sem_init(&m_sem, 0, 0) != 0) {
throw std::exception();
}
}
~sem() {
sem_destroy(&m_sem);
}
bool wait() {
return sem_wait(&m_sem) == 0;
}
bool post() {
return sem_post(&m_sem) == 0;
}

private:
sem_t m_sem;
};

class locker {
public:
locker() {
if (pthread_mutex_init(&m_mutex, NULL) != 0) {
throw std::exception();
}
}

~locker() {
pthread_mutex_destroy(&m_mutex);
}

bool lock() {
return pthread_mutex_lock(&m_mutex) == 0;
}

bool unlock() {
return pthread_mutex_unlock(&m_mutex) == 0;
}

private:
pthread_mutex_t m_mutex;
};

// 封装条件变量的类
class cond {
public:
cond() {
if (pthread_mutex_init(&m_mutex, NULL) != 0) {
throw std::exception();
}
if (pthread_cond_init(&m_cond, NULL) != 0) {
pthread_mutex_destroy(&m_mutex);
throw std::exception();
}
}

~cond() {
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cond);
}

bool wait() {
int ret = 0;
pthread_mutex_lock(&m_mutex);
ret = pthread_cond_wait(&m_cond, &m_mutex);
pthread_mutex_unlock(&m_mutex);
return ret == 0;
}

bool signal() {
return pthread_cond_signal(&m_cond) == 0;
}

private:
pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
};

#endif

充分复用代码供后续使用。


reference:高性能服务器编程——游双P280P_{280}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include <sys/socket.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>

static const int CONTROL_LEN = CMSG_LEN(sizeof(int));
// 发送文件描述符,fd参数适用来传递信息的unix域socket,fd_to_send参数是待发送文件的文件描述符
void send_fd(int fd, int fd_to_send) {
struct iovec iov[1]; // 单次系统调用对多个缓冲区进行读写
struct msghdr msg;
char buf[0];

iov[0].iov_base = buf;
iov[0].iov_len = 1;
msg.msg_name = NULL;
msg.msg_namelen= 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;

cmsghdr cm;
cm.cmsg_len = CONTROL_LEN;
cm.cmsg_level = SOL_SOCKET;
cm.cmsg_type = SCM_RIGHTS;
*(int *)CMSG_DATA(&cm) = fd_to_send;
// 设置辅助数据
msg.msg_control = &cm;
msg.msg_controllen = CONTROL_LEN;

sendmsg(fd, &msg, 0);
}
// 接收目标文件描述符
int recv_fd(int fd) {
struct iovec iov[1];
struct msghdr msg;
char buf[0];

iov[0].iov_base = buf;
iov[0].iov_len = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;

cmsghdr cm;
msg.msg_control = &cm;
msg.msg_controllen = CONTROL_LEN;

recvmsg(fd, &msg, 0);

int fd_to_read = *(int *)CMSG_DATA(&cm);
return fd_to_read;
}

int main()
{
int pipefd[2];
int fd_to_pass = 0;
// 创建父子进程间的管道,文件描述符fd[0] fd[1]都是unix域的socket
int ret = socketpair(PF_UNIX, SOCK_DGRAM, 0, pipefd);
assert(ret != -1);

pid_t pid = fork();
assert(pid >= 0);

if (pid == 0) { // 子进程
close(pipefd[0]);
fd_to_pass = open("test.txt", O_RDWR, 0666);
// 子进程通过管道将文件描述符发送到父进程,如果文件test.txt打开失败,则子进程将标准输入文件描述符发送到父进程
send_fd(pipefd[1], (fd_to_pass > 0) ? fd_to_pass : 0);
close(fd_to_pass);
exit(0);
}

close(pipefd[1]); // 父进程
fd_to_pass = recv_fd(pipefd[0]);
char buf[1024];
memset(buf, '\0', 1024);
read(fd_to_pass, buf, 1024);
printf("I got fd %d and data %s\n", fd_to_pass, buf);
close(fd_to_pass);
}

子进程把打开的文件描述符fd值放入一个socket中,父进程从socket中读到fd的值,最后再读fd中的内容。
运行结果:
在这里插入图片描述


reference:
linux高性能服务器编程——游双P267P_{267}

服务器代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>

#define USER_LIMIT 5
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536

struct client_data {
sockaddr_in address; // 客户端ip地址
int connfd; // socket文件描述符
pid_t pid; // 处理这个连接的子进程pid
int pipefd[2]; // 和父进程通信用的管道
};

static const char* shm_name = "/my_shm";
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;

char* share_mem = 0;

client_data* users = 0; // 客户连接数组,进程用客户连接的编号来索引这个数组

int* sub_process = 0; // 子进程和客户连接的映射关系表。用进程的pid来索引这个数组,即可取得该进程 所处理的客户连接 的编号

int user_count = 0; // 当前客户数量
bool stop_child = false;

int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

void addfd(int epollfd, int fd) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}

void sig_handler(int sig) {
int save_errno = errno;
int msg = sig;
send(sig_pipefd[1], (char*)&msg, 1, 0);
errno = save_errno;
}

void addsig(int sig, void(*handler)(int), bool restart = true) {
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = handler;
if (restart) {
sa.sa_flags |= SA_RESTART;
}
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}

void del_resource() {
close(sig_pipefd[0]);
close(sig_pipefd[1]);
close(listenfd);
close(epollfd);
shm_unlink(shm_name);
delete[] users;
delete[] sub_process;
}

void child_term_handler(int sig) { // 停止一个子进程
stop_child = true;
}

// 子进程运行的函数。参数idx指出该子进程处理的客户连接的编号,users是保存所有客户连接数据的数组。参数share_mem指出共享内存的起始地址
int run_child(int idx, client_data* users, char* share_mem) {
epoll_event events[MAX_EVENT_NUMBER];
// 子进程用io复用技术来同时监听两个文件描述符: 客户连接socket和与父进程的管道文件描述符
int child_epollfd = epoll_create(5);
assert(child_epollfd != -1);
int connfd = users[idx].connfd; // 客户端socket
addfd(child_epollfd, connfd);
int pipefd = users[idx].pipefd[1]; // 父进程管道文件描述符
addfd(child_epollfd, pipefd);
int ret;
// 子进程设置自己的信号处理函数
addsig(SIGTERM, child_term_handler, false);

while (!stop_child) {
int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);
if ((number < 0) && (errno != EINTR)) {
printf("epoll failure\n");
break;
}

for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
// 本子进程负责的客户连接有数据到达
if ((sockfd == connfd) && (events[i].events & EPOLLIN)) {
memset(share_mem + idx * BUFFER_SIZE, '\0', BUFFER_SIZE);
// 将客户数据读取到对应 读缓存 中。 读缓存是共享内存的一段,开始于 idx*BUFFER_SIZE处,长度为BUFFER_SIZE字节。因此,各个客户连接的
// 读缓存是共享的
ret = recv(connfd, share_mem + idx * BUFFER_SIZE, BUFFER_SIZE - 1, 0);
if (ret < 0) {
if (errno != EAGAIN) {
stop_child = true;
}
} else if (ret == 0) {
stop_child = true;
} else {
// 成功读取客户数据后就通知主进程(通过管道)来处理
send(pipefd, (char*)&idx, sizeof(idx), 0);
}
// 主进程通知本进程(通过管道)将第client个客户的数据发送到本进程负责处理的客户端
} else if ((sockfd == pipefd) && (events[i].events & EPOLLIN)) {
int client = 0;
// 接收主进程发送来的数据,即 有客户数据到达的连接 的编号。
ret = recv(sockfd, (char*)&client, sizeof(client), 0);
if (ret < 0) {
if (errno != EAGAIN) {
stop_child = true;
}
} else if (ret == 0) {
stop_child = true;
} else {
send(connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0); // 通知这个子进程负责的connfd,有个编号为client的客户端发消息了,把这段消息发给connfd
}
} else {
continue;
}
}
}

close(connfd);
close(pipefd);
close(child_epollfd);
return 0;
}

int main(int argc, char* argv[])
{
if (argc <= 2) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);

int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);

listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);

ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);

ret = listen(listenfd, 5);
assert(ret != -1);

user_count = 0;
users = new client_data[USER_LIMIT + 1];
sub_process = new int[PROCESS_LIMIT];
for (int i = 0; i < PROCESS_LIMIT; ++i) {
sub_process[i] = -1;
}

epoll_event events[MAX_EVENT_NUMBER];
epollfd = epoll_create(5);
assert(epollfd != -1);
addfd(epollfd, listenfd);

ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
assert(ret != -1);
setnonblocking(sig_pipefd[1]);
addfd(epollfd, sig_pipefd[0]);

addsig(SIGCHLD, sig_handler);
addsig(SIGTERM, sig_handler);
addsig(SIGINT, sig_handler);
addsig(SIGPIPE, SIG_IGN);
bool stop_server = false;
bool terminate = false;

// 创建共享内存,作为所有客户socket连接的读缓存
shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666);
assert(shmfd != -1);
ret = ftruncate(shmfd, USER_LIMIT * BUFFER_SIZE);
assert(ret != -1);

share_mem = (char*)mmap(NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0);
assert(share_mem != MAP_FAILED);
close(shmfd);

while (!stop_server) {
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if ((number < 0) && (errno != EINTR)) {
printf("epoll failure\n");
break;
}

for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
// 新的客户连接到来
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);

if (connfd < 0) {
printf("errno is: %d\n", errno);
continue;
}
if (user_count >= USER_LIMIT) {
const char* info = "too many users\n";
printf("%s", info);
send(connfd, info, strlen(info), 0);
close(connfd);
continue;
}

// 保存第user_counter个客户连接的相关数据
users[user_count].address = client_address;
users[user_count].connfd = connfd;
// 在主进程和子进程间建立管道,以传递必要数据
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd);
assert(ret != -1);
pid_t pid = fork();
if (pid < 0) {
close(connfd);
continue;
} else if (pid == 0) { // 子进程
close(epollfd);
close(listenfd);
close(users[user_count].pipefd[0]);
close(sig_pipefd[0]);
close(sig_pipefd[1]);
run_child(user_count, users, share_mem);
munmap((void*)share_mem, USER_LIMIT * BUFFER_SIZE);
exit(0);
} else { // 父进程
close(connfd);
close(users[user_count].pipefd[1]);
addfd(epollfd, users[user_count].pipefd[0]); // 监听子进程的管道
users[user_count].pid = pid;
// 记录新的客户连接在数组users中的索引值,建立进程pid和该索引值之间的映射关系
sub_process[pid] = user_count;
user_count++;
}
} else if ((sockfd == sig_pipefd[0]) && events[i].events & EPOLLIN) { // 处理信号事件
int sig;
char signals[1024];
ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
if (ret == -1) {
continue;
} else if (ret == 0) {
continue;
} else {
for (int i = 0; i < ret; ++i) {
switch(signals[i]) {
case SIGCHLD: { // 子进程推出,表示有某个客户端关闭了连接
pid_t pid;
int stat;
while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
// 用子进程的pid取得被关闭的客户连接的编号
int del_user = sub_process[pid];
sub_process[pid] = -1;
if ((del_user < 0) || (del_user > USER_LIMIT)) {
continue;
}
// 清除第del_user个客户连接使用的相关数据
epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0);
close(users[del_user].pipefd[0]);
users[del_user] = users[--user_count];
sub_process[users[del_user].pid] = del_user;
}
if (terminate && user_count == 0) {
stop_server = true;
}
break;
}
case SIGTERM:
case SIGINT: { // 结束服务器程序
printf("kill all the child now\n");
if (user_count == 0) {
stop_server = true;
break;
}
for (int i = 0; i < user_count; ++i) {
int pid = users[i].pid;
kill(pid, SIGTERM);
}
terminate = true;
break;
}
default: {
break;
}
}
}
}
} else if (events[i].events & EPOLLIN) { // 某个子进程向父进程写入了数据
int child = 0;
// 读取管道数据,child变量记录了是哪个客户连接有数据到达
ret = recv(sockfd, (char*)&child, sizeof(child), 0);
printf("read data from child accross pipe\n");
if (ret == -1) {
continue;
} else if (ret == 0) {
continue;
} else {
// 向除负责处理第child个客户连接的子进程之外的其他子进程发送消息,通知他们有客户数据要写
for (int j = 0; j < user_count; ++j) {
if (users[j].pipefd[0] != sockfd) {
printf("send data to child accross pipe\n");
send(users[j].pipefd[0], (char*)&child, sizeof(child), 0);
}
}
}
}
}
}
del_resource();
return 0;
}



server:
在这里插入图片描述


client 1:
在这里插入图片描述


client 2:
在这里插入图片描述


先让服务器监听4444端口,之后客户端1连接,客户端2连接。客户端1发送11111,发现客户端1和2都能收到。客户端2发送2222,发现客户端1和2都能收到。


reference: linux高性能服务器编程——游双P255P_{255}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <sys/sem.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

union semun { // 此结构体为semctl系统调用的第四个参数的推荐格式,由sys/sem.h给出
int val; // 用于SETVAL命令
struct semid_ds* buf; // 用于IPC_STAT和IPC_SET命令
unsigned short int* array; // 用于GETALL和SETALL命令
struct seminfo* __buf; // 用于IPC_INFO命令
};

void pv(int sem_id, int op) { // 此函数用于操作sem_id信号集的信号量,对其做出 +op 的操作
struct sembuf sem_b; // 每一个sembuf结构体都对应一个信号量操作
sem_b.sem_num = 0; // 信号集中信号量的编号,0表示信号集中第0个信号量
sem_b.sem_op = op;
sem_b.sem_flg = SEM_UNDO; // 该标志意思为,当进程退出时取消正在进行的semop操作
semop(sem_id, &sem_b, 1); // 失败时 数组内所有操作都不执行
}

int main(int argc, char* argv[]) {
int sem_id = semget(IPC_PRIVATE, 1, 0666); // 无论信号量是否已经存在,都会创建一个 新 的信号量。IPC_NEW这个名字更合适

union semun sem_un;
sem_un.val = 1;
semctl(sem_id, 0, SETVAL, sem_un); // 第0个信号,执行SETVAL命令,参数为sum_un

pid_t id = fork();
if (id < 0) {
return 1;
} else if (id == 0) { // 子进程
printf("child try to get binary sem\n");
pv(sem_id, -1);
printf("child get the sem and would release it after 5 seconds\n");
sleep(5);
pv(sem_id, 1);
exit(0);
} else { // 父进程
printf("parent try to get binary sem\n");
pv(sem_id, -1);
printf("parent get the sem and would release it after 5 seconds\n");
sleep(5);
pv(sem_id, 1);
}
waitpid(id, NULL, 0); // 父进程等待子进程结束
semctl(sem_id, 0, IPC_RMID, sem_un); // remove id 删除信号量
return 0;
}

在这里插入图片描述


reference: linux高性能服务器编程——游双

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#ifndef MIN_HEAP
#define MIN_HEAP

#include <iostream>
#include <netinet/in.h>
#include <time.h>
using std::exception;

#define BUFFER_SIZE 64

class heap_timer;
struct client_data {
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
heap_timer* timer;
};

class heap_timer {
public:
heap_timer(int delay) {
expire = timer(NULL) + delay;
}

time_t expire;
void (*cb_func)(client_data*);
client_data* user_data;
};

class time_heap {
public:
time_heap(int cap) throw (std::exception) : capacity(cap), cur_size(0) {
array = new heap_timer* [capacity];
if (!array) {
throw std:exception();
}
for (int i = 0; i < capacity; ++i) {
array[i] = NULL;
}
}

time_heap(heap_timer** init_array, int size, int capacity) throw (std::exception) : cur_size(size), capacity(capacity) {
if (capacity < size) {
throw std::exception();
}
array = new heap_timer* [capacity];
if (!array) {
throw std::exception();
}
for (int i = 0; i < capacity; ++i) {
array[i] = NULL;
}
if (size != 0) {
for (int i = 0; i < size; ++i) {
array[i] = init_array[i];
}
for (int i = (cur_size - 1) / 2; i >= 0; --i) {
percolate_down(i);
}
}
}

~time_heap() {
for (int i = 0; i < cur_size; ++i) {
delete array[i];
}
delete[] array;
}

void add_timer(heap_timer* timer) throw (std::exception) {
if (!timer) {
return;
}
if (cur_size >= capacity) {
resize();
}
int hole = cur_size++;
int parent = 0;
for (; hole > 0; hole = parent) { // 上滤
parent = (hole - 1) / 2;
if (array[parent]->expire <= timer->expire) {
break;
}
array[hole] = array[parent];
}
array[hole] = timer;
}

void del_timer(heap_timer* timer) {
if (!timer) {
return;
}
timer->cb_func = NULL; // 所谓的延迟销毁,节省真正删除该定时器造成的开销,缺点是容易使数组膨胀
}

heap_timer* top() const {
if (empty()) {
return NULL;
}
return array[0];
}

void pop_timer() {
if (empty()) {
return;
}
if (array[0]) {
delete array[0];
array[0] = array[--cur_size];
percolate_down(0);
}
}

void tick() { // 把所有到期的事件都处理掉
heap_timer* tmp = array[0];
time_t cur = timer(NULL);
while (!empty()) {
if (!tmp) {
break;
}
if (tmp->expire > cur) {
break;
}
if (array[0]->cb_func) {
array[0]->cb_func(array[0]->user_data);
}
pop_timer();
tmp = array[0];
}
}

bool empty() const {
return cur_size == 0;
}

private:
void percolate_down(int hole) {
heap_timer* temp = array[hole];
int child = 0;
for (; ((hole * 2 + 1) <= (cur_size - 1)); hole = child) {
child = hole * 2 + 1;
if ((child < (cur_size - 1)) && (array[child + 1]->expire < array[child]->expire)) { // 右孩子比左孩子小,换右孩子
++child;
}
if (array[child]->expire < temp->expire) { // 交换操作
array[hole] = array[child];
} else {
break;
}
}
array[hole] = temp;
}

void resize() throw (std::exception) {
heap_timer** temp = new heap_timer* [2 * capacity];
for (int i = 0; i < 2 * capacity; ++i) {
temp[i] = NULL;
}
if (!temp) {
throw std::exception();
}
capacity = 2 * capacity;
for (int i = 0; i < cur_size; ++i) {
temp[i] = array[i];
}
delete[] array;
array = temp;
}

heap_timer** array;
int capacity;
int cur_size;
}

#endif

堆是一种高效的数据结构,不熟悉堆的同学先学习下堆。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
上图来自Linux高性能服务器编程——游双P211P_{211}

添加一个定时器的时间复杂度为O(log2n)O(log_2n),删除定时器复杂度为O(1)O(1),执行一个定时器时间复杂度为O(1)O(1)


reference:Linux高性能服务器编程——游双

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#ifndef TIME_WHEEL_TIMER
#define TIME_WHEEL_TIMER

#include <time.h>
#include <netinet/in.h>
#include <stdio.h>

#define BUFFER_SIZE 64
class tw_timer;
struct client_data {
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
tw_timer* timer;
};

class tw_timer {
public:
tw_timer(int rot, int ts) : next(NULL), prev(NULL), rotation(rot), time_slot(ts) {}

int rotation;
int time_slot;
void (*cb_func)(client_data*);
client_data* user_data;
tw_timer* next;
tw_timer* prev;
};

class time_wheel {
public:
time_wheel() : cur_slot(0) {
for (int i = 0; i < N; ++i) {
slots[i] = NULL;
}
}
~time_wheel() {
for (int i = 0; i < N; ++i) {
while(tmp) {
slots[i] = tmp->next;
delete tmp;
tmp = slots[i];
}
}
}

tw_timer* add_timer(int timeout) {
if (timeout < 0) {
return NULL;
}
int ticks = 0;
// 根据待插入定时器的超时值计算他在时间轮转动多少滴答后触发,并将该滴答数存储变量tics中。若待插入定时器超时值小于槽间隔SI,将tics向上折合为1.否则向下折合为timeout/SI
if (timeout < SI) {
ticks = 1;
} else {
ticks = timeout / SI;
}
int rotation = ticks / N; // 计算待插入的定时器转动多少圈后触发
int ts = (cur_slot + (ticks % N)) % N; // 定时器应该被插入哪个槽
tw_timer* timer = new tw_timer(rotation, ts);

if (!slots[ts]) { // 没有定时器,把新建的定时器插入,并设置为头节点
printf("add timer, rotation is %d, ts is %d, cur_slot is %d\n", rotation, ts, cur_slot);
slots[ts] = timer;
} else { // 将定时器插入第ts个槽中
timer->next = slots[ts];
slots[ts]->prev = timer;
slots[ts] = timer;
}
return timer;
}

void del_timer(tw_timer* timer) {
if (!timer) {
return;
}
int ts = timer->time_slot;
if (timer == slots[ts]) { // 是头节点
slots[ts] = slots[ts]->next;
if (slots[ts]) {
slots[ts]->prev = NULL;
}
delete timer;
} else {
timer->prev->next = timer->next;
if (timer->next) {
timer->next->prev = timer->prev;
}
delete timer;
}
}

void tick() {
tw_timer* tmp = slots[cur_slot];
printf("current slot is %d\n", cur_slot);
while (tmp) {
printf("tick the timer once\n");
if (tmp->rotation > 0) { // ratation大于0,这一轮不起作用
tmp->rotation--;
tmp = tmp->next;
} else { // 到期了,执行定时任务并且删除定时器
tmp->cb_func(tmp->user_data);
if (tmp == slots[cur_slot]) {
printf("delete header in cur_slot\n");
slots[cur_slot] = tmp->next;
delete tmp;
if (slots[cur_slot]) {
slots[cur_slot]->prev = NULL;
}
tmp = slots[cur_slot];
} else {
tmp->prev->next = tmp->next;
if (tmp->next) {
tmp->next->prev = tmp->prev;
}
tw_timer* tmp2 = tmp->next;
delete tmp;
tmp = tmp2;
}
}
}
cur_slot = ++cur_slot % N;
}
private:
static const int N = 60; // 时间轮上槽的数目
static const int SI = 1; // 每1s时间轮转动一次,槽间隔为1s
tw_timer* slots[N]; // 每个元素指向一个定时器链表,无序
int cur_slot;
}

#endif

结构如图所示: 在这里插入图片描述
图片来自高性能服务器编程——游双
time slot=(current slot+(time interval/slot interval)) % Ntime\ slot = (current\ slot + (time\ interval / slot\ interval)) \ \%\ N
其中,time slottime\ slot为需要添加到的时间槽,current slotcurrent\ slot为现在时间轮所处的时间槽,time intervaltime\ interval为从现在开始计时,需要多久触发执行任务的时间,slot intervalslot\ interval为每个时间槽所代表的时间。NN为时间槽总数。
想要提高定时精度,使slot intervalslot\ interval值变小。提高执行效率,增大NN

添加定时器时间复杂度O(1)O(1),执行定时器时间复杂度O(N)O(N),实际上效率比O(N)O(N)快,因为不同定时器被散列到不同的链表上了。当用多个时间轮时,该时间复杂度近似为O(1)O(1)


reference:高性能linux服务器编程——游双

其中依赖的lst_timer.h见上篇文章

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#include "lst_timer.h"

#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define TIMESLOT 5
static int pipefd[2];

static sort_timer_lst timer_lst;
static int epollfd = 0;

int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

void addfd(int epollfd, int fd)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}

void sig_handler(int sig) // 将信号发往pipefd[1]
{
int save_errno = errno;
int msg = sig;
send(pipefd[1], (char*)&msg, 1, 0);
errno = save_errno;
}

void addsig(int sig)
{
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = sig_handler;
sa.sa_flags |= SA_RESTART; // 重新调用被该信号终止的系统调用 P182
sigfillset(&sa.sa_mask); // 在信号集中设置所有信号
assert(sigaction(sig, &sa, NULL) != -1);
}

void timer_handler()
{
timer_lst.tick();
alarm(TIMESLOT);
}

void cb_func(client_data* user_data)
{
epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0); // 把user_data->sockfd从监听的fd中删除
assert(user_data);
close(user_data->sockfd);
printf("close fd %d\n", user_data->sockfd);
}

int main(int argc, char* argv[])
{
if (argc <= 2) {
printf("usage:%s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);

int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);

int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);

ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);

ret = listen(listenfd, 5);
assert(ret != -1);

epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd != -1);
addfd(epollfd, listenfd);

ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); // 双向管道
assert(ret != -1);
setnonblocking(pipefd[1]);
addfd(epollfd, pipefd[0]);

addsig(SIGALRM);
addsig(SIGTERM);
bool stop_server = false;

client_data* users = new client_data[FD_LIMIT];
bool timeout = false;
alarm(TIMESLOT); // 定时

while (!stop_server) {
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if ((number < 0) && (errno != EINTR)) { // EINTR代表临时性失败,进行系统调用时执行信号处理函数去了。 再次调用有可能成功
printf("epoll failure\n");
break;
}

for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd);
users[connfd].address = client_address;
users[connfd].sockfd = connfd;

util_timer* timer = new util_timer; // 加入定时器
timer->user_data = &users[connfd];
timer->cb_func = cb_func;
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
users[connfd].timer = timer;
timer_lst.add_timer(timer);
} else if ((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) { // 处理信号
int sig;
char signals[1024];
ret = recv(pipefd[0], signals, sizeof(signals), 0);
if (ret == -1) {
continue;
} else if (ret == 0) {
continue;
} else {
for (int i = 0; i < ret; ++i) {
switch(signals[i]) {
case SIGALRM: {
printf("SIGALARM has been triggered!\n");
timeout = true; // 标记有定时任务要处理
break;
}
case SIGTERM: {
stop_server = true;
}
}
}
}
} else if (events[i].events & EPOLLIN) { // 处理接收数据
memset(users[sockfd].buf, '\0', BUFFER_SIZE);
ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0);
printf("get %d bytes of clinet data %s from %d\n", ret, users[sockfd].buf, sockfd);
util_timer* timer = users[sockfd].timer;
if (ret < 0) {
if (errno != EAGAIN) { // 非阻塞系统调用由于资源限制,意思很明显,让你再次尝试
cb_func(&users[sockfd]); // 发生读错误,sockfd可以删了
if (timer) {
timer_lst.del_timer(timer);
}
}
} else if (ret == 0) { // 对方关闭连接 服务器也关闭连接
cb_func(&users[sockfd]);
if (timer) {
timer_lst.del_timer(timer);
}
} else {
if (timer) {
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
printf("adjust timer once\n");
timer_lst.adjust_timer(timer);
}
}
} else {

}
}
if (timeout) { // 处理定时事件(清理链表中超时的事件),优先级比io低。
timer_handler();
timeout = false;
}
}

close(listenfd);
close(pipefd[1]);
close(pipefd[0]);
delete[] users;
return 0;

}

每过五秒就会发送一个SIGALRM信号,每个客户端和服务器的过期连接时常是3个timeslot,超时后服务器会自动断开与客户端的连接。每次客户端向服务器发送信息后,会调用adjust_timer来重新调整这个连接的到期时间,至发送信息后+三个timeslot。


服务器:
在这里插入图片描述


客户端:
在这里插入图片描述


reference: Linux高性能服务器编程——游双

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#ifndef LST_TIMER
#define LST_TIMER

#include <time.h>
#define BUFFER_SIZE 64
class util_timer;

struct client_data {
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
util_timer* timer;
};

class util_timer {
public:
util_timer() : prev(NULL), next(NULL){}
time_t expire;
void (*cb_func)(client_data*);
client_data* user_data;
util_timer* prev;
util_timer* next;
};

class sort_timer_lst
{
public:
sort_timer_lst() : head(NULL), tail(NULL) {}
~sort_timer_lst() {
util_timer* tmp = head;
while (tmp) {
head = tmp->next;
delete tmp;
tmp = head;
}
}

void add_timer(util_timer* timer) {
if (!timer) {
return;
}
if (!head) {
head = tail = timer;
return;
}

if (timer->expire < head->expire) {
timer->next = head;
head->prev = timer;
head = timer;
return;
}
add_timer(timer, head);
}

void adjust_timer(util_timer* timer) {
if (!timer) {
return;
}
util_timer* tmp = timer->next;
if (!tmp || (timer->expire < tmp->expire)) {
return;
}
if (timer == head) {
head = head->next;
head->prev = NULL;
timer->next = NULL;
add_timer(timer, head);
} else {
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
add_timer(timer, timer->next);
}
}

void del_timer(util_timer* timer) {
if (!timer) {
return;
}
if ((timer == head) && (timer == tail)) {
delete timer;
head = NULL;
tail = NULL;
return;
}

if (timer == head) {
head = head->next;
head->prev = NULL;
delete timer;
return;
}

if (timer == tail) {
tail = tail->prev;
tail->next = NULL;
delete timer;
return;
}

timer->prev->next = timer->next;
timer->next->prev = timer->prev;
delete timer;
}

void tick() {
if (!head) {
return;
}
printf("timer tick\n");
time_t cur = time(NULL);
util_timer* tmp = head;

while (tmp) {
if (cur < tmp->expire) {
break;
}
tmp->cb_func(tmp->user_data);
head = tmp->next;
if (head) {
head->prev = NULL;
}
delete tmp;
tmp = head;
}
}
private:
void add_timer(util_timer* timer, util_timer* lst_head) {
util_timer* prev = lst_head;
util_timer* tmp = prev->next;
while (tmp) {
if (timer->expire < tmp->expire) {
prev->next = timer;
timer->next = tmp;
tmp->prev = timer;
timer->prev = prev;
break;
}
prev = tmp;
tmp = tmp->next;
}

if (!tmp) {
prev->next = timer;
timer->prev = prev;
timer->next = NULL;
tail = timer;
}
}

util_timer* head;
util_timer* tail;
};

#endif /* LST_TIMER end */

此文件被包含在头文件中以便方便使用。tick为心跳函数,每隔一段时间执行一次。判断任务到期为定时器expire参数小于当前系统时间。
执行时间复杂度:
添加O(n)O(n),删除O(1)O(1),执行任务O(1)O(1)


reference:
Linux高性能服务器编程——游双

带外数据

带外数据用于迅速告知对方本端发生的重要的事件。它比普通的数据(带内数据)拥有更高的优先级,不论发送缓冲区中是否有排队等待发送的数据,它总是被立即发送。带外数据的传输可以使用一条独立的传输层连接,也可以映射到传输普通数据的连接中。

SIGURG信号的作用

在linux环境下,内核通知应用程序带外数据到达的方式有两种:

  1. 一种就是利用I/O复用技术的系统调用(如select)在接受到带外数据时将返回,并向应用程序报告socket上的异常事件。
  2. 另一种方法就是使用SIGURG信号。(下面代码)

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <fcntl.h>

#define BUF_SIZE 1024

static int connfd;

void sig_urg(int sig)
{
int save_errno = errno;
char buffer[BUF_SIZE];
memset(buffer, '\0', BUF_SIZE);
int ret = recv(connfd, buffer, BUF_SIZE - 1, MSG_OOB); // 接收带外数据
printf("got %d bytes of oob data '%s\n' ", ret, buffer);
errno = save_errno;
}

void addsig (int sig, void (* sig_handler) (int))
{
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = sig_handler;
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}

int main(int argc, char* argv[])
{
if (argc <= 2)
{
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);

struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);

int sock = socket(PF_INET, SOCK_STREAM, 0);
assert(socket >= 0);

int ret = bind(sock, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);

ret = listen(sock, 5);
assert(ret != -1);

struct sockaddr_in client;
socklen_t client_addrlength = sizeof(client);
connfd = accept(sock, (struct sockaddr*)&client, &client_addrlength);
if (connfd < 0) {
printf("errno is: %d\n", errno);
} else {
addsig(SIGURG, sig_urg);
fcntl(connfd, F_SETOWN, getpid()); // 设置SIGURG信号之前,我们必须设置socket的宿主进程或进程组

char buffer[BUF_SIZE];
while (1) {
memset(buffer, '\0', BUF_SIZE);
ret = recv(connfd, buffer, BUF_SIZE - 1, 0);
if (ret <= 0) {
break;
}
printf("got %d bytes of normal data '%s\n", ret, buffer);
}
close(connfd);
}
close(sock);
return 0;
}

客户端(本机模拟发送SIGURG)
1


服务端反应
2


reference:
linux高性能服务器编程——游双