为什么要零拷贝

传统的 IO 拷贝在计算机中拷贝次数太多,速度太慢,零拷贝可以减少拷贝次数,增加系统性能。另外,零拷贝并不是指没有进行文件的拷贝,只是减少了拷贝的次数。

DMA

直接内存访问(Direct Memory Access)是一种执行 I/O 的工作方式。在这种方式中,DMA 控制器从 CPU 完全接管对总线的控制。这意味着数据交换不经过 CPU,而直接在内存和 I/O 设备之间进行 。DMA 方式一般用于高速传送成组数据。DMA 控制器将向内存发出地址和控制信号,修改地址,对传送的字的个数计数,并且以中断方式向 CPU 报告传送操作的结束。

DMA 方式的主要优点是速度快。由于 CPU 根本不参加传送操作,因此就省去了 CPU 取指令、取数、送数等操作。在数据传送过程中,没有保存现场、恢复现场之类的工作。内存地址修改、传送字个数的计数等等,也不是由软件实现,而是用硬件线路直接实现的。所以 DMA 方式能满足高速 I/O 设备的要求,也有利于 CPU 效率的发挥。

用户态和内核态

在 Linux 系统中,有的程序权限很高,可以访问计算机的任何资源,但是有的程序权限就低,只能访问部分资源。这两个类型的程序,就可以映射为用户态和内核态。内核态是计算机的核心,可以访问计算机的任何资源,如网卡、硬盘。为了安全,CPU 不能让用户程序肆无忌惮的访问计算机的任何资源,这样如果用户程序不稳定可能会造成系统崩溃,因此才有的用户态。

  • 内核态:可以访问内存的所有数据,包括外围设备,例如硬盘,网卡,cpu 也可以将自己从一个程序切换到另一个程序。
  • 用户态:只能受限的访问内存,且不允许访问外围设备,占用 cpu 的能力被剥夺,cpu 资源可以被其他程序获取。

综上所述,在 CPU 想要读取硬盘文件的时候,需要从用户态切换为内核态,才有权限。读取完成之后,为了程序安全,需要从内核态切换为用户态。

普通拷贝

1
1-1

在普通的拷贝时,流程如下:

  1. 切换到内核态,先到内核态查询内核缓冲区,如果内核缓冲区有数据,则可以直接拷贝到用户空间中。如果内核缓冲区没有,则 CPU 会让 DMA 加载到内核空间中。这里就会有一次 DMA 拷贝。
  2. 拷贝到内核缓冲区之后,CPU 将会从内核缓冲区拷贝走。这是一次 CPU 拷贝。拷贝完成切换到用户态。
  3. 写数据的时候,再次切换到内核态。切换完成之后,写到 socket 缓冲区。写完之后,切换到用户态。
  4. DMA 通过异步的方式将 socket 缓冲区的数据通过网卡发送到对端。

这种普通的IO。总共有 4 次 CPU 切换(上图蓝色)。分别是:读 2 次、写 2 次。

4 次文件拷贝,分别是:

  1. 文件从硬盘到内核空间
  2. 内核空间到 CPU
  3. CPU 到 socket 缓冲区
  4. socket 缓冲区到网卡。

零拷贝

mmap

2
mmap 是零拷贝的一种方式通过虚拟内存的方式实现。也就是说用户空间和内核空间使用同一个物理地址。这样,文件就不在需要经过用户空间。可以从内核缓冲区直接复制到 socket 缓冲区。减少了一次文件拷贝。流程如下:

  1. 调用 mmap,将一块用户空间映射到内核空间。此时进入内核态。DMA 把数据加载到内核空间,这是一次 DMA 拷贝。
  2. 切换回用户态。
  3. 调用 write,向用户空间写数据。数据直接从内核缓冲区写入到 socket 缓冲区。这是一次 CPU 拷贝。
  4. 返回用户态。DMA 异步的将 socket 缓冲区内容通过网卡发送到对端。

共计 3 次内存拷贝,4 次用户内核态切换。

sendfile

3
3-1
sendfile 函数可以在两个文件描述符之间传递数据(完全在内核中操作),从而避免了内核缓冲区和用户缓冲区之间的数据拷贝。流程如下:

  1. 系统调用 sendfile() 通过 DMA 把硬盘数据拷贝到内核缓冲区,这是一次 DMA 拷贝。
  2. 然后数据被内核拷贝到另外一个与 socket 相关的 socket 缓冲区。这里没有用户态和核心态之间的切换,在内核中直接完成了从一个缓冲区到另一个缓冲区的拷贝。这里虽说是写了 cpu 复制,但是如果网卡支持 scatter-gather ,并没有直接复制内容,而是复制了一些 offset 和 length 之类的数据到 socket 缓冲区。如下图:
    3-2
  3. DMA 异步将 socket 缓冲区内容发送给对端,是一次 DMA 拷贝。如果网卡支持 scatter-gather,那么就直接从内核缓冲区直接 DMA 拷贝到网卡驱动。

共计 3 次或 2 次内存拷贝,2 次用户内核态切换。

进程内存布局

1
在 x86_64 架构的 32 位操作系统中,linux 的进程内存布局如图所示。这个是进程的虚拟地址空间,这些虚拟地址通过页表映射到物理内存。页表由操作系统维护,由处理器引用。每一个进程都有一个自己的页表。内核也是一个特殊的进程,因为虚拟地址被使能会应用于所有软件,所以内核需要在每个进程的地址空间中都保留一部分虚拟地址专门给内核使用。

内核空间

从 0xC0000000 到 0xFFFFFFFF 这 1G 的空间是内核空间,而 0x00000000 到 0xBFFFFFFF 是用户空间。用户空间无法直接访问内核的虚拟内存空间,仅能通过系统调用来进入内核态,从而来访问内核空间的内存地址。只要用户态的程序试图访问这些页,就会导致一个页错误(page fault)。在 linux 中,内核空间持续存在,并且所有进程中都映射到同样的物理内存。内核的代码和数据总是可以被寻址的,因为随时为系统调用和中断做准备。另外,用户进程也是无法访问 0x00000000 ~ 0x08048000 这一段虚拟内存地址的,在这段地址上有诸多例如 C 库,动态加载器如 ld.so 等的映射地址。 如果用户进程访问到该区间会返回段错误。

