文章大图来源:pixiv_id=105287025
1 线程池
1.1 基本概念
线程池 是一种多线程处理机制。线程池 会预先创建一组线程,这些线程在应用程序启动时就已经创建好,且可以反复使用,等待分配任务并执行。
当有新的任务需要执行时,线程池 会分配一个空闲的线程来执行该任务,而不需要每次执行任务都重新创建线程和销毁线程。
1.2 基本组成部分
-
工作线程
线程池中实际执行任务的线程。这些线程在创建后通常处于等待状态,等待任务到来后被唤醒执行任务。
在一个简单的线程池实现中,工作线程会不断地检查任务队列,看是否有任务需要执行。
-
任务队列
用于存储等待执行的任务的队列。任务通常是一个可调用对象(例如函数)。当有新的任务需要执行时,任务会被添加到任务队列中。
任务队列可以是先进先出(FIFO)的结构,以保证任务按照添加的顺序执行。
1.3 线程池的优势
-
提高性能
如果不使用线程池的机制,采用每次过来一个任务都重新创建一个线程执行,执行完之后再销毁线程,这样会有比较大的开销。
使用线程池可以重用现有的线程,从而一定程度上减少了频繁创建和销毁线程的开销。
-
并发线程数量控制
线程池会预先创建一定数量的线程,从而限制并发线程的数量,防止系统因为创建过多的线程而导致资源耗尽(例如 CPU 过载、内存不足等问题)。
-
简化线程管理
实现线程池可以避免手动地管理线程的生命周期。通常线程池还提供了任务排队和调度功能,使得多线程编程更加容易。
2 简单线程池实现
2.1 基本结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| class ThreadPool{ void work(); bool stop; std::queue<std::function<void()>> tasks; std::vector<std::thread> threads; std::condition_variable cv; std::mutex mtx; public: ThreadPool(int thread_num);
~ThreadPool();
template<typename F, typename... Arg> auto submit(F&& f, Arg&&... args) -> std::future<typename std::result_of<F(Arg...)>::type>; };
|
在上面的代码中,定义一个 ThreadPool
类,也就是我们的线程池。
-
公有成员
在 ThreadPool
类中的 构造函数,用于预先创建一定数量的线程。析构函数 则用于线程池停止工作,等待所有线程执行完毕。
还实现了一个 submit
函数来提交任务到任务队列,此处使用好了可变参数模板,其中 f
表示传入的函数(一个可调用对象,表示需要执行的任务),args
则为函数的参数。submit
函数的返回值是一个 std::future
对象,且此 std::future
对象的结果值类型为 typename std::result_of<F(Arg...)
,也就是传入的函数的结果的数据类型。
在 submit
的具体实现中,会通过 std::packaged_task
结合 std::bind
将任务包装,并转化为 std::function<void()>
类型传入到任务队列中,以便处理各种不同返回值和参数的任务。
-
私有成员
work()
函数是线程接口函数,是一个线程所需执行的任务内容。
stop
是一个 bool
类型变量,用于标记线程池是否停止工作。当 stop
为 true
时,标志着线程池停止工作。
cv
是一个 std::condition_variable
条件变量,主要用于在 有任务进队时唤醒阻塞状态中的线程;也用于在线程池停止工作时,唤醒所有线程线程池已停止。
mtx
是一个 std::mutex
互斥锁,用于多线程互斥访问共享数据,共享数据包括 任务队列 tasks
以及 线程池停止标志 stop
。
works
是一个可变长的数组,存储一定数量的 std::thread
线程,每个线程执行 work
函数。
tasks
是任务队列,存储的是 std::function<void()>
的可调用对象,每个任务函数经过 submit
函数后被包装成统一的 std::function<void()>
可调用对象。
2.2 构造函数实现
构造函数中,我们需要预先创建 thread_num
个线程,每个线程执行的是一个 ThreadPool::work
函数。故我们可以用 类成员函数 创建线程的方法:
1 2 3 4 5 6
| ThreadPool::ThreadPool(int thread_num): stop(false) { for(size_t i = 0; i < thread_num; i ++ ){ threads.emplace_back(&ThreadPool::work, this); } }
|
在上面的代码中,stop
被初始化为 false
,表示线程开始工作;threads
中依次放入 thread_num
个执行 ThreadPool::work
的线程。this
指针是一个隐含的指针,它指向调用该成员函数的对象本身。
2.3 析构函数实现
析构函数中,首先我们需要做的是更新线程池停止标记为 true
,此时我们需要用到互斥锁。
然后,我们需要通知所有等待的线程,当前线程池已经停止,可以直接 return
返回;而其它还处于执行任务中的线程可以让它们执行任务结束。
最后,将所有线程调用 join()
等待线程执行完毕。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| ThreadPool::~ThreadPool(){ { std::lock_guard<std::mutex> lck(mtx); stop = true; }
cv.notify_all();
for(auto& t : threads){ if(t.joinable()){ t.join(); } } }
|
在上面的代码中,先定义一个作用域,内部使用 std::lock_guard
进行加锁,对 stop
进行修改,作用域结束后自动解锁。然后通过条件变量 cv
调用 notify_all
来通知所有线程此时线程池已经停止。最后每个线程调用 join()
(可以先检查一下是否 joinable()
)。
2.4 submit 提交任务函数实现
submit
函数用于向线程池的任务队列中提交一个任务,并返回一个 std::future
对象,通过这个 std::future
对象,后续可以通过 std::future::get()
获取任务执行的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| template<typename F, typename... Arg> auto ThreadPool::submit(F&& f, Arg&&... args) -> std::future<typename std::result_of<F(Arg...)>::type>{ using func_type = typename std::result_of<F(Arg...)>::type;
auto task = std::make_shared<std::packaged_task<func_type()>>( std::bind(std::forward<F>(f), std::forward<Arg>(args)...) );
{ std::lock_guard<std::mutex> lck(mtx); if(stop){ throw std::runtime_error("ERROR: The thread pool is stoped."); } tasks.emplace([task](){ (*task)(); }); }
cv.notify_one();
return task -> get_future(); }
|
在上面的代码中,主要分为以下几个模块:
-
确定任务返回类型
首先通过 using func_type = typename std::result_of<F(Arg...)>::type;
来确定传入的可调用对象 f
在给定参数 args
情况下执行后的返回类型。例如,如果 f
是一个返回 int
的函数,那么 func_type
就会被推导为 int
。
-
构造 packaged_task 对象
使用 std::make_shared<std::packaged_task<func_type()>>
创建一个智能指针,指向一个 std::packaged_task
对象。
std::packaged_task
对一个可调用对象进行包装,使得它可以异步执行并获取其返回结果。这里通过 std::bind
把传入的可调用对象 f
和可变参数 args
进行绑定,构建出一个可被 std::packaged_task
包装的可调用实体,同时利用 std::forward
进行完美转发,保证参数的原始类型(左值或右值特性)得以保留传递,避免不必要的拷贝或移动语义问题。
为什么使用智能指针 指向一个 std::packaged_task
对象:因为std::packaged_task
是一个 可移动但不可复制 的类型。而为了允许多个线程并发地访问一个 std::packaged_task
对象,则采用了智能指针将指向一个 std::packaged_task
对象。
智能指针可以保证 std::packaged_task
对象在多线程并发访问时的正确生命周期管理,避免数据竞争导致的未定义行为。
-
添加任务到任务队列
任务队列属于共享数据,因此需要 std::lock_guard
加锁确保互斥访问。
然后将得到的 task
智能指针通过 *task
解引用,获取包装的任务,并通过 (*task)()
来调用这个任务。传入任务队列可以定义一个 lambda
匿名函数,无返回值无参数,也就是相当于一个 std::function<void()>
函数,并在匿名函数中通过 (*task)()
来调用包装的任务。
-
线程唤醒
当一个任务入队之后,通过 cv
调用 notify_one()
来唤醒一个等待的线程执行任务。
-
获取任务结果
最后返回一个 std::future
对象,等待获取任务的结果。
2.5 work 执行任务函数实现
在 work
函数中,每个线程会不断尝试从任务队列中取出任务并执行,直到线程池被标记且任务队列为空停止。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| void ThreadPool::work(){ while(true){ std::function<void()> task;
{ std::unique_lock<std::mutex> lck(mtx); while(tasks.empty() && !stop){ cv.wait(lck); } if(tasks.empty() && stop){ return; } task = std::move(tasks.front()); tasks.pop(); }
task(); } }
|
在上面的代码中,通过 while(true)
构建一个无限循环,表示线程会持续尝试获取任务执行。
在访问任务队列时,需要先进行加锁。由于我们需要通过 cv
条件变量来确保尝试取出任务时,任务队列中有任务且线程池未停止,所以加锁需要用 std::unique_lock
。
当 tasks.empty() && !stop
即任务队列为空且线程池未停止,线程会持续陷入阻塞状态。当 tasks
中有任务时,则取出任务;若 tasks.empty() && stop
即任务队列为空且线程池停止时,表明线程池完全停止了,此时直接返回。
成功取出一个任务后,通过 task()
执行任务。执行完毕后会再次进入循环继续进行尝试。
3 线程池测试
3.1 完整线程池代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
| #include <iostream> #include <thread> #include <mutex> #include <future> #include <condition_variable> #include <functional> #include <vector> #include <queue>
class ThreadPool{ void work(); bool stop; std::queue<std::function<void()>> tasks; std::vector<std::thread> threads; std::condition_variable cv; std::mutex mtx; public: ThreadPool(int thread_num);
~ThreadPool();
template<typename F, typename... Arg> auto submit(F&& f, Arg&&... args) -> std::future<typename std::result_of<F(Arg...)>::type>; };
ThreadPool::ThreadPool(int thread_num): stop(false) { for(size_t i = 0; i < thread_num; i ++ ){ threads.emplace_back(&ThreadPool::work, this); } }
ThreadPool::~ThreadPool(){ { std::lock_guard<std::mutex> lck(mtx); stop = true; }
cv.notify_all();
for(auto& t : threads){ if(t.joinable()){ t.join(); } } }
template<typename F, typename... Arg> auto ThreadPool::submit(F&& f, Arg&&... args) -> std::future<typename std::result_of<F(Arg...)>::type>{ using func_type = typename std::result_of<F(Arg...)>::type;
auto task = std::make_shared<std::packaged_task<func_type()>>( std::bind(std::forward<F>(f), std::forward<Arg>(args)...) );
{ std::lock_guard<std::mutex> lck(mtx); if(stop){ throw std::runtime_error("ERROR: The thread pool is stoped."); } tasks.emplace([task](){ (*task)(); }); }
cv.notify_one();
return task -> get_future(); }
void ThreadPool::work(){ while(true){ std::function<void()> task;
{ std::unique_lock<std::mutex> lck(mtx); while(tasks.empty() && !stop){ cv.wait(lck); } if(tasks.empty() && stop){ return; } task = std::move(tasks.front()); tasks.pop(); }
task(); } }
|
3.2 线程池测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| #include <iostream> #include <future> #include <vector> #include "Thread_Pool.cpp" using namespace std;
int main(){ ThreadPool pool(4);
vector<pair<future<int>, int>> results; for(int i = 0; i < 12; i ++ ){ auto res = pool.submit([](int x){ cout << "Task " << x << ": thread id = " << this_thread::get_id() << "\n"; return x * x; }, i); results.emplace_back(move(res), i); }
for(auto& [res, id] : results){ cout << "Task " << id << ": result = " << res.get() << "\n"; }
return 0; }
|
在上面的代码中,创建了一个有 4 个线程的线程池。提交了共 12 个任务,统一将 std::future
对象和编号传入到结果数组 results
中。最后依次调用 std::future::get()
来获取结果。
可能的运行结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| Task Task 0: thread id = Task 2: thread id = Task 140461319153408 140461335938816 Task 5: thread id = 140461335938816 Task 6: thread id = 140461335938816 Task 7: thread id = 140461335938816 Task 8: thread id = 140461335938816 Task 9: thread id = 140461335938816 Task 10: thread id = 140461335938816 Task 11: thread id = 140461335938816 Task Task 0: result = 4: thread id = 140461319153408 3: thread id = 140461327546112 1: thread id = 140461310760704 0 Task 1: result = 1 Task 2: result = 4 Task 3: result = 9 Task 4: result = 16 Task 5: result = 25 Task 6: result = 36 Task 7: result = 49 Task 8: result = 64 Task 9: result = 81 Task 10: result = 100 Task 11: result = 121
|
参考
- 线程池和实现