基于共享内存的聊天室服务程序

服务器代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>

#define USER_LIMIT 5
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536

struct client_data {
sockaddr_in address; // 客户端ip地址
int connfd; // socket文件描述符
pid_t pid; // 处理这个连接的子进程pid
int pipefd[2]; // 和父进程通信用的管道
};

static const char* shm_name = "/my_shm";
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;

char* share_mem = 0;

client_data* users = 0; // 客户连接数组,进程用客户连接的编号来索引这个数组

int* sub_process = 0; // 子进程和客户连接的映射关系表。用进程的pid来索引这个数组,即可取得该进程 所处理的客户连接 的编号

int user_count = 0; // 当前客户数量
bool stop_child = false;

int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

void addfd(int epollfd, int fd) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}

void sig_handler(int sig) {
int save_errno = errno;
int msg = sig;
send(sig_pipefd[1], (char*)&msg, 1, 0);
errno = save_errno;
}

void addsig(int sig, void(*handler)(int), bool restart = true) {
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = handler;
if (restart) {
sa.sa_flags |= SA_RESTART;
}
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}

void del_resource() {
close(sig_pipefd[0]);
close(sig_pipefd[1]);
close(listenfd);
close(epollfd);
shm_unlink(shm_name);
delete[] users;
delete[] sub_process;
}

void child_term_handler(int sig) { // 停止一个子进程
stop_child = true;
}

// 子进程运行的函数。参数idx指出该子进程处理的客户连接的编号,users是保存所有客户连接数据的数组。参数share_mem指出共享内存的起始地址
int run_child(int idx, client_data* users, char* share_mem) {
epoll_event events[MAX_EVENT_NUMBER];
// 子进程用io复用技术来同时监听两个文件描述符: 客户连接socket和与父进程的管道文件描述符
int child_epollfd = epoll_create(5);
assert(child_epollfd != -1);
int connfd = users[idx].connfd; // 客户端socket
addfd(child_epollfd, connfd);
int pipefd = users[idx].pipefd[1]; // 父进程管道文件描述符
addfd(child_epollfd, pipefd);
int ret;
// 子进程设置自己的信号处理函数
addsig(SIGTERM, child_term_handler, false);

while (!stop_child) {
int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);
if ((number < 0) && (errno != EINTR)) {
printf("epoll failure\n");
break;
}

for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
// 本子进程负责的客户连接有数据到达
if ((sockfd == connfd) && (events[i].events & EPOLLIN)) {
memset(share_mem + idx * BUFFER_SIZE, '\0', BUFFER_SIZE);
// 将客户数据读取到对应 读缓存 中。 读缓存是共享内存的一段,开始于 idx*BUFFER_SIZE处,长度为BUFFER_SIZE字节。因此,各个客户连接的
// 读缓存是共享的
ret = recv(connfd, share_mem + idx * BUFFER_SIZE, BUFFER_SIZE - 1, 0);
if (ret < 0) {
if (errno != EAGAIN) {
stop_child = true;
}
} else if (ret == 0) {
stop_child = true;
} else {
// 成功读取客户数据后就通知主进程(通过管道)来处理
send(pipefd, (char*)&idx, sizeof(idx), 0);
}
// 主进程通知本进程(通过管道)将第client个客户的数据发送到本进程负责处理的客户端
} else if ((sockfd == pipefd) && (events[i].events & EPOLLIN)) {
int client = 0;
// 接收主进程发送来的数据,即 有客户数据到达的连接 的编号。
ret = recv(sockfd, (char*)&client, sizeof(client), 0);
if (ret < 0) {
if (errno != EAGAIN) {
stop_child = true;
}
} else if (ret == 0) {
stop_child = true;
} else {
send(connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0); // 通知这个子进程负责的connfd,有个编号为client的客户端发消息了,把这段消息发给connfd
}
} else {
continue;
}
}
}

close(connfd);
close(pipefd);
close(child_epollfd);
return 0;
}

int main(int argc, char* argv[])
{
if (argc <= 2) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);

int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);

listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);

ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);

ret = listen(listenfd, 5);
assert(ret != -1);

user_count = 0;
users = new client_data[USER_LIMIT + 1];
sub_process = new int[PROCESS_LIMIT];
for (int i = 0; i < PROCESS_LIMIT; ++i) {
sub_process[i] = -1;
}

epoll_event events[MAX_EVENT_NUMBER];
epollfd = epoll_create(5);
assert(epollfd != -1);
addfd(epollfd, listenfd);

ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
assert(ret != -1);
setnonblocking(sig_pipefd[1]);
addfd(epollfd, sig_pipefd[0]);

