「C++ 多线程」线程池与简单线程池实现

文章大图来源:pixiv_id=105287025

1 线程池

1.1 基本概念

线程池 是一种多线程处理机制。线程池 会预先创建一组线程,这些线程在应用程序启动时就已经创建好,且可以反复使用,等待分配任务并执行。

当有新的任务需要执行时,线程池 会分配一个空闲的线程来执行该任务,而不需要每次执行任务都重新创建线程和销毁线程。

1.2 基本组成部分

  • 工作线程

    线程池中实际执行任务的线程。这些线程在创建后通常处于等待状态,等待任务到来后被唤醒执行任务。

    在一个简单的线程池实现中,工作线程会不断地检查任务队列,看是否有任务需要执行。

  • 任务队列

    用于存储等待执行的任务的队列。任务通常是一个可调用对象(例如函数)。当有新的任务需要执行时,任务会被添加到任务队列中。

    任务队列可以是先进先出(FIFO)的结构,以保证任务按照添加的顺序执行。

1.3 线程池的优势

  • 提高性能

    如果不使用线程池的机制,采用每次过来一个任务都重新创建一个线程执行,执行完之后再销毁线程,这样会有比较大的开销。

    使用线程池可以重用现有的线程,从而一定程度上减少了频繁创建和销毁线程的开销。

  • 并发线程数量控制

    线程池会预先创建一定数量的线程,从而限制并发线程的数量,防止系统因为创建过多的线程而导致资源耗尽(例如 CPU 过载、内存不足等问题)。

  • 简化线程管理

    实现线程池可以避免手动地管理线程的生命周期。通常线程池还提供了任务排队和调度功能,使得多线程编程更加容易。

2 简单线程池实现

2.1 基本结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class ThreadPool{
void work(); // 线程执行的函数(任务内容)
bool stop; // 线程池是否停止标记
std::queue<std::function<void()>> tasks; // 任务队列,任务被包装为 void function
std::vector<std::thread> threads; // 线程池
std::condition_variable cv; // 条件变量,用于唤醒等待的线程
std::mutex mtx; // 互斥量,用于互斥访问标记、任务队列
public:
// 构造函数,预先创建 thread_num 个线程
ThreadPool(int thread_num);

// 析构函数
~ThreadPool();

// 添加任务到任务队列(参数:函数,可变参数模板)
// 传入万能引用,返回一个 std::future 对象(获取结果为 std::result_of<F(Arg...)>::type)
template<typename F, typename... Arg>
auto submit(F&& f, Arg&&... args) -> std::future<typename std::result_of<F(Arg...)>::type>;
};

在上面的代码中,定义一个 ThreadPool 类,也就是我们的线程池。

  • 公有成员

    ThreadPool 类中的 构造函数,用于预先创建一定数量的线程。析构函数 则用于线程池停止工作,等待所有线程执行完毕。

    还实现了一个 submit 函数来提交任务到任务队列,此处使用好了可变参数模板,其中 f 表示传入的函数(一个可调用对象,表示需要执行的任务),args 则为函数的参数。submit 函数的返回值是一个 std::future 对象,且此 std::future 对象的结果值类型为 typename std::result_of<F(Arg...),也就是传入的函数的结果的数据类型。

    submit 的具体实现中,会通过 std::packaged_task 结合 std::bind 将任务包装,并转化为 std::function<void()> 类型传入到任务队列中,以便处理各种不同返回值和参数的任务。

  • 私有成员

    work() 函数是线程接口函数,是一个线程所需执行的任务内容。

    stop 是一个 bool 类型变量,用于标记线程池是否停止工作。当 stoptrue 时,标志着线程池停止工作。

    cv 是一个 std::condition_variable 条件变量,主要用于在 有任务进队时唤醒阻塞状态中的线程;也用于在线程池停止工作时,唤醒所有线程线程池已停止。

    mtx 是一个 std::mutex 互斥锁,用于多线程互斥访问共享数据,共享数据包括 任务队列 tasks 以及 线程池停止标志 stop

    works 是一个可变长的数组,存储一定数量的 std::thread 线程,每个线程执行 work 函数。

    tasks 是任务队列,存储的是 std::function<void()> 的可调用对象,每个任务函数经过 submit 函数后被包装成统一的 std::function<void()> 可调用对象。

