处理非活动连接

其中依赖的lst_timer.h见上篇文章

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#include "lst_timer.h"

#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define TIMESLOT 5
static int pipefd[2];

static sort_timer_lst timer_lst;
static int epollfd = 0;

int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

void addfd(int epollfd, int fd)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}

void sig_handler(int sig) // 将信号发往pipefd[1]
{
int save_errno = errno;
int msg = sig;
send(pipefd[1], (char*)&msg, 1, 0);
errno = save_errno;
}

void addsig(int sig)
{
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = sig_handler;
sa.sa_flags |= SA_RESTART; // 重新调用被该信号终止的系统调用 P182
sigfillset(&sa.sa_mask); // 在信号集中设置所有信号
assert(sigaction(sig, &sa, NULL) != -1);
}

void timer_handler()
{
timer_lst.tick();
alarm(TIMESLOT);
}

void cb_func(client_data* user_data)
{
epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0); // 把user_data->sockfd从监听的fd中删除
assert(user_data);
close(user_data->sockfd);
printf("close fd %d\n", user_data->sockfd);
}

int main(int argc, char* argv[])
{
if (argc <= 2) {
printf("usage:%s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);

int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);

int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);

ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);

ret = listen(listenfd, 5);
assert(ret != -1);

epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd != -1);
addfd(epollfd, listenfd);

ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); // 双向管道
assert(ret != -1);
setnonblocking(pipefd[1]);
addfd(epollfd, pipefd[0]);

addsig(SIGALRM);
addsig(SIGTERM);
bool stop_server = false;

client_data* users = new client_data[FD_LIMIT];
bool timeout = false;
alarm(TIMESLOT); // 定时

while (!stop_server) {
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if ((number < 0) && (errno != EINTR)) { // EINTR代表临时性失败,进行系统调用时执行信号处理函数去了。 再次调用有可能成功
printf("epoll failure\n");
break;
}

for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd);
users[connfd].address = client_address;
users[connfd].sockfd = connfd;

util_timer* timer = new util_timer; // 加入定时器
timer->user_data = &users[connfd];
timer->cb_func = cb_func;
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
users[connfd].timer = timer;
timer_lst.add_timer(timer);
} else if ((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) { // 处理信号
int sig;
char signals[1024];
ret = recv(pipefd[0], signals, sizeof(signals), 0);
if (ret == -1) {
continue;
} else if (ret == 0) {
continue;
} else {
for (int i = 0; i < ret; ++i) {
switch(signals[i]) {
case SIGALRM: {
printf("SIGALARM has been triggered!\n");
timeout = true; // 标记有定时任务要处理
break;
}
case SIGTERM: {
stop_server = true;
}
}
}
}
} else if (events[i].events & EPOLLIN) { // 处理接收数据
memset(users[sockfd].buf, '\0', BUFFER_SIZE);
ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0);
printf("get %d bytes of clinet data %s from %d\n", ret, users[sockfd].buf, sockfd);
util_timer* timer = users[sockfd].timer;
if (ret < 0) {
if (errno != EAGAIN) { // 非阻塞系统调用由于资源限制,意思很明显,让你再次尝试
cb_func(&users[sockfd]); // 发生读错误,sockfd可以删了
if (timer) {
timer_lst.del_timer(timer);
}
}
} else if (ret == 0) { // 对方关闭连接 服务器也关闭连接
cb_func(&users[sockfd]);
if (timer) {
timer_lst.del_timer(timer);
}
} else {
if (timer) {
time_t cur = time(NULL);
timer->expire = cur + 3 * TIMESLOT;
printf("adjust timer once\n");
timer_lst.adjust_timer(timer);
}
}
} else {

}
}
if (timeout) { // 处理定时事件(清理链表中超时的事件),优先级比io低。
timer_handler();
timeout = false;
}
}

close(listenfd);
close(pipefd[1]);
close(pipefd[0]);
delete[] users;
return 0;

}

每过五秒就会发送一个SIGALRM信号,每个客户端和服务器的过期连接时常是3个timeslot,超时后服务器会自动断开与客户端的连接。每次客户端向服务器发送信息后,会调用adjust_timer来重新调整这个连接的到期时间,至发送信息后+三个timeslot。


服务器:
在这里插入图片描述


客户端:
在这里插入图片描述


reference: Linux高性能服务器编程——游双