用户空间

在用户空间的最顶部的部分被叫做栈空间,它一般用于存放函数参数或局部变量。例如:调用一个函数会将函数参数压入到栈空间中,在函数返回时,参数会被栈弹出清理。进程中的每一个线程都有属于自己的栈

mmap

在栈的低一段便是 mmap,mmap 是一种高效便捷的文件 I/O 方式。内核将文件内容映射在此段内存中,例如加载动态链接库。另外,在 linux 中,如果你通过 malloc 申请一块大于 MMAP_THRESHOLD(默认大小是 128KB)大小的堆空间时,glibc 会返回一块匿名的 mmap 内存块而非一块堆内存。

堆同栈一样,都是为进程运行提供动态的内存分配,但其和栈的的一个很大区别在于堆上内存的生命周期和执行分配的函数的生命周期不同,堆上分配的内存只有在对应进程通过系统调用主动释放进程结束后才会释放。堆的内存分配效率比栈要低得多。因为栈是由操作系统提供管理的,会在底层堆栈提供支持,分配专门的寄存器($esp)存放栈的地址,包括压栈出栈也都有专门的指令执行,所以执行效率很高。而堆则是由 C 函数库提供支持,它的机制相对复杂,例如分配一块内存,库函数会按照一定的算法在堆内存空间中搜索可用的足够大的内存空间,如果没有足够大的连续空间,则需要操作系统来重新整理堆内存,这样才有机会分到足够大小的空间,然后才返回。对于堆来说,频繁的 malloc/free(new/delete)势必会造成内存空间的不连续,从而造成大量的内存碎片,程序的运行效率降低。而对于栈来说,分配的一定是连续的内存空间。

BSS段

堆段再往下便是 BSS 段这个静态内存区域,它是用来存储静态局部静态全局变量的,其在编译期间便决定了虚拟内存的消耗,BSS 段存放的是未初始化的变量。另外根据 C 语言标准规定,未初始化的静态成员变量的初始值必须为 0,所以内核在加载二进制文件后执行程序前会将 BSS 段清 0。

DATA段

DATA 段也是个静态内存区域,也是用来存储静态局部或静态全局变量。但是放的是已经初始化的变量。

代码段

DATA 段再往下便是代码段,这段中存有程序的指令代码。TEXT 段是通过只读的方式加载到内存中的,它可以在多个进程中被安全共享。

什么是树状数组

有这样一个问题,给定一个数组,求它的前 nn 项和。
1
最简单粗暴的方式,就是一个个加起来,存到一个大小为 nn 的数组里面,时间复杂度为 O(n)O(n)
但是有一个数变了,由 b1b_1 变为了 b2b_2,需要更新上述的数组,那么更改的时间复杂度为 O(n)O(n),因为为了维护上述数组的性质,需要在变更的数之后的所有数都给加上这个变化 b2b1b_2 - b_1。求和的时间复杂度为 O(1)O(1),因为拿到前缀和之后数组的结果就是答案了。
如果这个数组要被频繁修改,那这种方式效率就很低下了。有没有更快的方式呢?

树状数组的雏形

我们可以做以下处理:
2
首先将每两个数的和都记录下来,那么求和的时候就可以只求 (n/2)(n/2) 次了。那么再对第二层的数组再做一个两两求和…次数就变为 (n/4)(n/4) 次了。以此类推。最后就变成上图这个样子了。如果是这样的话,修改一个数,我们只需要更改 log2nlog_2n个数就可以了!
仔细观察上述数组,我们发现有一些数字是根本用不上的。第 ii 层的偶数个数都用不上。那么他就会变成这个样子:
3
这样空间复杂度就从 O(n2)O(n^2) 退化为 O(n)O(n) 了。这其中的每个数代表一段区间内数字的和。

lowerbits

在介绍树状数组的操作之前,先引入一个函数,这个函数代码如下:

1
2
3
inline int lowerbits(int x) {
return x & (-x);
}

这个函数的作用是将一个数的二进制格式中最低位的1单独拿出来,比如:

1
2
binary(12) = 00001100   (就展示最低的一个字节)
lowerbits(12) = 00000100 (取最后一个1之后的部分)

这个函数在树状数组中有什么作用呢?lowerbits(x)lowerbits(x)指代的是下标为 xx 的树状数组元素所代表的原数组的区间长度。如图:
4
lowerbits(12)=4lowerbits(12) = 4 代表树状数组 1212 下标(从 11 开始)所代表的原数组中 44 个数字元素的和。即树状数组元素 1010 是原数组 3,1,2,43,1,2,4 的和。

用树状数组的求和

5
顺着图上的树形结构依次往左上角求和。如图所示。求前 1414 项和的值等于 b[14]+b[12]+b[8]b[14] + b[12] + b[8]
代码如下:

1
2
3
4
5
6
7
8
int count(int n) {
int res = 0;
while (n > 0) {
res += b[n];
n -= lowerbits(n);
}
return res;
}

由此可见,利用树状数组求和的时间复杂度是 O(log2n)O(log_2n)

更新树状数组

6
更新操作则是把所有包含了新数字的树状数组元素统统更新一遍,如图中红线所示。顺着树形结构向右上角更新。

1
2
3
4
5
6
7
// delta 是第 i 位数字的变化值
void add(int i, int delta) {
while (i < N) {
i += delta;
i += lowerbits(i);
}
}

由此可见,更新时间复杂度为 O(log2n)O(log_2n),比最开始的方式快了很多。

初始化树状数组

以下表示中 a[i]a[i] 为原数组元素,b[i]b[i] 为树状数组元素。

直接初始化为0,并且对每个点调用add。

1
2
3
4
5
void init() {
for (int i = 1; i < N; i++) {
add(i, a[i]);
}
}

时间复杂度 O(nlog2n)O(nlog_2n)


使用前缀和辅助数组

1
2
3
4
5
6
void init() {
for (int i = 1; i < N; i++) {
pre[i] = pre[i - 1] + a[i];
b[i] = pre[i] - pre[i - lowerbits(i)];
}
}