2.2 构造函数实现

构造函数中,我们需要预先创建 thread_num 个线程,每个线程执行的是一个 ThreadPool::work 函数。故我们可以用 类成员函数 创建线程的方法:

1
2
3
4
5
6
// 构造函数
ThreadPool::ThreadPool(int thread_num): stop(false) {
for(size_t i = 0; i < thread_num; i ++ ){
threads.emplace_back(&ThreadPool::work, this);
}
}

在上面的代码中,stop 被初始化为 false,表示线程开始工作;threads 中依次放入 thread_num 个执行 ThreadPool::work 的线程。this 指针是一个隐含的指针,它指向调用该成员函数的对象本身。

2.3 析构函数实现

析构函数中,首先我们需要做的是更新线程池停止标记为 true,此时我们需要用到互斥锁。

然后,我们需要通知所有等待的线程,当前线程池已经停止,可以直接 return 返回;而其它还处于执行任务中的线程可以让它们执行任务结束。

最后,将所有线程调用 join() 等待线程执行完毕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 析构函数
ThreadPool::~ThreadPool(){
{
std::lock_guard<std::mutex> lck(mtx); // 加锁,互斥访问 stop 标记
stop = true;
}

// 通知所有等待中的线程,线程池已经停止
cv.notify_all();

// 等待所有子线程运行完毕
for(auto& t : threads){
if(t.joinable()){
t.join();
}
}
}

在上面的代码中,先定义一个作用域,内部使用 std::lock_guard 进行加锁,对 stop 进行修改,作用域结束后自动解锁。然后通过条件变量 cv 调用 notify_all 来通知所有线程此时线程池已经停止。最后每个线程调用 join() (可以先检查一下是否 joinable())。

2.4 submit 提交任务函数实现

submit 函数用于向线程池的任务队列中提交一个任务,并返回一个 std::future 对象,通过这个 std::future 对象,后续可以通过 std::future::get() 获取任务执行的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 提交任务函数
template<typename F, typename... Arg>
auto ThreadPool::submit(F&& f, Arg&&... args) -> std::future<typename std::result_of<F(Arg...)>::type>{
// 执行 f 函数返回的数据类型
using func_type = typename std::result_of<F(Arg...)>::type;

// 通过智能指针,指向函数模板为 func_type() 的包装任务,避免共同访问时被销毁
// 通过 std::bind 绑定函数、可变参数列表得到一个包装任务
// std::forward 用于完美转发,将参数以原始的类型(左值或右值)传递
auto task = std::make_shared<std::packaged_task<func_type()>>(
std::bind(std::forward<F>(f), std::forward<Arg>(args)...)
);

// 添加任务到任务队列
{
std::lock_guard<std::mutex> lck(mtx);
if(stop){
throw std::runtime_error("ERROR: The thread pool is stoped.");
}
// 将前面构造好的可调用对象 packaged_task 添加任务队列
tasks.emplace([task](){ // 捕获 task 智能指针
(*task)(); // 解引用,获取指向的包装任务,并通过()调用这个任务
});
}

// 唤醒一个等待的线程来执行任务
cv.notify_one();

// 返回 std::future 对象,后续等待获取结果
return task -> get_future();
}