addsig(SIGCHLD, sig_handler);
addsig(SIGTERM, sig_handler);
addsig(SIGINT, sig_handler);
addsig(SIGPIPE, SIG_IGN);
bool stop_server = false;
bool terminate = false;

// 创建共享内存,作为所有客户socket连接的读缓存
shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666);
assert(shmfd != -1);
ret = ftruncate(shmfd, USER_LIMIT * BUFFER_SIZE);
assert(ret != -1);

share_mem = (char*)mmap(NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0);
assert(share_mem != MAP_FAILED);
close(shmfd);

while (!stop_server) {
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if ((number < 0) && (errno != EINTR)) {
printf("epoll failure\n");
break;
}

for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
// 新的客户连接到来
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);

if (connfd < 0) {
printf("errno is: %d\n", errno);
continue;
}
if (user_count >= USER_LIMIT) {
const char* info = "too many users\n";
printf("%s", info);
send(connfd, info, strlen(info), 0);
close(connfd);
continue;
}

// 保存第user_counter个客户连接的相关数据
users[user_count].address = client_address;
users[user_count].connfd = connfd;
// 在主进程和子进程间建立管道,以传递必要数据
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd);
assert(ret != -1);
pid_t pid = fork();
if (pid < 0) {
close(connfd);
continue;
} else if (pid == 0) { // 子进程
close(epollfd);
close(listenfd);
close(users[user_count].pipefd[0]);
close(sig_pipefd[0]);
close(sig_pipefd[1]);
run_child(user_count, users, share_mem);
munmap((void*)share_mem, USER_LIMIT * BUFFER_SIZE);
exit(0);
} else { // 父进程
close(connfd);
close(users[user_count].pipefd[1]);
addfd(epollfd, users[user_count].pipefd[0]); // 监听子进程的管道
users[user_count].pid = pid;
// 记录新的客户连接在数组users中的索引值,建立进程pid和该索引值之间的映射关系
sub_process[pid] = user_count;
user_count++;
}
} else if ((sockfd == sig_pipefd[0]) && events[i].events & EPOLLIN) { // 处理信号事件
int sig;
char signals[1024];
ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
if (ret == -1) {
continue;
} else if (ret == 0) {
continue;
} else {
for (int i = 0; i < ret; ++i) {
switch(signals[i]) {
case SIGCHLD: { // 子进程推出,表示有某个客户端关闭了连接
pid_t pid;
int stat;
while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
// 用子进程的pid取得被关闭的客户连接的编号
int del_user = sub_process[pid];
sub_process[pid] = -1;
if ((del_user < 0) || (del_user > USER_LIMIT)) {
continue;
}
// 清除第del_user个客户连接使用的相关数据
epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0);
close(users[del_user].pipefd[0]);
users[del_user] = users[--user_count];
sub_process[users[del_user].pid] = del_user;
}
if (terminate && user_count == 0) {
stop_server = true;
}
break;
}
case SIGTERM:
case SIGINT: { // 结束服务器程序
printf("kill all the child now\n");
if (user_count == 0) {
stop_server = true;
break;
}
for (int i = 0; i < user_count; ++i) {
int pid = users[i].pid;
kill(pid, SIGTERM);
}
terminate = true;
break;
}
default: {
break;
}
}
}
}
} else if (events[i].events & EPOLLIN) { // 某个子进程向父进程写入了数据
int child = 0;
// 读取管道数据,child变量记录了是哪个客户连接有数据到达
ret = recv(sockfd, (char*)&child, sizeof(child), 0);
printf("read data from child accross pipe\n");
if (ret == -1) {
continue;
} else if (ret == 0) {
continue;
} else {
// 向除负责处理第child个客户连接的子进程之外的其他子进程发送消息,通知他们有客户数据要写
for (int j = 0; j < user_count; ++j) {
if (users[j].pipefd[0] != sockfd) {
printf("send data to child accross pipe\n");
send(users[j].pipefd[0], (char*)&child, sizeof(child), 0);
}
}
}
}
}
}
del_resource();
return 0;
}



server:
在这里插入图片描述


client 1:
在这里插入图片描述


client 2:
在这里插入图片描述


先让服务器监听4444端口,之后客户端1连接,客户端2连接。客户端1发送11111,发现客户端1和2都能收到。客户端2发送2222,发现客户端1和2都能收到。


reference: linux高性能服务器编程——游双P255P_{255}