文章大图来源:pixiv_id=125244936
元旦快乐!🎉🤗
1 问题背景
1.1 生产者-消费者问题
1.1.1 定义
生产者-消费者问题 (Producer - Consumer Problem)是一个经典的多线程并发编程问题。这个问题描述了在一个共享缓冲区环境下,两类线程(分别为 生产者 和 消费者 )之间的协作关系。
生产者 :负责生成数据,并将数据放入共享缓冲区;
消费者 :从共享缓冲区中取出数据,并进行一系列操作。
例如,在一个甜品店中,甜点师(生产者)制作甜点并且将其放在展示架(缓冲区),顾客(消费者)从展示架(缓冲区)上拿走甜点并购买消费。
1.1.2 问题约束
共享缓冲区通常有一个固定的大小;
同步问题:
生产者和消费者的速度可能不同。生产者可能生产数据的速度比消费者消费数据快,导致缓冲区被填满,此时要等待消费者消费一些数据;反之,消费者可能消费数据的速度更快,缓冲区可能出现空的情况,那么消费者要等待生产者放入数据。
互斥问题:
共享缓冲区同一时刻只能被一个线程访问。两类线程对共享缓冲区需要互斥访问。如果不进行互斥访问,会造成数据竞争、数据不一致等问题。
1.2 仅用互斥锁实现生产者-消费者
1.2.1 实现思路和代码
对于上面 1.1.1 生产者-消费者问题 定义 的问题,我们可以考虑使用 互斥锁 来确保生产者和消费者对共享缓冲区的互斥访问。
那么,怎么处理两类线程的同步问题呢?我们可以先考虑一种比较低效的方法:
由此,我们可以得到一个简易的 生产者-消费者 代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <chrono> std::mutex mtx; std::queue<int > buffer; const int buffer_size = 10 ; void producer (int data_num) { for (int i = 1 ; i <= data_num; i ++ ){ std::unique_lock<std::mutex> lck (mtx) ; if (buffer.size () < buffer_size){ buffer.push (i); std::cout << "Produced: " << i << "\n" ; }else { lck.unlock (); std::this_thread::sleep_for (std::chrono::milliseconds (100 )); i -- ; } } }void consumer () { while (true ){ std::unique_lock<std::mutex> lck (mtx) ; if (!buffer.empty ()){ int data = buffer.front (); buffer.pop (); std::cout << "Consumed: " << data << "\n" ; lck.unlock (); std::this_thread::sleep_for (std::chrono::milliseconds (100 )); }else { lck.unlock (); std::this_thread::sleep_for (std::chrono::milliseconds (100 )); } } }int main () { std::thread producer_thread (producer, 100 ) ; std::thread consumer_thread (consumer) ; producer_thread.join (); consumer_thread.join (); return 0 ; }
可能的部分运行结果如下:
在上面的代码中,使用 std::queue
来模拟共享缓冲区,缓冲区的大小设定为 10
。
在生产者函数中,进行 data_num
次循环,每次循环生产一个数据。由于需要一些提前解锁然后休息的操作,因此采用了 std::unique_lock
更加灵活地处理加锁、解锁。如果缓冲区未满,那么可以放入数据;否则,先解锁,然后休息一段时间,使得消费者线程有机会获得锁。
在消费者函数中,消费者不断消费数据。也采用了 std::unique_lock
处理加锁、解锁。如果缓冲区不为空,那么可以取出数据,取出数据后,假设我们需要对数据进行一些处理等操作,并且这些操作不需要互斥量维护,那么可以先提前解锁,然后再进行对数据的一系列操作;否则,先解锁,然后休息一段时间,使得生产者线程有机会获得锁。
1.2.2 存在的问题
可以看出,“休息一段时间” 的做法显然是比较低效的,主要问题在于:
为了解决这些问题,C++11 引入了 条件变量 std::condition_variable。
2 std::condition_variable
2.1 基本概念
std::condition_variable
是 C++11 引入的一个类,在头文件 <condition_variable>
中,主要用于 线程间的同步 。std::condition_variable
在多线程编程中实现线程之间的等待和通知机制有着重要作用。
2.2 成员函数和基本用法
2.2.1 wait()
std::condition_variable::wait()
函数是 std::condition_variable
中的一个关键成员函数,主要用于让线程 等待某个条件成立 ,通常配合互斥量(mutex)进和 std::unique_lock
使用,以实现线程的同步。
其基本的函数原型为:
1 2 template < class Predicate >void wait ( std::unique_lock<std::mutex>& lock, Predicate pred ) ;
其中 lock
是互斥锁;pred
是一个 Predicate
类型,必须是一个 可调用对象 (例如函数、lambda 表示、函数指针等)。
基本的使用方法如下:
1 2 3 4 5 6 7 8 9 std::condition_variable cv; std::mutex mtx;void wait_foo () { std::unique_lock<std::mutex> lck (mtx) ; cv.wait (lck, check ()); }
当一个线程获得锁之后,调用 wait()
函数时:
如果 check()
函数判断条件为 true
,那么线程会继续向下执行;
如果 check()
函数判断条件为 false
,那么线程会阻塞在 cv.wait(lck, check());
这一行,进入等待,同时 线程会自动地暂时释放锁 。
线程进入阻塞(等待)状态,直到判断条件变为 true
时,线程会被唤醒,并重新获得锁继续执行下去。
而条件成立后,线程的唤醒需要配合另一个 std::condition_variable
中的函数—— std::condition_variable::notify_one()
(或者 std::condition_variable::notify_all()
)。
如果 wait()
函数不加 可调用对象 这个参数,那么 std::condition_variable::wait()
默认条件为 false
,线程会进入阻塞状态。
1 2 3 4 5 void wait_foo () { std::unique_lock<std::mutex> lck (mtx) ; cv.wait (lck); }
注意: 在使用 wait()
函数时,只能搭配 std::unique_lock
互斥锁使用,不能使用 std::lock_guard
。std::condition_variable::wait()
需要在等待期间自动释放互斥量,然后在被唤醒后重新获取互斥量。但是 std::lock_guard
没有提供这样的功能。
2.2.2 notify_one()
std::condition_variable::notify_one()
主要用于当某个条件成立,通知 一个 处于阻塞状态下(等待该条件成立)的线程,进而唤醒这个线程。
以下是一个简单的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <chrono> std::mutex mtx; std::condition_variable cv; bool flag; void wait_foo () { std::unique_lock<std::mutex> lck (mtx) ; std::cout << "wait_foo(): wating..." << "\n" ; cv.wait (lck, []{ return flag; }); std::cout << "wait_foo(): waiting ends, flag = " << flag << "\n" ; }void notify_foo () { std::this_thread::sleep_for (std::chrono::seconds (2 )); { std::lock_guard<std::mutex> lck (mtx) ; std::cout << "notify_foo(): notifying..." << "\n" ; } cv.notify_one (); std::this_thread::sleep_for (std::chrono::seconds (2 )); { std::lock_guard<std::mutex> lck (mtx) ; flag = true ; std::cout << "notify_foo(): notifying again..." << "\n" ; } cv.notify_one (); }int main () { std::thread wait_t (wait_foo) ; std::thread notify_t (notify_foo) ; wait_t .join (); notify_t .join (); return 0 ; }
在上面的代码中,定义了一个等待条件的线程函数 wait_foo()
和一个通知唤醒的线程函数 notify_foo()
。我们创建两个线程分别执行这两个函数,一个线程 wait_t
等待条件成立,一个线程 notify_t
用于唤醒这个等待条件成立的线程。
在 wait_foo()
中,先尝试获得锁,获得锁之后,判断条件 flag
是否为 true
,显然一开始 flag
为 false
,所以一开始 wait_t
会进入阻塞状态,同时暂时释放锁。
notify_foo()
中,一开始先尝试唤醒,此时条件 flag
没有进行改变,所以这次唤醒是失败的;然后尝试进行第二次唤醒,这次修改条件 flag
为 true
,之后通知唤醒阻塞状态的线程 wait_t
。
处于阻塞态的线程 wait_t
在线程 notify_t
第二次调用 std::condition_variable::notify_one()
函数后(此时条件被修改为 true
),成功被唤醒,重新获得锁,然后继续执行下去。
可能的运行结果如下:
2.2.3 notify_all()
与 std::condition_variable::notify_one()
不同的是,std::condition_variable::notify_all()
唤醒的是 所有 处于阻塞状态下(等待某个条件成立)。
在线程较多的情况下,std::condition_variable::notify_all()
通常比较高效。
但是,std::condition_variable::notify_all()
在有多种进行判断条件是否成立的线程时,会出现部分唤醒没有用的情况:
例如,在多生产者 - 多消费者模型中,当一个生产者调用 std::condition_variable::notify_all()
唤醒所有等待线程时,可能会出现唤醒其他一些处于阻塞态的生产者线程,那么这些唤醒可能暂时是没有用的。
不过这种影响比较小。当一类线程只有一个时,我们考虑使用 notify_one()
(每次唤醒一个处于阻塞态的线程)进行唤醒;当一类线程有很多个时,我们考虑使用 notify_all()
。
3 使用 condition_variable 实现生产者-消费者
3.1 实现思路和代码
在 2 std::condition_variable 中,已经介绍了 std::condition_variable
的基本用法。我们可以通过这个条件变量来实现 生产者-消费者 问题中的同步机制。
在生产者函数 producer
中,每次生产并向缓冲区放入数据前,需要等待条件 buffer.size() < BUFFER_SIZE
成立。条件成立时,线程继续执行下去。缓冲区中已经有数据了,此时生产者线程需要尝试唤醒消费者线程。
在消费者函数 consumer
中,每次消费一个数据之前,需要等待条件 !buffer.empty()
成立。 条件成立后继续执行。此时缓冲区出现空位,消费者线程需要唤醒生产者线程。
实现代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <chrono> std::mutex mtx; std::condition_variable cv; std::queue<int > buffer; const int BUFFER_SIZE = 10 ; void producer (int data_num) { for (int i = 1 ; i <= data_num; i ++ ){ std::unique_lock<std::mutex> lck (mtx) ; cv.wait (lck, [&](){ return buffer.size () < BUFFER_SIZE; }); buffer.push (i); std::cout << "Produced: " << i << "\n" ; cv.notify_one (); } }void consumer () { while (true ){ std::unique_lock<std::mutex> lck (mtx) ; cv.wait (lck, [&](){ return !buffer.empty (); }); int data = buffer.front (); buffer.pop (); std::cout << "Consumed: " << data << "\n" ; lck.unlock (); cv.notify_one (); std::this_thread::sleep_for (std::chrono::milliseconds (100 )); } }int main () { std::thread producer_thread (producer, 100 ) ; std::thread consumer_thread (consumer) ; producer_thread.join (); consumer_thread.join (); return 0 ; }
可能的运行结果如下:
在上面的代码中,因为只有一个生产者和消费者,所以暂时只要用 notify_one()
就好了。
3.2 多生产者、消费者问题
3.2.1 多生产者、消费者实现
在 3.1 condition_variable 实现消费者-生产者代码 中,我们只有一个生产者和一个消费者。
但实际场景中往往会有多个生产者和消费者,例如 1.1.1 生产者-消费者问题定义 中的甜点师-顾客的例子,一个店铺里可能有多个甜点师在制作,会有很多个顾客光临这个甜品店。
我们可以修改 3.1 生产者-消费者实现代码 中的代码,创建多个生产者线程和多个消费者线程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <chrono> std::mutex mtx; std::condition_variable cv; std::queue<int > buffer; const int BUFFER_SIZE = 10 ; const int PRODUCER_NUM = 2 ; const int CONSUMER_NUM = 5 ; void producer (int id, int data_num) { for (int i = 1 ; i <= data_num; i ++ ){ std::unique_lock<std::mutex> lck (mtx) ; cv.wait (lck, [&](){ return buffer.size () < BUFFER_SIZE; }); buffer.push (i); std::cout << "producer " << id << " produced data: " << i << "\n" ; cv.notify_all (); } }void consumer (int id) { while (true ){ std::unique_lock<std::mutex> lck (mtx) ; cv.wait (lck, [&](){ return !buffer.empty (); }); int data = buffer.front (); buffer.pop (); std::cout << "consumer " << id << " consumed data: " << data << "\n" ; lck.unlock (); cv.notify_all (); std::this_thread::sleep_for (std::chrono::milliseconds (100 )); } }int main () { std::vector<std::thread> producers (PRODUCER_NUM) ; std::vector<std::thread> consumers (CONSUMER_NUM) ; for (size_t i = 0 ; i < producers.size (); i ++ ){ producers[i] = std::thread (producer, i + 1 , 100 ); } for (size_t i = 0 ; i < consumers.size (); i ++ ){ consumers[i] = std::thread (consumer, i + 1 ); } for (auto & t : producers) t.join (); for (auto & t : consumers) t.join (); return 0 ; }
在上面的代码中,定义了一个生产者个数和一个消费者个数,在线程接口函数中传递了线程编号标记的参数 id
(便于表示不同的生产者/消费者)。
由于现在是多个生产者线程和多个消费者线程,因此将 notify_one()
改为了 notify_all()
。其他部分代码大致相同。
可能的运行结果如下:
看似运行起来好像没有什么毛病,但实际上在 多生产者线程、多消费者线程 情况下,可能会出现 虚假唤醒 的问题。
3.2.2 虚假唤醒问题
虚假唤醒 是指线程从等待条件变量 std::condition_variable::wait()
函数中唤醒,但实际上等待的条件并没有真正满足。通常 虚假唤醒 的问题是由于多线程环境下 系统调度、信号传输 等原因导致的。
例如,缓冲区为空,当一个生产者线程放入一个数据后(放入后缓冲区就这一个数据),通知唤醒了一个等待的消费者线程的时候,有另一个消费者线程消费了这个唯一的数据。这是线程调度的不确定性所导致的。
此时,这个消费者线程收到被唤醒的通知,表面上条件成立了,但是实际上 buffer
还是为空。这就出现了 虚假唤醒 。
在现实场景中,我想要买草莓蛋糕但是甜点师正在制作中,等待了一会,甜点师通知我展示架上有新的草莓蛋糕了,我正准备去拿的时候却被另一个人给抢先了。这正是太倒霉了!🍓🍰
虚假唤醒还有很多其他的情况,总之类似上述的条件没有实际满足的情况都属于虚假唤醒。
3.2.3 虚假唤醒的解决
我们可以通过线程被唤醒后,再次检查是否满足条件成立 。再次检查条件如果发现不成立,那么线程需要又一次进入阻塞状态,因此我们需要实现一个 循环检查 。
在 2.2.1 std::condition_variable::wait() 中,如果 wait()
函数不接受 可调用对象 的参数,函数默认条件为 false
,线程会进入阻塞状态。
在生产者线程函数中,我们可以写一个 while
循环,循环判断的条件为 buffer.size() >= BUFFER_SIZE
,如果条件满足,那么执行 cv.wait(lck)
,使得生产者线程进入阻塞状态,同时暂时释放互斥锁。
生产者线程卡在 cv.wait(lck);
这一行,如果某个消费者线程唤醒了生产者线程,那么会再次进入 while
循环进行条件判断。如果此时 buffer.size() < BUFFER_SIZE
,那么会退出循环,然后生产者线程继续执行下去;否则,生产者线程会再次陷入阻塞状态,同时暂时释放互斥锁。
这样就通过 while
循环实现了再次检查以及循环检查。类似的,消费者线程中也可以写一个 while
循环,确保消费者线程不会被虚假唤醒。
修改后的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 #include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <chrono> std::mutex mtx; std::condition_variable cv; std::queue<int > buffer; const int BUFFER_SIZE = 10 ; const int PRODUCER_NUM = 2 ; const int CONSUMER_NUM = 5 ; void producer (int id, int data_num) { for (int i = 1 ; i <= data_num; i ++ ){ std::unique_lock<std::mutex> lck (mtx) ; while (buffer.size () >= BUFFER_SIZE){ cv.wait (lck); } buffer.push (i); std::cout << "producer " << id << " produced data: " << i << "\n" ; cv.notify_all (); } }void consumer (int id) { while (true ){ std::unique_lock<std::mutex> lck (mtx) ; while (buffer.empty ()){ cv.wait (lck); } int data = buffer.front (); buffer.pop (); std::cout << "consumer " << id << " consumed data: " << data << "\n" ; lck.unlock (); cv.notify_all (); std::this_thread::sleep_for (std::chrono::milliseconds (100 )); } }int main () { std::vector<std::thread> producers (PRODUCER_NUM) ; std::vector<std::thread> consumers (CONSUMER_NUM) ; for (size_t i = 0 ; i < producers.size (); i ++ ){ producers[i] = std::thread (producer, i + 1 , 100 ); } for (size_t i = 0 ; i < consumers.size (); i ++ ){ consumers[i] = std::thread (consumer, i + 1 ); } for (auto & t : producers) t.join (); for (auto & t : consumers) t.join (); return 0 ; }
3.3 消费者退出机制编写
在 3.2.3 虚假唤醒的解决 中,我们已经解决了 虚假唤醒 问题。但是,我们还存在一个问题:消费者一直都在消费数据,没有停止的时候。由于这个问题,我们的程序到最后也没有停止。
那么,如何写一个合理的消费者退出机制呢?一个比较愚蠢的办法就是,记录数据的数量,达到生产者生产数据的数量上限时,消费者线程函数中退出循环。但是这种写法的局限性比较高,如果生产者生产数据的代码有一点改动就得跟着一起修改。
我们同样可以借助条件变量来实现一个合理的消费者退出机制。
我们可以维护一个完成生产工作的生产者计数 produce_finished_count
,这个计数 需要互斥访问和修改 :
在生产者线程函数生产数据循环结束之后,生产者线程再次获得锁,然后修改 produce_finished_count
(produce_finished_count ++
)。如果修改之后,produce_finished_count
达到了生产者线程的总数,那么需要尝试唤醒阻塞状态中的消费者线程,通知它们不会再有新的数据了;
在消费者线程函数中,如果 buffer.empty() && produce_finished_count < PRODUCER_NUM
,表明缓冲区为空,但是还会有新的数据生产,因此这时消费者线程进入阻塞状态。
通常情况下,当 !buffer.empty()
成立时(有数据了),线程被唤醒,然后(可能)退出循环,消费者线程继续执行下去。但是,可能会在 produce_finished_count == PRODUCER_NUM
成立的时候,线程被通知唤醒然后退出循环。那么我们需要在判断一下是否是这种情况,如果是这种情况(缓冲区为空,且不会有新的数据再生产),那么消费者线程函数就从 while(true)
中 break
退出。
当然,在生产者线程函数中,当 produce_finished_count == PRODUCER_NUM
成立时去唤醒消费者线程也不一定表明就是完全没数据了,可能缓冲区还有一些数据,只不过不会有新数据再生产了。这个时候,消费者线程在之后判断 buffer.empty() && produce_finished_count == PRODUCER_NUM
是否成立时,buffer
并不为空,那么消费者线程会继续执行下去去消费缓冲区的数据。
我们可以将上面的退出机制实现带入现实场景中:甜点师今天工作量达到上限了,不会再制作新的草莓蛋糕了,我正在等待新的草莓蛋糕,但是轮到我的时候,甜点师告诉我已经没有且不再做了,那我只能遗憾地回家了,又是倒霉的一天。🍰😢
修改后的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 #include <iostream> #include <thread> #include <mutex> #include <atomic> #include <condition_variable> #include <queue> #include <vector> #include <chrono> std::mutex mtx; std::condition_variable cv; std::queue<int > buffer; const int BUFFER_SIZE = 10 ; const int PRODUCER_NUM = 2 ; const int CONSUMER_NUM = 5 ; int produce_finished_count; void producer (int id, int data_num) { for (int i = 1 ; i <= data_num; i ++ ){ std::unique_lock<std::mutex> lck (mtx) ; while (buffer.size () >= BUFFER_SIZE){ cv.wait (lck); } buffer.push (i); std::cout << "producer " << id << " produced data: " << i << "\n" ; cv.notify_all (); } { std::lock_guard<std::mutex> lck (mtx) ; produce_finished_count ++ ; if (produce_finished_count == PRODUCER_NUM){ cv.notify_all (); } } }void consumer (int id) { while (true ){ std::unique_lock<std::mutex> lck (mtx) ; while (buffer.empty () && produce_finished_count < PRODUCER_NUM){ cv.wait (lck); } if (buffer.empty () && produce_finished_count == PRODUCER_NUM){ break ; } int data = buffer.front (); buffer.pop (); std::cout << "consumer " << id << " consumed data: " << data << "\n" ; lck.unlock (); cv.notify_all (); std::this_thread::sleep_for (std::chrono::milliseconds (100 )); } }int main () { std::vector<std::thread> producers (PRODUCER_NUM) ; std::vector<std::thread> consumers (CONSUMER_NUM) ; for (size_t i = 0 ; i < producers.size (); i ++ ){ producers[i] = std::thread (producer, i + 1 , 100 ); } for (size_t i = 0 ; i < consumers.size (); i ++ ){ consumers[i] = std::thread (consumer, i + 1 ); } for (auto & t : producers) t.join (); for (auto & t : consumers) t.join (); return 0 ; }
可能的运行结果如下:
可以看出,生产者-消费者顺利的执行,并且最后数据全部消费完后程序正常地退出了。
还有一个小小的可以优化的地方就是,对完成生产工作的生产者计数 produce_finished_count
的互斥访问和修改只有简单的自加操作,那么可以使用 std::atomic
原子操作来实现对 produce_finished_count
互斥,使得程序更加高效。不过,暂时还没有写到有关 原子操作 的部分,所以暂时不展开了。😉
参考
std::condition_variable::wait
std::condition_variable::notify_one
std::condition_variable::notify_all