这种方式时间复杂度为 O(n)O(n),但是要占用额外的空间。

继承目的

继承的本质是为了复用代码,派生类复用基类的代码,从而减少冗余的代码,使得创建和维护一个引用变得更加容易。继承代表的是 is-a 的关系。

继承种类

公有继承

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <iostream>
using namespace std;

class A {
public:
A() = default;
A(int a, int b, int c) : pub(a), pro(b), priv(c) {}
int pub;
protected:
int pro;
private:
int priv;
};

class B : public A { // 公有继承
public:
B(int a, int b, int c) {
pub = a;
pro = b;
// pri = c;
// test.cc:20:9: error: ‘pri’ was not declared in this scope
// pri = c;
// ^~~
}
};


int main() {
B b(1, 2, 3);
cout << "b.pub:" << b.pub << endl;
// cout << "b.pro:" << b.pro << endl;
// test.cc:31:27: error: ‘int A::pro’ is protected within this context
// cout << "b.pro:" << b.pro << endl;
// ^~~
// cout << "b.priv:" << b.priv << endl;
// test.cc:32:28: error: ‘int A::priv’ is private within this context
// cout << "b.priv:" << b.priv << endl;
// ^~~~
return 0;
}
1
2
@└────> # ./a.out 
b.pub:1

可以看到,公有继承即保留基类的成员属性,公有 => 公有,保护 => 保护,私有 => 无法访问。

保护继承

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
using namespace std;

class A {
public:
A() = default;
A(int a, int b, int c) : pub(a), pro(b), priv(c) {}
int pub;
protected:
int pro;
private:
int priv;
};

class B : protected A { // 保护继承
public:
B(int a, int b, int c) {
pub = a;
pro = b;
}
};

int main() {
B b(1, 2, 3);
cout << "b.pub:" << b.pub << endl;
return 0;
}
1
2
3
4
5
@└────> # g++ test.cc 
test.cc: In function ‘int main()’:
test.cc:25:27: error: ‘int A::pub’ is inaccessible within this context
cout << "b.pub:" << b.pub << endl;
^~~

保护继承会把 公有 => 保护,保护 => 保护, 私有 => 无法访问。所以原先的公有成员也不能以公有形式访问了。

私有继承

私有继承结果和保护一样,公有 => 私有,保护 => 私有,私有 => 无法访问。

覆写规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
using namespace std;

class A {
public:
void func(int a) {
cout << "A::func(int a)" << endl;
}
};

class B : public A {
public:
void func(int a, int b) {
cout << "B::func(int a, int b)" << endl;
}
};