在上面的代码中,主要分为以下几个模块:

  • 确定任务返回类型

    首先通过 using func_type = typename std::result_of<F(Arg...)>::type; 来确定传入的可调用对象 f 在给定参数 args 情况下执行后的返回类型。例如,如果 f 是一个返回 int 的函数,那么 func_type 就会被推导为 int

  • 构造 packaged_task 对象

    使用 std::make_shared<std::packaged_task<func_type()>> 创建一个智能指针,指向一个 std::packaged_task 对象。

    std::packaged_task 对一个可调用对象进行包装,使得它可以异步执行并获取其返回结果。这里通过 std::bind 把传入的可调用对象 f 和可变参数 args 进行绑定,构建出一个可被 std::packaged_task 包装的可调用实体,同时利用 std::forward 进行完美转发,保证参数的原始类型(左值或右值特性)得以保留传递,避免不必要的拷贝或移动语义问题。

    为什么使用智能指针 指向一个 std::packaged_task 对象:因为std::packaged_task 是一个 可移动但不可复制 的类型。而为了允许多个线程并发地访问一个 std::packaged_task 对象,则采用了智能指针将指向一个 std::packaged_task 对象。

    智能指针可以保证 std::packaged_task 对象在多线程并发访问时的正确生命周期管理,避免数据竞争导致的未定义行为。

  • 添加任务到任务队列

    任务队列属于共享数据,因此需要 std::lock_guard 加锁确保互斥访问。

    然后将得到的 task 智能指针通过 *task 解引用,获取包装的任务,并通过 (*task)() 来调用这个任务。传入任务队列可以定义一个 lambda 匿名函数,无返回值无参数,也就是相当于一个 std::function<void()> 函数,并在匿名函数中通过 (*task)() 来调用包装的任务。

  • 线程唤醒

    当一个任务入队之后,通过 cv 调用 notify_one() 来唤醒一个等待的线程执行任务。

  • 获取任务结果

    最后返回一个 std::future 对象,等待获取任务的结果。

2.5 work 执行任务函数实现

work 函数中,每个线程会不断尝试从任务队列中取出任务并执行,直到线程池被标记且任务队列为空停止。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 线程执行的函数
void ThreadPool::work(){
while(true){
// 定义一个任务
std::function<void()> task;

// 从任务队列中取出一个任务
{
std::unique_lock<std::mutex> lck(mtx);
while(tasks.empty() && !stop){ // 避免虚假唤醒
cv.wait(lck); // 需要等待任务进队,线程陷入阻塞
}
// 可能由于 stop 而退出上面的循环,此时若任务队列为空,直接返回
if(tasks.empty() && stop){ // 任务队列为空且线程池停止
return;
}
task = std::move(tasks.front());
tasks.pop();
}

// 执行任务
task();
}
}

在上面的代码中,通过 while(true) 构建一个无限循环,表示线程会持续尝试获取任务执行。

在访问任务队列时,需要先进行加锁。由于我们需要通过 cv 条件变量来确保尝试取出任务时,任务队列中有任务且线程池未停止,所以加锁需要用 std::unique_lock

tasks.empty() && !stop 即任务队列为空且线程池未停止,线程会持续陷入阻塞状态。当 tasks 中有任务时,则取出任务;若 tasks.empty() && stop 即任务队列为空且线程池停止时,表明线程池完全停止了,此时直接返回。

成功取出一个任务后,通过 task() 执行任务。执行完毕后会再次进入循环继续进行尝试。

3 线程池测试

3.1 完整线程池代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#include <iostream>
#include <thread>
#include <mutex>
#include <future>
#include <condition_variable>
#include <functional>
#include <vector>
#include <queue>

class ThreadPool{
void work(); // 线程执行的函数(任务内容)
bool stop; // 线程池是否停止标记
std::queue<std::function<void()>> tasks; // 任务队列,任务被包装为 void function
std::vector<std::thread> threads; // 线程池
std::condition_variable cv; // 条件变量,用于唤醒等待的线程
std::mutex mtx; // 互斥量,用于互斥访问标记、任务队列
public:
// 构造函数,传入线程池中线程数量
ThreadPool(int thread_num);

// 析构函数
~ThreadPool();

// 添加任务到任务队列函数,返回类型后置(参数:函数,可变参数模板)
// 传入万能引用,返回一个 std::future 对象(获取结果为 std::result_of<F(Arg...)>::type)
template<typename F, typename... Arg>
auto submit(F&& f, Arg&&... args) -> std::future<typename std::result_of<F(Arg...)>::type>;
};

// 构造函数
ThreadPool::ThreadPool(int thread_num): stop(false) {
for(size_t i = 0; i < thread_num; i ++ ){
threads.emplace_back(&ThreadPool::work, this);
}
}

