cpp11简单线程池

线程池

为什么需要线程池?因为频繁的创建和销毁线程,都涉及系统调用。频繁的系统调用涉及到用户态和内核态之间的切换,开销大。线程池的作用就是先把线程创建好,有活干就让线程去干活,没活线程就阻塞着。这样不需要频繁创建销毁线程,提升效率。除此之外,还可以防止过分调度。

图解

  1. 空闲时

1

  1. 有空闲线程时来任务,将任务加入到任务队列中,并唤醒一个线程。唤醒后线程拿到任务,把任务从任务队列中移出,并执行。

2

  1. 在没有空闲线程时来任务,任务排队。

3

代码

main.cc:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <chrono>
#include "thread_pool.h"

std::mutex g_screen_mutex; // 向终端打印信息的锁

void testFunc() {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::lock_guard<std::mutex> lock(g_screen_mutex);
std::cout << "===============================================================" << std::endl;
std::cout << "| testFunc() at thread [ " << std::this_thread::get_id() << "] output [" << 0 << "] |" << std::endl;
std::cout << "===============================================================" << std::endl;
}

int main() {
thread_pool thread_pool;
for(int i = 0; i < 5 ; i++) { // 往里面插入 5 个任务
thread_pool.add_task(testFunc);
}
getchar(); // 等待,不要让主进程退出
return 0;
}

thread_pool.h:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_

#include <vector>
#include <queue>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>

class thread_pool {
public:
// 定义为一个函数类型,返回值为 void,没有入参
typedef std::function<void()> task_t;

thread_pool(int init_size = 3);
~thread_pool();
// 停止线程池
void stop();
// 向线程池加入任务
void add_task(const task_t&);

private:
thread_pool(const thread_pool&) = delete; // 禁止复制拷贝
const thread_pool& operator=(const thread_pool&) = delete;
// 线程池启动函数
void start();
// 每个线程的循环函数
void thread_loop();
// 从线程池里拿一个线程
task_t take();

int init_threads_size_; // 初始线程数量
std::vector<std::thread*> threads_; // 已经创建的线程列表
std::queue<task_t> tasks_; // 待执行任务列表
std::mutex mutex_; // 操作线程池共有变量之前先上锁
std::condition_variable wake_cond_; // 唤醒线程的条件
bool is_started_; // 线程池是否已经启动
};
#endif

thread_pool.cc:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <assert.h>
#include <iostream>
#include "thread_pool.h"

thread_pool::thread_pool(int init_size)
: init_threads_size_(init_size), mutex_(), wake_cond_(), is_started_(false) {
start();
}

thread_pool::~thread_pool() {
if (is_started_) {
stop();
}
}

void thread_pool::start() {
assert(threads_.empty());
is_started_ = true;
threads_.reserve(init_threads_size_);
for (int i = 0; i < init_threads_size_; ++i) {
// 非静态成员函数则需要传递 this 指针作为第一个参数
threads_.push_back(new std::thread(std::bind(&thread_pool::thread_loop, this)));
}
}

void thread_pool::stop() {
std::cout << "thread_pool::stop() stop." << std::endl;
{
std::unique_lock<std::mutex> lock(mutex_); // 只有 unique_lock 才能和 condition_variable 配合使用
is_started_ = false;
wake_cond_.notify_all(); // 销毁前唤醒所有线程
std::cout << "thread_pool::stop() notifyAll()." << std::endl;
}

for (auto thread : threads_) {
thread->join(); // 销毁之前需要等待所有线程完成
delete thread;
}
threads_.clear();
}

void thread_pool::thread_loop() {
std::cout << "thread_pool::threadLoop() tid : " << std::this_thread::get_id() << " start." << std::endl;
while (is_started_) {
task_t task = take(); // 如果没有拿到会阻塞在这里
if (task) {
task();
}
}
std::cout << "thread_pool::threadLoop() tid : " << std::this_thread::get_id() << " exit." << std::endl;
}

void thread_pool::add_task(const task_t& task) {
std::unique_lock<std::mutex> lock(mutex_);
tasks_.push(task);
wake_cond_.notify_one(); // 只唤醒一个线程
}

thread_pool::task_t thread_pool::take() {
std::unique_lock<std::mutex> lock(mutex_);
// 使用 while 循环,防止假唤醒
while (tasks_.empty() && is_started_) {
std::cout << "thread_pool::take() tid : " << std::this_thread::get_id() << " wait." << std::endl;
wake_cond_.wait(lock); // 如果没任务的话线程会阻塞在这里
}

std::cout << "thread_pool::take() tid : " << std::this_thread::get_id() << " wakeup." << std::endl;
task_t task;
size_t size = tasks_.size();
if (!tasks_.empty() && is_started_) {
task = tasks_.front(); // 拿出队列中第一个任务
tasks_.pop();
assert(size - 1 == tasks_.size());
}
if (task != nullptr) {
std::cout << "thread_pool::take() tid : " << std::this_thread::get_id() << " took a task!" << std::endl;
}
return task;
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@└────> # g++ main.cc thread_pool.cc -lpthread

@└────> # ./a.out
thread_pool::threadLoop() tid : 139841310938880 start.
thread_pool::take() tid : 139841310938880 wakeup.
thread_pool::take() tid : 139841310938880 took a task!
thread_pool::threadLoop() tid : 139841302546176 start.
thread_pool::take() tid : 139841302546176 wakeup.
thread_pool::take() tid : 139841302546176 took a task!
thread_pool::threadLoop() tid : 139841294153472 start.
thread_pool::take() tid : 139841294153472 wakeup.
thread_pool::take() tid : 139841294153472 took a task!
===============================================================
| testFunc() at thread [ 139841310938880] output [0] |
===============================================================
thread_pool::take() tid : 139841310938880 wakeup.
thread_pool::take() tid : 139841310938880 took a task!
===============================================================
| testFunc() at thread [ 139841302546176] output [0] |
===============================================================
thread_pool::take() tid : 139841302546176 wakeup.
thread_pool::take() tid : 139841302546176 took a task!
===============================================================
| testFunc() at thread [ 139841294153472] output [0] |
===============================================================
thread_pool::take() tid : 139841294153472 wait.
===============================================================
| testFunc() at thread [ 139841310938880] output [0] |
===============================================================
thread_pool::take() tid : 139841310938880 wait.
===============================================================
| testFunc() at thread [ 139841302546176] output [0] |
===============================================================
thread_pool::take() tid : 139841302546176 wait.
(键入回车)
thread_pool::stop() stop.
thread_pool::stop() notifyAll().
thread_pool::take() tid : 139841294153472 wakeup.
thread_pool::threadLoop() tid : 139841294153472 exit.
thread_pool::take() tid : 139841310938880 wakeup.
thread_pool::threadLoop() tid : 139841310938880 exit.
thread_pool::take() tid : 139841302546176 wakeup.
thread_pool::threadLoop() tid : 139841302546176 exit.

从结果可以看出,确实是三个线程在执行 5 个任务,并且线程执行完成后并没有销毁。这里的函数都是 void() 类型,即返回值为 void,并且没有参数,不具备泛用性。如果需要线程执行其他类型函数则需要包装函数类。

通用线程池请见:通用线程池