int main() {
B b;
b.func(10);
return 0;
}
1
2
3
4
5
6
7
8
9
@└────> # g++ test.cc 
test.cc: In function ‘int main()’:
test.cc:20:14: error: no matching function for call to ‘B::func(int)’
b.func(10);
^
test.cc:13:10: note: candidate: ‘void B::func(int, int)’
void func(int a, int b) {
^~~~
test.cc:13:10: note: candidate expects 2 arguments, 1 provided

在基类中的某些函数,如果没有 virtual 关键字,函数名是 func (参数类型我们不管)。如果派生类中也声明了这个成员函数,那在派生类的作用域中,所有和这个函数同名的函数都被隐藏。

如果基类中的函数和派生类中有两个名字一样的函数 func 满足下面的两个条件

  1. 在基类中函数声明的时候有 virtual 关键字
  2. 基类中的函数和派生类中的函数一模一样,包括函数名,参数,返回类型都一样。

那么这就是叫做覆盖(override),这也就是虚函数,多态的性质。其他的情况,只要名字一样,不满足上面覆盖的条件,就是隐藏。

好多人认为,基类中的函数会继承下来和派生类中的同名不同参的函数构成重载。

重载(overload):
必须在一个域中,函数名称相同但是函数参数不同。重载的作用就是同一个函数有不同的行为,因此不是在一个域中的函数是无法构成重载的。

必须在一个域中,而继承是在两个类中了,所以上面的想法是不成立的。所以,相同的函数名的函数,在基类和派生类中的关系只能是覆盖或者隐藏。

隐藏(hide):
指派生类的成员函数隐藏了基类函数的成员函数。隐藏一词可以这么理解:在调用一个类的成员函数的时候,编译器会沿着类的继承链,逐级的向上查找函数的定义,如果找到了那么就停止查找。所以如果一个派生类和一个基类都有同一个同名(暂且不论参数是否相同)的函数,而编译器最终选择了在派生类中的函数,那么这个派生类的成员函数“隐藏”了基类的成员函数,阻止了编译器继续向上查找函数的定义。

CUDA 编程

CUDA 一种异构计算平台

CUDA 是 NVIDIA 推出的用于自家 GPU 的并行计算框架,也就是说 CUDA 只能在 NVIDIA 的 GPU 上运行,而且只有当要解决的计算问题是可以大量并行计算的时候才能发挥 CUDA 的作用。CUDA 的主要作用是连接 GPU 和 应用程序,方便用户通过 CUDA 的 API 调度 GPU 进行计算。

一个CUDA应用通常可以分解为两部分,

  1. CPU 主机端代码
  2. GPU 设备端代码

CUDA nvcc 编译器会自动分离你代码里面的不同部分,host 代码用 cpp 写成,使用本地的 g++ 编译器编译,设备端代码,也就是核函数,用 CUDA C 编写,通过 nvcc 编译,链接阶段,在内核程序调用或者明显的 GPU 设备操作时,添加运行时库。

1

CUDA 编程

简单示例代码

hello world 例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/*
*hello_world.cu
*/
#include<stdio.h>
__global__ void hello_world(void) {
printf("GPU: Hello world! \n");
}

int main(int argc, char **argv) {
printf("CPU: Hello world!\n");
hello_world<<<1,10>>>();
cudaDeviceReset(); //if no this line, it can not output hello world from gpu
return 0;
}

简单介绍其中几个关键字

1
__global__ // 是告诉编译器这个是个可以在设备上执行的核函数
1
hello_world<<<1, 10>>>(); // 其中变量的含义是<<<线程块的个数,每个线程块中线程的个数>>> 一个核函数被执行的次数就是两个参数的乘积
1
cudaDeviceReset();

这句话如果没有,则不能正常的运行,因为这句话包含了隐式同步,GPU 和 CPU 执行程序是异步的,核函数调用后成立刻会到主机线程继续,而不管 GPU 端核函数是否执行完毕,所以上面的程序就是GPU 刚开始执行,CPU已经退出程序了,所以我们要等 GPU 执行完了,再退出主机线程。

调用核函数

核函数就是在 CUDA 模型上诸多线程中运行的那段串行代码,这段代码在 GPU 上运行,用 NVCC 编译,产生的机器码是 GPU 的机器码,所以我们写 CUDA 程序就是写核函数,第一步我们要确保核函数能正确的运行产生正确的结果,第二优化 CUDA 程序的部分,无论是优化算法,还是调整内存结构,线程结构都是要调整核函数内的代码,来完成这些优化的。
我们一直把我们的 CPU 当做一个控制者,运行核函数,要从 CPU 发起。

1
kernel_name<<<grid, block, share_mem, stream>>>(argument list);

<<<grid, block, share_mem, stream>>> 是对 GPU 代码执行的线程结构的配置。我们通过 CUDA C 内置的数据类型 dim3 类型的变量来配置 grid 和 block。

  1. grid: grid 中 block 的个数
  2. block: 每个 block 中 thread 的布局
  3. 是一个可选参数,用于设置每个 block 除了静态分配的 shared memory 外,最多能动态分配的 shared memory 大小,单位为字节,默认为 0。
  4. 是一个可选参数,是 cudaStream_t 类型,初始值为 0,用于表示该核函数处于哪个流中。

例如:

1
kernel_name<<<4, 8>>>(argument list);

表现为

2

可以用 threadIdx.x 和 blockIdx.x (dim3 类型,可以为x, y, z)来组合获得对应的线程的唯一标识。当主机启动了核函数,控制权马上回到主机(不阻塞),而不是主机等待设备完成核函数的运行。想要主机等待设备端执行可以用下面这个指令:

1
cudaError_t cudaDeviceSynchronize(void);

当然,有些操作要阻塞,比如内存拷贝,因为要用到 host。

编写核函数

1
2
3
4
__global__ void sumArraysOnGPU(float *A, float *B, float *C) {
int i = threadIdx.x;
C[i] = A[i] + B[i];
}
限定符 执行 调用 备注
__global__ 设备端执行 可以从主机调用也可以从计算能力3以上的设备调用 必须有一个void的返回类型
__device__ 设备端执行 设备端调用
__host__ 主机端执行 主机调用 可以省略

Kernel核函数编写有以下限制

  1. 只能访问设备内存
  2. 必须有 void 返回类型
  3. 不支持可变数量的参数
  4. 不支持静态变量
  5. 显示异步行为

完整示例代码

一般 CUDA 程序分成下面这些步骤:

  1. 分配 GPU 内存
  2. 拷贝内存到设备
  3. 调用 CUDA 内核函数来执行计算
  4. 把计算完成数据拷贝回主机端
  5. 内存释放
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <cuda_runtime.h>
#include <stdio.h>
// 错误检验的宏
#define CHECK(call)\
{\
const cudaError_t error=call;\
if(error!=cudaSuccess)\
{\
printf("ERROR: %s:%d,",__FILE__,__LINE__);\
printf("code:%d,reason:%s\n",error,cudaGetErrorString(error));\
exit(1);\
}\
}

__global__ void sumArraysGPU(float* a, float* b, float* res) {
int i = threadIdx.x;
res[i] = a[i] + b[i];
}

void initialData(float* vec, int n) {
for (int i = 0; i < n; i++) {
vec[i] = (float)i;
}
}

int main(int argc,char **argv) {
int dev = 0;
cudaSetDevice(dev);

int nElem = 32;
printf("Vector size:%d\n", nElem);
int nByte = sizeof(float)*nElem;
float *a_h = (float*)malloc(nByte); // 输入数据 a
float *b_h = (float*)malloc(nByte); // 输入数据 b
float *res_from_gpu_h = (float*)malloc(nByte); // 用于接受从 gpu 返回的结果
memset(res_from_gpu_h, 0, nByte);

float *a_d, *b_d, *res_d; // 核函数的输入核输出地址(在 gpu 上申请的内存)
CHECK(cudaMalloc((float**)&a_d, nByte)); // 在 gpu 上申请内存
CHECK(cudaMalloc((float**)&b_d, nByte));
CHECK(cudaMalloc((float**)&res_d, nByte));

initialData(a_h, nElem);
initialData(b_h, nElem);

CHECK(cudaMemcpy(a_d, a_h, nByte, cudaMemcpyHostToDevice)); // 将输入数据拷贝到 gpu
CHECK(cudaMemcpy(b_d, b_h, nByte, cudaMemcpyHostToDevice));

dim3 block(nElem);
dim3 grid(nElem / block.x);
sumArraysGPU<<<grid, block>>>(a_d, b_d, res_d); // 调用核函数
printf("Execution configuration<<<%d, %d>>>\n", block.x, grid.x);

CHECK(cudaMemcpy(res_from_gpu_h, res_d, nByte, cudaMemcpyDeviceToHost)); // 将核函数结果从 gpu 返回给 主机

cudaFree(a_d);
cudaFree(b_d);
cudaFree(res_d);

free(a_h);
free(b_h);
free(res_from_gpu_h);

return 0;
}
1
nvcc xxx.cu -o a.out

gpu计算

异构计算

异构计算,首先必须了解什么是异构,不同的计算机架构就是异构,按照指令集划分或者按照内存结构划分。

GPU 本来的任务是做图形图像的,并行度很高,一定距离外的像素点之间的计算是独立的,所以属于并行任务。GPU 插在主板的 PCIe 卡口上,运行程序的时候,CPU 像是一个控制者,指挥两台 显卡完成工作后进行汇总,和下一步工作安排,所以 CPU 我们可以把它看做一个指挥者,主机端,host,而完成大量计算的 GPU 是我们的计算设备,device。

1

上面这张图能大致反应 CPU 和 GPU 的架构不同。

左图:一个四核 CPU 一般有四个 ALU,ALU 是完成逻辑计算的核心,也是我们平时说四核八核的核,控制单元,缓存也在片上,DRAM 是内存,一般不在片上,CPU 通过总线访问内存。

右图:GPU,绿色小方块是 ALU,我们注意红色框内的部分 SM,这一组 ALU 共用一个 Control 单元和 Cache,这个部分相当于一个完整的多核 CPU,但是不同的是 ALU 多了,Control 部分变小。所以计算能力提升了,控制能力减弱了。所以对于控制密集的程序,一个 GPU 的 SM 是没办法和 CPU 比较的,但是对了逻辑简单,数据量大的任务,GPU 更高效。并且,一个 GPU 有好多个 SM。

2

CPU和GPU之间通过 PCIe 总线连接,用于传递指令和数据,这部分也是后面要讨论的性能瓶颈之一。
一个异构应用包含两种以上架构,所以代码也包括不止一部分:

  1. 主机代码在主机端运行,被编译成主机架构的机器码
  2. 设备端的在设备上执行,被编译成设备架构的机器码。

所以主机端的机器码和设备端的机器码是隔离的,自己执行自己的,没办法交换执行。
主机端代码主要是控制设备,完成数据传输控制类工作,设备端主要的任务就是计算

因为当没有 GPU 的时候 CPU 也能完成这些计算,只是速度会慢很多,所以可以把 GPU 看成 CPU 的一个加速设备。

GPU 硬件结构

GPU的硬件结构,也不是具体的硬件结构,就是与 CUDA 相关的几个概念:thread,block,grid,Wrap,SP,SM。

  1. SP:最基本的处理单元,streaming processor。最后具体的指令和任务都是在 SP 上处理的。GPU 进行并行计算,也就是很多个 SP 同时做处理。每个 SP 有它自己的寄存器,比较稀缺的资源。

  2. SM:多个(几十或者上百,取决于设备) SP 加上其他的一些资源组成一个 SM,streaming multiprocessor。其他资源也就是存储资源,共享内存,寄储器等。各个 SM 之间只能通过全局内存间接通信,没有其它互联通道,所以这个集群只适合进行纯并行化计算。如果在计算过程中每个 SM 之间还需要通信,则整体运行效率很低。

  3. Wrap:

  1. SM 中的 SP 会分成成组的 Warp,每组 32 个。
  2. Wrap Scheduler 会从在 SM 上的所有 Warp 中进行指令调度。从已经有指令可以被执行的 Warp 中挑选然后分配下去。这些 Warp 可能来自与驻留在 SM 上的任何线程块。
  3. 所以,Warp 是 GPU 执行程序时的调度单位,同在一个 Wrap 的线程,以不同数据资源执行相同的指令。
  4. 一个 SM 上在某一个时刻,有 32 个线程在执行同一条指令,这 32 个线程可以选择性执行,虽然有些可以不执行,但是他也不能执行别的指令。
  5. 当一个 Warp 空闲时(或者在读数据,或者执行完),SM 就可以调度驻留在该 SM 上的另一个 Warp。
  6. 并发的 Warp 之间切换是没消耗的,因为资源早就被分配到所有 thread 和 block。

3

4

如上图,如果有 if-else 分支,同一个 Warp 内的线程,不能在执行 if 的同时,另一群在执行 else,而是在执行 if 时,另一群选择等待。这种现象又被称为 Warp 发散。

  1. grid、block、thread:在利用 cuda 进行编程时,一个核函数会分配一个 grid。一个 grid 分为多个 block,而一个 block 分为多个 thread。其中任务划分到是否影响最后的执行效果。划分的依据是任务特性和 GPU 本身的硬件特性
  1. block 是软件概念,通过设置该属性告诉 GPU 我有多少个线程,该如何组织。
  2. 一个 block 只会由一个 SM 进行调度,一旦被分配好 SM,block 就会一直驻留在 SM 中直到程序结束。
  3. 一个 SM 可以拥有多个 block,但是要顺序执行:

5

  1. 一个 block 有多个 Warp,例如一个有 512 线程的 block,有(512 / 32 = 16)个 Warp,这些 Warp 轮流进入 SM,由 Warp Scheduler 负责调度。若 block 内的线程数不是 32 的整数倍,那多余的 thread 单独为一个 Warp。
  2. 目前一个 block 内最多 1024 个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "locker.h"

// 线程池类,定义为模板类是为了代码复用,模板参数T是任务类
template< typename T >
class threadpool
{
public:
// thread_number是线程池中线程的数量, max_requests是请求队列中最多允许的 等待处理的请求的数量
threadpool( int thread_number = 8, int max_requests = 10000 );
~threadpool();
// 向请求队列添加任务
bool append( T* request );

private:
// 工作线程运行的函数, 它不断从工作队列中取出任务执行之
static void* worker( void* arg );
void run();

private:
int m_thread_number; // 线程池中线程数
int m_max_requests; // 请求队列中允许的最大请求数量
pthread_t* m_threads; // 描述线程池的数组,其大小为m_thread_number
std::list< T* > m_workqueue; // 请求队列
locker m_queuelocker; // 保护请求队列的互斥锁
sem m_queuestat; // 是否有任务要处理
bool m_stop; // 是否结束线程
};

template< typename T >
threadpool< T >::threadpool( int thread_number, int max_requests ) :
m_thread_number( thread_number ), m_max_requests( max_requests ), m_stop( false ), m_threads( NULL )
{
if( ( thread_number <= 0 ) || ( max_requests <= 0 ) )
{
throw std::exception();
}

m_threads = new pthread_t[ m_thread_number ];
if( ! m_threads )
{
throw std::exception();
}
// 创建thread_number个线程,并将它们设置为脱离线程
for ( int i = 0; i < thread_number; ++i )
{
printf( "create the %dth thread\n", i );
if( pthread_create( m_threads + i, NULL, worker, this ) != 0 )
{
delete [] m_threads;
throw std::exception();
}
if( pthread_detach( m_threads[i] ) )
{
delete [] m_threads;
throw std::exception();
}
}
}

template< typename T >
threadpool< T >::~threadpool()
{
delete [] m_threads;
m_stop = true;
}

template< typename T >
bool threadpool< T >::append( T* request )
{
m_queuelocker.lock(); // 操作工作队列要加锁,因为他被所有线程共享
if ( m_workqueue.size() > m_max_requests )
{
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back( request );
m_queuelocker.unlock();
m_queuestat.post();
return true;
}

template< typename T >
void* threadpool< T >::worker( void* arg )
{
threadpool* pool = ( threadpool* )arg;
pool->run();
return pool;
}

template< typename T >
void threadpool< T >::run()
{
while ( ! m_stop )
{
m_queuestat.wait();
m_queuelocker.lock();
if ( m_workqueue.empty() )
{
m_queuelocker.unlock();
continue;
}
T* request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if ( ! request )
{
continue;
}
request->process(); // 模板类需要实现这个接口
}
}

#endif

在c++中使用pthread_create时,第三个参数必须为静态函数。在静态函数中使用类的动态成员(包括成员函数和成员变量),只能通过两种方式实现:

  1. 通过类的静态对象来调用。比如单例模式中,静态函数可以通过类的全局唯一实例来访问动态成员函数。
  2. 将类的对象作为参数传给该静态函数,在静态函数中引用这个对象,并调用动态方法。

上面代码中向线程参数设置为this指针,在worker函数中获得该指针并调用动态方法run()。


reference:
linux高性能服务器编程——游双P301P_{301}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>

static const char* request = "GET http://localhost/index.html HTTP/1.1\r\nConnection: keep-alive\r\n\r\nxxxxxxxxxxxx";

int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}

void addfd( int epoll_fd, int fd )
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLOUT | EPOLLET | EPOLLERR;
epoll_ctl( epoll_fd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );
}