// 析构函数
ThreadPool::~ThreadPool(){
{
std::lock_guard<std::mutex> lck(mtx); // 加锁,互斥访问 stop 标记
stop = true;
}

// 通知所有等待中的线程,线程池已经停止
cv.notify_all();

// 等待所有子线程运行完毕
for(auto& t : threads){
if(t.joinable()){
t.join();
}
}
}

// 提交任务函数
template<typename F, typename... Arg>
auto ThreadPool::submit(F&& f, Arg&&... args) -> std::future<typename std::result_of<F(Arg...)>::type>{
// 执行 f 函数返回的数据类型
using func_type = typename std::result_of<F(Arg...)>::type;

// 通过智能指针,指向函数模板为 func_type() 的包装任务,避免共同访问时被销毁
// 通过 std::bind 绑定函数、可变参数列表得到一个包装任务
// std::forward 用于完美转发,将参数以原始的类型(左值或右值)传递
auto task = std::make_shared<std::packaged_task<func_type()>>(
std::bind(std::forward<F>(f), std::forward<Arg>(args)...)
);

// 添加任务到任务队列
{
std::lock_guard<std::mutex> lck(mtx);
if(stop){
throw std::runtime_error("ERROR: The thread pool is stoped.");
}
// 将前面构造好的可调用对象 packaged_task 添加任务队列
tasks.emplace([task](){ // 捕获 task 智能指针
(*task)(); // 解引用,获取指向的包装任务,并通过()调用这个任务
});
}

// 唤醒一个等待的线程来执行任务
cv.notify_one();

// 返回 std::future 对象,后续等待获取结果
return task -> get_future();
}

// 线程执行的函数
void ThreadPool::work(){
while(true){
// 定义一个任务
std::function<void()> task;

// 从任务队列中取出一个任务
{
std::unique_lock<std::mutex> lck(mtx);
while(tasks.empty() && !stop){ // 避免虚假唤醒
cv.wait(lck); // 需要等待任务进队,线程陷入阻塞
}
// 可能由于 stop 而退出上面的循环,此时若任务队列为空,直接返回
if(tasks.empty() && stop){ // 任务队列为空且线程池停止
return;
}
task = std::move(tasks.front());
tasks.pop();
}

// 执行任务
task();
}
}

3.2 线程池测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <future>
#include <vector>
#include "Thread_Pool.cpp" // 引入线程池
using namespace std;

int main(){
ThreadPool pool(4); // 创建一个有 4 个线程的线程池

// 提交一些任务
vector<pair<future<int>, int>> results; // 存储 future 对象及任务编号
for(int i = 0; i < 12; i ++ ){
// 提交任务后,得到 std::future 对象
auto res = pool.submit([](int x){
cout << "Task " << x << ": thread id = " << this_thread::get_id() << "\n";
return x * x;
}, i);
results.emplace_back(move(res), i); // std::future 不可拷贝
}

// 获取结果
for(auto& [res, id] : results){
cout << "Task " << id << ": result = " << res.get() << "\n";
}

return 0;
}

在上面的代码中,创建了一个有 4 个线程的线程池。提交了共 12 个任务,统一将 std::future 对象和编号传入到结果数组 results 中。最后依次调用 std::future::get() 来获取结果。

可能的运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Task Task 0: thread id = Task 2: thread id = Task 140461319153408
140461335938816
Task 5: thread id = 140461335938816
Task 6: thread id = 140461335938816
Task 7: thread id = 140461335938816
Task 8: thread id = 140461335938816
Task 9: thread id = 140461335938816
Task 10: thread id = 140461335938816
Task 11: thread id = 140461335938816
Task Task 0: result = 4: thread id = 140461319153408
3: thread id = 140461327546112
1: thread id = 140461310760704
0
Task 1: result = 1
Task 2: result = 4
Task 3: result = 9
Task 4: result = 16
Task 5: result = 25
Task 6: result = 36
Task 7: result = 49
Task 8: result = 64
Task 9: result = 81
Task 10: result = 100
Task 11: result = 121

参考

  1. 线程池和实现

「C++ 多线程」线程池与简单线程池实现
https://marisamagic.github.io/2025/01/12/20250112/
作者
MarisaMagic
发布于
2025年1月12日
许可协议