// 向服务器写len个字节数据
bool write_nbytes( int sockfd, const char* buffer, int len )
{
int bytes_write = 0;
printf( "write out %d bytes to socket %d\n", len, sockfd );
while( 1 )
{
bytes_write = send( sockfd, buffer, len, 0 );
if ( bytes_write == -1 )
{
return false;
}
else if ( bytes_write == 0 )
{
return false;
}

len -= bytes_write;
buffer = buffer + bytes_write;
if ( len <= 0 )
{
return true;
}
}
}

// 从服务器读取数据
bool read_once( int sockfd, char* buffer, int len )
{
int bytes_read = 0;
memset( buffer, '\0', len );
bytes_read = recv( sockfd, buffer, len, 0 );
if ( bytes_read == -1 )
{
return false;
}
else if ( bytes_read == 0 )
{
return false;
}
printf( "read in %d bytes from socket %d with content: %s\n", bytes_read, sockfd, buffer );

return true;
}

// 向服务器发起num个tcp连接,可以通过num更改压力
void start_conn( int epoll_fd, int num, const char* ip, int port )
{
int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );

for ( int i = 0; i < num; ++i )
{
sleep( 1 );
int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
printf( "create 1 sock\n" );
if( sockfd < 0 )
{
continue;
}

if ( connect( sockfd, ( struct sockaddr* )&address, sizeof( address ) ) == 0 )
{
printf( "build connection %d\n", i );
addfd( epoll_fd, sockfd );
}
}
}

void close_conn( int epoll_fd, int sockfd )
{
epoll_ctl( epoll_fd, EPOLL_CTL_DEL, sockfd, 0 );
close( sockfd );
}

int main( int argc, char* argv[] )
{
assert( argc == 4 );
int epoll_fd = epoll_create( 100 );
start_conn( epoll_fd, atoi( argv[ 3 ] ), argv[1], atoi( argv[2] ) );
epoll_event events[ 10000 ];
char buffer[ 2048 ];
while ( 1 )
{
int fds = epoll_wait( epoll_fd, events, 10000, 2000 );
for ( int i = 0; i < fds; i++ )
{
int sockfd = events[i].data.fd;
if ( events[i].events & EPOLLIN )
{
if ( ! read_once( sockfd, buffer, 2048 ) )
{
close_conn( epoll_fd, sockfd );
}
struct epoll_event event;
event.events = EPOLLOUT | EPOLLET | EPOLLERR; // 读完之后设置成可写
event.data.fd = sockfd;
epoll_ctl( epoll_fd, EPOLL_CTL_MOD, sockfd, &event );
}
else if( events[i].events & EPOLLOUT )
{
if ( ! write_nbytes( sockfd, request, strlen( request ) ) )
{
close_conn( epoll_fd, sockfd );
}
struct epoll_event event;
event.events = EPOLLIN | EPOLLET | EPOLLERR; // 写完之后设置成可读
event.data.fd = sockfd;
epoll_ctl( epoll_fd, EPOLL_CTL_MOD, sockfd, &event );
}
else if( events[i].events & EPOLLERR )
{
close_conn( epoll_fd, sockfd );
}
}
}
}

客户端运行:

1
./clientname {ip} {port} {number}

在这里插入图片描述


reference:
Linux高性能服务器编程——游双P329P_{329}

CGI 是Web 服务器运行时外部程序的规范,按CGI 编写的程序可以扩展服务器功能。
CGI(Common Gateway Interface) 是WWW技术中最重要的技术之一,有着不可替代的重要地位。CGI是外部应用程序(CGI程序)与WEB服务器之间的接口标准,是在CGI程序和Web服务器之间传递信息的过程。
CGI规范允许Web服务器执行外部程序,并将它们的输出发送给Web浏览器,CGI将Web的一组简单的静态超媒体文档变成一个完整的新的交互式媒体。

CGI的处理步骤:

  1. 通过Internet把用户请求送到web服务器。
  2. web服务器接收用户请求并交给CGI程序处理。
  3. CGI程序把处理结果传送给web服务器。
  4. web服务器把结果送回到用户。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/stat.h>

#include "processpool.h"

// 用于处理客户cgi请求的类,他可以作为processpool类的模板参数
class cgi_conn {
public:
cgi_conn(){};
~cgi_conn(){};
// 初始化客户连接,清空读缓冲区
void init(int epollfd, int sockfd, const sockaddr_in& client_addr) {
m_epollfd = epollfd;
m_sockfd = sockfd;
m_address = client_addr;
memset(m_buf, '\0', BUFFER_SIZE);
m_read_idx = 0;
}

void process() {
int idx = 0;
int ret = -1;
// 循环读取和分析客户数据
while (true) {
idx = m_read_idx;
ret = recv(m_sockfd, m_buf + idx, BUFFER_SIZE - 1 - idx, 0);
// 如果读操作发生错误,则关闭客户连接,如果是暂时无数据可读,则退出循环
if (ret < 0) {
if (errno != EAGAIN) {
removefd(m_epollfd, m_sockfd);
}
break;
} else if (ret == 0) {
// 如果对方关闭连接,服务器也关闭连接
removefd(m_epollfd, m_sockfd);
break;
} else {
m_read_idx += ret;
printf("user content is: %s\n", m_buf);
for (; idx < m_read_idx; ++idx) {
if ((idx >= 1) && (m_buf[idx - 1] == '\r') && (m_buf[idx] == '\n')) {
break;
}
}
// 如果没有遇到字符\r\n 则需要读取更多客户数据
if (idx == m_read_idx) {
continue;
}
m_buf[idx - 1] = '\0';

char* filename = m_buf;
// 判断客户要运行的cgi程序是否存在
if (access(filename, F_OK) == -1) {
removefd(m_epollfd, m_sockfd);
break;
}
// 创建子进程来执行cgi程序
ret = fork();
if (ret == -1) {
removefd(m_epollfd, m_sockfd);
break;
} else if (ret > 0) { // 父进程
// 父进程只需要关闭连接
removefd(m_epollfd, m_sockfd);
break;
} else {
// 子进程需要将标准输出定向到 m_sockfd,并执行cgi程序
close(STDOUT_FILENO);
dup(m_sockfd); // 把m_sockfd重定向到标准输出
execl(m_buf, m_buf, 0);
exit(0);
}
} // 读数据end
} // while end

} // process end

private:
static const int BUFFER_SIZE = 1024;
static int m_epollfd;
int m_sockfd;
sockaddr_in m_address;
char m_buf[BUFFER_SIZE];
int m_read_idx; // 读的光标位置
};

int cgi_conn::m_epollfd = -1;

int main(int argc, char* argv[]) {
if (argc <= 2) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);

int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);

int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);

ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);

ret = listen(listenfd, 5);
assert(ret != -1);

processpool<cgi_conn>* pool = processpool<cgi_conn>::create(listenfd); // 创建唯一进程池实例
if (pool) {
pool->run();
delete pool;
}
close(listenfd); // 正如前文提到的,main函数创建了文件描述符listenfd,那么就由它亲自关闭之

return 0;
}

本程序需要配合上一节的线程池头文件processpool.h。
运行结果如下:

服务器

在这里插入图片描述


客户端:

在这里插入图片描述


这里只是简单模拟,由于服务器是将信息作为文件名去访问服务器文件,而客户端随意发送信息,所以判断信息不存在就直接退出了。


reference:linux高性能服务器编程——游双

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
#ifndef PROCESSPOOL_H
#define PROCESSPOOL_H

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/stat.h>

class process {
public:
process() : m_pid(-1) {}

public:
pid_t m_pid;
int m_pipefd[2]; // 父进程和子进程通信用管道
};

template <typename T> // 模板参数是处理逻辑任务的类
class processpool {
private:
// 定义为私有,只能通过create静态函数来创建processpool实例
processpool(int listenfd, int process_number = 8);

public:
// 单例模式,是程序正确处理信号的必要条件
static processpool<T> *create(int listenfd, int process_number = 8) {
if (!m_instance) {
m_instance = new processpool<T>(listenfd, process_number);
}
return m_instance;
}
~processpool() {
delete[] m_sub_process;
}
void run();

private:
void setup_sig_pipe();
void run_parent();
void run_child();

private:
static const int MAX_PROCESS_NUMBER = 16; // 最大子进程数量
static const int USER_PER_PROCESS = 65536; // 每个子进程处理最大客户数量
static const int MAX_EVENT_NUMBER = 10000; // epoll最多能处理的事件数
int m_process_number; // 进程总数
int m_idx; // 子进程在池中序号
int m_epollfd; // 每个进程有个epoll内核事件表
int m_listenfd; // 监听socket
int m_stop; // 子进程通过m_stop判断是否停止运行
process *m_sub_process; // 保存所有子进程描述信息
static processpool<T> *m_instance; // 进程池静态实例
};
template <typename T>
processpool<T> *processpool<T>::m_instance = NULL;

static int sig_pipefd[2]; // 信号管道 统一事件源

static int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}

static void addfd(int epollfd, int fd) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}

static void removefd(int epollfd, int fd) { // 从epollfd表示的内核事件表中删除fd的 所有事件
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0);
close(fd);
}

static void sig_handler(int sig) {
int save_errno = errno;
int msg = sig;
send(sig_pipefd[1], (char *)&msg, 1, 0);
errno = save_errno;
}

static void addsig(int sig, void(handler)(int), bool restart = true) {
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = handler;
if (restart)
{
sa.sa_flags |= SA_RESTART;
}
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}

template <typename T>
processpool<T>::processpool(int listenfd, int process_number) // 进程池构造函数,参数listenfd是监听的socket,必须在创建进程池之前被创建
: m_listenfd(listenfd), m_process_number(process_number), m_idx(-1), m_stop(false) {
assert((process_number > 0) && (process_number <= MAX_PROCESS_NUMBER));

m_sub_process = new process[process_number];
assert(m_sub_process);

for (int i = 0; i < process_number; ++i) {
int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, m_sub_process[i].m_pipefd);
assert(ret == 0);

m_sub_process[i].m_pid = fork();
assert(m_sub_process[i].m_pid >= 0);
if (m_sub_process[i].m_pid > 0) { // 父进程
close(m_sub_process[i].m_pipefd[1]); // 只向子进程写
continue;
}
else { // 子进程
close(m_sub_process[i].m_pipefd[0]); // 只从父进程读
m_idx = i;
break;
}
}
}

template <typename T>
void processpool<T>::setup_sig_pipe() { // 统一事件源
m_epollfd = epoll_create(5); // 创建epoll事件监听表和信号管道
assert(m_epollfd != -1);

int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
assert(ret != -1);

setnonblocking(sig_pipefd[1]);
addfd(m_epollfd, sig_pipefd[0]);

addsig(SIGCHLD, sig_handler); // 设置信号处理函数
addsig(SIGTERM, sig_handler);
addsig(SIGINT, sig_handler);
addsig(SIGPIPE, SIG_IGN);
}

template <typename T>
void processpool<T>::run() { // 父进程中m_idx值为 -1, 子进程中m_idx值 >= 0,判断接下来运行的是父还是子进程的代码
if (m_idx != -1)
{
run_child();
return;
}
run_parent();
}

template <typename T>
void processpool<T>::run_child() {
setup_sig_pipe();

int pipefd = m_sub_process[m_idx].m_pipefd[1]; // 每个子进程都通过其在进程池中的序号值m_idx找到与父进程通信的管道
addfd(m_epollfd, pipefd); // 子进程需要监听管道文件描述符pipefd 因为父进程将通过它通知子进程accept新连接

epoll_event events[MAX_EVENT_NUMBER];
T *users = new T[USER_PER_PROCESS];
assert(users);
int number = 0; // epoll事件数量
int ret = -1;

while (!m_stop) {
number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
if ((number < 0) && (errno != EINTR)) {
printf("epoll failure\n");
break;
}

for (int i = 0; i < number; i++) { // 处理每个事件
int sockfd = events[i].data.fd;
if ((sockfd == pipefd) && (events[i].events & EPOLLIN)) { // 父进程有数据给子进程
int client = 0;
ret = recv(sockfd, (char *)&client, sizeof(client), 0); // 从父子进程之间管道读取数据,结果保存client中读取成功有新客户
if (((ret < 0) && (errno != EAGAIN)) || ret == 0) {
continue;
} else {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(m_listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0) {
printf("errno is: %d\n", errno);
continue;
}
addfd(m_epollfd, connfd); // 模板类T必须实现init方法,初始化一个客户连接。我们用connfd来索引处理的逻辑对象 提高效率
users[connfd].init(m_epollfd, connfd, client_address); // 实现在cgi_server.cpp中
}
} else if ((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)) { // 有信号,处理信号
int sig;
char signals[1024];
ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
if (ret <= 0) {
continue;
} else {
for (int i = 0; i < ret; ++i) {
switch (signals[i]) {
case SIGCHLD: { // 回收子进程资源
pid_t pid;
int stat;
while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
continue;
}
break;
}
case SIGTERM:
case SIGINT: {
m_stop = true;
break;
}
default: {
break;
}
}
}
}
} else if (events[i].events & EPOLLIN) { // 其他数据可读 必然是客户请求到来 调用逻辑处理对象的process方法处理
users[sockfd].process(); // 还没实现
} else {
continue;
}
}
}

delete[] users;
users = NULL;
close(pipefd);
//close( m_listenfd ); // 应该由m_listenfd的创建者来关闭,所谓的对象由哪个函数创建,就该由那个函数销毁
close(m_epollfd);
}

template <typename T>
void processpool<T>::run_parent() {
setup_sig_pipe();

addfd(m_epollfd, m_listenfd);

epoll_event events[MAX_EVENT_NUMBER];
int sub_process_counter = 0;
int new_conn = 1;
int number = 0;
int ret = -1;

while (!m_stop) {
number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);
if ((number < 0) && (errno != EINTR)) {
printf("epoll failure\n");
break;
}

for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if (sockfd == m_listenfd) {
int i = sub_process_counter; // 有新连接到来,就采用round robin方式将其分给一个子进程
do {
if (m_sub_process[i].m_pid != -1) { // 选出空闲进程的进程数组索引
break;
}
i = (i + 1) % m_process_number;
} while (i != sub_process_counter);

if (m_sub_process[i].m_pid == -1) {
m_stop = true;
break;
}
sub_process_counter = (i + 1) % m_process_number;
//send( m_sub_process[sub_process_counter++].m_pipefd[0], ( char* )&new_conn, sizeof( new_conn ), 0 );
send(m_sub_process[i].m_pipefd[0], (char *)&new_conn, sizeof(new_conn), 0);
printf("send request to child %d\n", i);
//sub_process_counter %= m_process_number;
} else if ((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)) { // 处理父进程收到的信号
int sig;
char signals[1024];
ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
if (ret <= 0) {
continue;
} else {
for (int i = 0; i < ret; ++i) {
switch (signals[i]) {
case SIGCHLD: {
pid_t pid;
int stat;
while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
for (int i = 0; i < m_process_number; ++i) {
// 如果进程池中第i个子进程退出了,则主进程关闭对应通信管道,并设置m_pid为-1,标记子进程已经退出
if (m_sub_process[i].m_pid == pid) {
printf("child %d join\n", i);
close(m_sub_process[i].m_pipefd[0]);
m_sub_process[i].m_pid = -1;
}
}
}
m_stop = true;
for (int i = 0; i < m_process_number; ++i) {
if (m_sub_process[i].m_pid != -1) {
m_stop = false;
}
}
break;
}
case SIGTERM:
case SIGINT: {
printf("kill all the clild now\n"); // 杀了所有子进程
for (int i = 0; i < m_process_number; ++i) {
int pid = m_sub_process[i].m_pid;
if (pid != -1) {
kill(pid, SIGTERM);
}
}
break;
}
default: {
break;
}
}
}
}
} else {
continue;
}
}
}

//close( m_listenfd ); // 由创建者关闭 见后文
close(m_epollfd);
}

#endif

此头文件需要传入cgi模板类以进行实例化线程池。模板代码见下一节。


reference:
linux高性能服务器编程——游双