「C++ 多线程」生产者-消费者问题及 std::condition_variable

文章大图来源:pixiv_id=125244936

元旦快乐!🎉🤗

1 问题背景

1.1 生产者-消费者问题

1.1.1 定义

生产者-消费者问题(Producer - Consumer Problem)是一个经典的多线程并发编程问题。这个问题描述了在一个共享缓冲区环境下,两类线程(分别为 生产者消费者)之间的协作关系。

生产者:负责生成数据,并将数据放入共享缓冲区;

消费者:从共享缓冲区中取出数据,并进行一系列操作。

例如,在一个甜品店中,甜点师(生产者)制作甜点并且将其放在展示架(缓冲区),顾客(消费者)从展示架(缓冲区)上拿走甜点并购买消费。

1.1.2 问题约束

  1. 共享缓冲区通常有一个固定的大小;

  2. 同步问题:

    生产者和消费者的速度可能不同。生产者可能生产数据的速度比消费者消费数据快,导致缓冲区被填满,此时要等待消费者消费一些数据;反之,消费者可能消费数据的速度更快,缓冲区可能出现空的情况,那么消费者要等待生产者放入数据。

    • 当缓冲区已满时,生产者 需要等待 消费者 取出数据后,才能继续放入数据;

    • 当缓冲区为空时,消费者 需要等待 生产者 放入数据后,才能继续取出数据;

  3. 互斥问题:

    共享缓冲区同一时刻只能被一个线程访问。两类线程对共享缓冲区需要互斥访问。如果不进行互斥访问,会造成数据竞争、数据不一致等问题。

1.2 仅用互斥锁实现生产者-消费者

1.2.1 实现思路和代码

对于上面 1.1.1 生产者-消费者问题 定义 的问题,我们可以考虑使用 互斥锁 来确保生产者和消费者对共享缓冲区的互斥访问。

那么,怎么处理两类线程的同步问题呢?我们可以先考虑一种比较低效的方法:

  • 如果 生产者 线程获得锁之后,发现缓冲区已满,那么就先释放锁,休息一段时间。在休息的时间段内,消费者 线程有机会获得锁并消费数据;

  • 如果 消费者 线程获得锁之后,发现缓冲区为空,那么就先释放锁,休息一段时间。在休息的时间段内,生产者 线程有机会获得锁并生产数据。

由此,我们可以得到一个简易的 生产者-消费者 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>

std::mutex mtx; // 互斥量
std::queue<int> buffer; // 共享缓冲区
const int buffer_size = 10; // 缓冲区大小限制

// 生产者函数
void producer(int data_num){
for(int i = 1; i <= data_num; i ++ ){
std::unique_lock<std::mutex> lck(mtx);
if(buffer.size() < buffer_size){ // 可以放入数据
buffer.push(i);
std::cout << "Produced: " << i << "\n";
}else{ // 缓冲区已满
lck.unlock(); // 先解锁
// 休息一段时间再尝试
std::this_thread::sleep_for(std::chrono::milliseconds(100));
i -- ; // 回退
}
}
}

// 消费者函数
void consumer(){
while(true){
std::unique_lock<std::mutex> lck(mtx);
if(!buffer.empty()){ // 可以消费数据
int data = buffer.front();
buffer.pop();
std::cout << "Consumed: " << data << "\n";
lck.unlock(); // 提前解锁
// 模拟与 cv 无关的数据操作
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}else{
lck.unlock(); // 先解锁
// 休息一段时间再尝试
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
}

int main(){
std::thread producer_thread(producer, 100);
std::thread consumer_thread(consumer);
producer_thread.join();
consumer_thread.join();

return 0;
}

可能的部分运行结果如下:

在上面的代码中,使用 std::queue 来模拟共享缓冲区,缓冲区的大小设定为 10

在生产者函数中,进行 data_num 次循环,每次循环生产一个数据。由于需要一些提前解锁然后休息的操作,因此采用了 std::unique_lock 更加灵活地处理加锁、解锁。如果缓冲区未满,那么可以放入数据;否则,先解锁,然后休息一段时间,使得消费者线程有机会获得锁。

在消费者函数中,消费者不断消费数据。也采用了 std::unique_lock 处理加锁、解锁。如果缓冲区不为空,那么可以取出数据,取出数据后,假设我们需要对数据进行一些处理等操作,并且这些操作不需要互斥量维护,那么可以先提前解锁,然后再进行对数据的一系列操作;否则,先解锁,然后休息一段时间,使得生产者线程有机会获得锁。

1.2.2 存在的问题

可以看出,“休息一段时间” 的做法显然是比较低效的,主要问题在于:

  • 忙等待:生产者和消费者在无法继续操作时通过休眠来等待缓冲区状态改变,很有可能会休眠很多次,这就导致了资源的浪费。

  • 缺少同步机制:生产者和消费者 不知道何时等待继续工作。缓冲区中有数据了,无法及时通知消费者可以消费数据;缓冲区有空缺了,也无法及时通知生产者可以放入数据。

为了解决这些问题,C++11 引入了 条件变量 std::condition_variable。

2 std::condition_variable

2.1 基本概念

std::condition_variable 是 C++11 引入的一个类,在头文件 <condition_variable> 中,主要用于 线程间的同步std::condition_variable 在多线程编程中实现线程之间的等待和通知机制有着重要作用。

2.2 成员函数和基本用法

2.2.1 wait()

std::condition_variable::wait() 函数是 std::condition_variable 中的一个关键成员函数,主要用于让线程 等待某个条件成立,通常配合互斥量(mutex)进和 std::unique_lock 使用,以实现线程的同步。

其基本的函数原型为:

1
2
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );

其中 lock 是互斥锁;pred 是一个 Predicate 类型,必须是一个 可调用对象(例如函数、lambda 表示、函数指针等)。

基本的使用方法如下:

1
2
3
4
5
6
7
8
9
// 条件变量 和 互斥量
std::condition_variable cv;
std::mutex mtx;

// 线程函数
void wait_foo(){
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, check()); // check() 为判断条件是否成立的函数
}

当一个线程获得锁之后,调用 wait() 函数时:

  • 如果 check() 函数判断条件为 true,那么线程会继续向下执行;

  • 如果 check() 函数判断条件为 false,那么线程会阻塞在 cv.wait(lck, check()); 这一行,进入等待,同时 线程会自动地暂时释放锁

    线程进入阻塞(等待)状态,直到判断条件变为 true 时,线程会被唤醒,并重新获得锁继续执行下去。

    而条件成立后,线程的唤醒需要配合另一个 std::condition_variable 中的函数—— std::condition_variable::notify_one()(或者 std::condition_variable::notify_all())。

如果 wait() 函数不加 可调用对象 这个参数,那么 std::condition_variable::wait() 默认条件为 false,线程会进入阻塞状态。

1
2
3
4
5
// 线程函数
void wait_foo(){
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck); // 不加可调用对象参数,默认为 false,进入阻塞状态
}

注意: 在使用 wait() 函数时,只能搭配 std::unique_lock 互斥锁使用,不能使用 std::lock_guardstd::condition_variable::wait() 需要在等待期间自动释放互斥量,然后在被唤醒后重新获取互斥量。但是 std::lock_guard 没有提供这样的功能。

2.2.2 notify_one()

std::condition_variable::notify_one() 主要用于当某个条件成立,通知 一个 处于阻塞状态下(等待该条件成立)的线程,进而唤醒这个线程。

以下是一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>

std::mutex mtx; // 互斥量
std::condition_variable cv; // 条件变量
bool flag; // 判断的条件

// 等待条件线程函数
void wait_foo(){
std::unique_lock<std::mutex> lck(mtx);
std::cout << "wait_foo(): wating..." << "\n";
cv.wait(lck, []{ return flag; });
std::cout << "wait_foo(): waiting ends, flag = " << flag << "\n";
}

// 通知唤醒线程函数
void notify_foo(){
// 第一次尝试唤醒。但是不满足条件,唤醒失败
std::this_thread::sleep_for(std::chrono::seconds(2));
{
std::lock_guard<std::mutex> lck(mtx);
std::cout << "notify_foo(): notifying..." << "\n";
}
cv.notify_one(); // 作用域结束解锁后再唤醒更好

// 第二次尝试唤醒。满足条件,成功唤醒。
std::this_thread::sleep_for(std::chrono::seconds(2));
{
std::lock_guard<std::mutex> lck(mtx);
flag = true;
std::cout << "notify_foo(): notifying again..." << "\n";
}
cv.notify_one();
}

int main(){
std::thread wait_t(wait_foo);
std::thread notify_t(notify_foo);
wait_t.join();
notify_t.join();

return 0;
}

在上面的代码中,定义了一个等待条件的线程函数 wait_foo() 和一个通知唤醒的线程函数 notify_foo()。我们创建两个线程分别执行这两个函数,一个线程 wait_t 等待条件成立,一个线程 notify_t 用于唤醒这个等待条件成立的线程。

wait_foo() 中,先尝试获得锁,获得锁之后,判断条件 flag 是否为 true,显然一开始 flagfalse,所以一开始 wait_t 会进入阻塞状态,同时暂时释放锁。

notify_foo() 中,一开始先尝试唤醒,此时条件 flag 没有进行改变,所以这次唤醒是失败的;然后尝试进行第二次唤醒,这次修改条件 flagtrue,之后通知唤醒阻塞状态的线程 wait_t

处于阻塞态的线程 wait_t 在线程 notify_t 第二次调用 std::condition_variable::notify_one() 函数后(此时条件被修改为 true),成功被唤醒,重新获得锁,然后继续执行下去。

可能的运行结果如下:

2.2.3 notify_all()

std::condition_variable::notify_one() 不同的是,std::condition_variable::notify_all() 唤醒的是 所有 处于阻塞状态下(等待某个条件成立)。

在线程较多的情况下,std::condition_variable::notify_all() 通常比较高效。

但是,std::condition_variable::notify_all() 在有多种进行判断条件是否成立的线程时,会出现部分唤醒没有用的情况:

例如,在多生产者 - 多消费者模型中,当一个生产者调用 std::condition_variable::notify_all() 唤醒所有等待线程时,可能会出现唤醒其他一些处于阻塞态的生产者线程,那么这些唤醒可能暂时是没有用的。

不过这种影响比较小。当一类线程只有一个时,我们考虑使用 notify_one()(每次唤醒一个处于阻塞态的线程)进行唤醒;当一类线程有很多个时,我们考虑使用 notify_all()

3 使用 condition_variable 实现生产者-消费者

3.1 实现思路和代码

2 std::condition_variable 中,已经介绍了 std::condition_variable 的基本用法。我们可以通过这个条件变量来实现 生产者-消费者 问题中的同步机制。

在生产者函数 producer 中,每次生产并向缓冲区放入数据前,需要等待条件 buffer.size() < BUFFER_SIZE 成立。条件成立时,线程继续执行下去。缓冲区中已经有数据了,此时生产者线程需要尝试唤醒消费者线程。

在消费者函数 consumer 中,每次消费一个数据之前,需要等待条件 !buffer.empty() 成立。 条件成立后继续执行。此时缓冲区出现空位,消费者线程需要唤醒生产者线程。

实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>

std::mutex mtx; // 互斥量
std::condition_variable cv; // 条件变量
std::queue<int> buffer; // 共享缓冲区
const int BUFFER_SIZE = 10; // 缓冲区大小

// 生产者函数
void producer(int data_num){
for(int i = 1; i <= data_num; i ++ ){
std::unique_lock<std::mutex> lck(mtx);
// 等待 buffer 有空间可放入数据
cv.wait(lck, [&](){ return buffer.size() < BUFFER_SIZE; });
buffer.push(i);
std::cout << "Produced: " << i << "\n";
cv.notify_one(); // 唤醒等待状态的消费者线程
}
}

// 消费者函数
void consumer(){
while(true){
std::unique_lock<std::mutex> lck(mtx);
// 等待 buffer 中有数据可消费
cv.wait(lck, [&](){ return !buffer.empty(); });
int data = buffer.front();
buffer.pop();
std::cout << "Consumed: " << data << "\n";
lck.unlock(); // 提前解锁
cv.notify_one(); // 唤醒等待状态的生产者线程
// 模拟与 cv 无关的数据操作
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}

int main(){
std::thread producer_thread(producer, 100);
std::thread consumer_thread(consumer);
producer_thread.join();
consumer_thread.join();

return 0;
}

可能的运行结果如下:

在上面的代码中,因为只有一个生产者和消费者,所以暂时只要用 notify_one() 就好了。

3.2 多生产者、消费者问题

3.2.1 多生产者、消费者实现

3.1 condition_variable 实现消费者-生产者代码 中,我们只有一个生产者和一个消费者。

但实际场景中往往会有多个生产者和消费者,例如 1.1.1 生产者-消费者问题定义 中的甜点师-顾客的例子,一个店铺里可能有多个甜点师在制作,会有很多个顾客光临这个甜品店。

我们可以修改 3.1 生产者-消费者实现代码 中的代码,创建多个生产者线程和多个消费者线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>

std::mutex mtx; // 互斥量
std::condition_variable cv; // 条件变量
std::queue<int> buffer; // 共享缓冲区
const int BUFFER_SIZE = 10; // 缓冲区大小
const int PRODUCER_NUM = 2; // 生产者个数
const int CONSUMER_NUM = 5; // 消费者个数

// 生产者函数
void producer(int id, int data_num){
for(int i = 1; i <= data_num; i ++ ){
std::unique_lock<std::mutex> lck(mtx);
// 等待 buffer 有空间可放入数据
cv.wait(lck, [&](){ return buffer.size() < BUFFER_SIZE; });
buffer.push(i);
std::cout << "producer " << id << " produced data: " << i << "\n";
cv.notify_all(); // 唤醒等待状态的消费者线程
}
}

// 消费者函数
void consumer(int id){
while(true){
std::unique_lock<std::mutex> lck(mtx);
// 等待 buffer 中有数据可消费
cv.wait(lck, [&](){ return !buffer.empty(); });
int data = buffer.front();
buffer.pop();
std::cout << "consumer " << id << " consumed data: " << data << "\n";
lck.unlock(); // 提前解锁
cv.notify_all(); // 唤醒等待状态的生产者线程
// 模拟与 cv 无关的数据操作,放在解锁和通知之后
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}

int main(){
std::vector<std::thread> producers(PRODUCER_NUM);
std::vector<std::thread> consumers(CONSUMER_NUM);
for(size_t i = 0; i < producers.size(); i ++ ){
producers[i] = std::thread(producer, i + 1, 100);
}
for(size_t i = 0; i < consumers.size(); i ++ ){
consumers[i] = std::thread(consumer, i + 1);
}
for(auto& t : producers) t.join();
for(auto& t : consumers) t.join();

return 0;
}

在上面的代码中,定义了一个生产者个数和一个消费者个数,在线程接口函数中传递了线程编号标记的参数 id(便于表示不同的生产者/消费者)。

由于现在是多个生产者线程和多个消费者线程,因此将 notify_one() 改为了 notify_all()。其他部分代码大致相同。

可能的运行结果如下:

看似运行起来好像没有什么毛病,但实际上在 多生产者线程、多消费者线程 情况下,可能会出现 虚假唤醒 的问题。

3.2.2 虚假唤醒问题

虚假唤醒 是指线程从等待条件变量 std::condition_variable::wait() 函数中唤醒,但实际上等待的条件并没有真正满足。通常 虚假唤醒 的问题是由于多线程环境下 系统调度、信号传输 等原因导致的。

例如,缓冲区为空,当一个生产者线程放入一个数据后(放入后缓冲区就这一个数据),通知唤醒了一个等待的消费者线程的时候,有另一个消费者线程消费了这个唯一的数据。这是线程调度的不确定性所导致的。

此时,这个消费者线程收到被唤醒的通知,表面上条件成立了,但是实际上 buffer 还是为空。这就出现了 虚假唤醒

在现实场景中,我想要买草莓蛋糕但是甜点师正在制作中,等待了一会,甜点师通知我展示架上有新的草莓蛋糕了,我正准备去拿的时候却被另一个人给抢先了。这正是太倒霉了!🍓🍰

虚假唤醒还有很多其他的情况,总之类似上述的条件没有实际满足的情况都属于虚假唤醒。

3.2.3 虚假唤醒的解决

我们可以通过线程被唤醒后,再次检查是否满足条件成立。再次检查条件如果发现不成立,那么线程需要又一次进入阻塞状态,因此我们需要实现一个 循环检查

2.2.1 std::condition_variable::wait() 中,如果 wait() 函数不接受 可调用对象 的参数,函数默认条件为 false,线程会进入阻塞状态。

在生产者线程函数中,我们可以写一个 while 循环,循环判断的条件为 buffer.size() >= BUFFER_SIZE,如果条件满足,那么执行 cv.wait(lck),使得生产者线程进入阻塞状态,同时暂时释放互斥锁。

生产者线程卡在 cv.wait(lck); 这一行,如果某个消费者线程唤醒了生产者线程,那么会再次进入 while 循环进行条件判断。如果此时 buffer.size() < BUFFER_SIZE,那么会退出循环,然后生产者线程继续执行下去;否则,生产者线程会再次陷入阻塞状态,同时暂时释放互斥锁。

这样就通过 while 循环实现了再次检查以及循环检查。类似的,消费者线程中也可以写一个 while 循环,确保消费者线程不会被虚假唤醒。

修改后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>

std::mutex mtx; // 互斥量
std::condition_variable cv; // 条件变量
std::queue<int> buffer; // 共享缓冲区
const int BUFFER_SIZE = 10; // 缓冲区大小
const int PRODUCER_NUM = 2; // 生产者个数
const int CONSUMER_NUM = 5; // 消费者个数

// 生产者函数
void producer(int id, int data_num){
for(int i = 1; i <= data_num; i ++ ){
std::unique_lock<std::mutex> lck(mtx);
// 等待 buffer 有空间可放入数据(循环检查,防止虚假唤醒)
while(buffer.size() >= BUFFER_SIZE){
cv.wait(lck); // 默认 false,解锁并进入等待
}
buffer.push(i);
std::cout << "producer " << id << " produced data: " << i << "\n";
cv.notify_all(); // 唤醒等待状态的消费者线程
}
}

// 消费者函数
void consumer(int id){
while(true){
std::unique_lock<std::mutex> lck(mtx);
// 等待 buffer 中有数据可消费(循环检查,防止虚假唤醒)
while(buffer.empty()){
cv.wait(lck); // 默认 false,解锁并进入等待
}
int data = buffer.front();
buffer.pop();
std::cout << "consumer " << id << " consumed data: " << data << "\n";
lck.unlock(); // 提前解锁
cv.notify_all(); // 唤醒等待状态的生产者线程
// 模拟与 cv 无关的数据操作,放在解锁和通知之后
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}

int main(){
std::vector<std::thread> producers(PRODUCER_NUM);
std::vector<std::thread> consumers(CONSUMER_NUM);
for(size_t i = 0; i < producers.size(); i ++ ){
producers[i] = std::thread(producer, i + 1, 100);
}
for(size_t i = 0; i < consumers.size(); i ++ ){
consumers[i] = std::thread(consumer, i + 1);
}
for(auto& t : producers) t.join();
for(auto& t : consumers) t.join();

return 0;
}

3.3 消费者退出机制编写

3.2.3 虚假唤醒的解决 中,我们已经解决了 虚假唤醒 问题。但是,我们还存在一个问题:消费者一直都在消费数据,没有停止的时候。由于这个问题,我们的程序到最后也没有停止。

那么,如何写一个合理的消费者退出机制呢?一个比较愚蠢的办法就是,记录数据的数量,达到生产者生产数据的数量上限时,消费者线程函数中退出循环。但是这种写法的局限性比较高,如果生产者生产数据的代码有一点改动就得跟着一起修改。

我们同样可以借助条件变量来实现一个合理的消费者退出机制。

我们可以维护一个完成生产工作的生产者计数 produce_finished_count,这个计数 需要互斥访问和修改

  • 在生产者线程函数生产数据循环结束之后,生产者线程再次获得锁,然后修改 produce_finished_countproduce_finished_count ++ )。如果修改之后,produce_finished_count 达到了生产者线程的总数,那么需要尝试唤醒阻塞状态中的消费者线程,通知它们不会再有新的数据了;

  • 在消费者线程函数中,如果 buffer.empty() && produce_finished_count < PRODUCER_NUM,表明缓冲区为空,但是还会有新的数据生产,因此这时消费者线程进入阻塞状态。

    通常情况下,当 !buffer.empty() 成立时(有数据了),线程被唤醒,然后(可能)退出循环,消费者线程继续执行下去。但是,可能会在 produce_finished_count == PRODUCER_NUM 成立的时候,线程被通知唤醒然后退出循环。那么我们需要在判断一下是否是这种情况,如果是这种情况(缓冲区为空,且不会有新的数据再生产),那么消费者线程函数就从 while(true)break 退出。

    当然,在生产者线程函数中,当 produce_finished_count == PRODUCER_NUM 成立时去唤醒消费者线程也不一定表明就是完全没数据了,可能缓冲区还有一些数据,只不过不会有新数据再生产了。这个时候,消费者线程在之后判断 buffer.empty() && produce_finished_count == PRODUCER_NUM 是否成立时,buffer 并不为空,那么消费者线程会继续执行下去去消费缓冲区的数据。

我们可以将上面的退出机制实现带入现实场景中:甜点师今天工作量达到上限了,不会再制作新的草莓蛋糕了,我正在等待新的草莓蛋糕,但是轮到我的时候,甜点师告诉我已经没有且不再做了,那我只能遗憾地回家了,又是倒霉的一天。🍰😢

修改后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <queue>
#include <vector>
#include <chrono>

std::mutex mtx; // 互斥量
std::condition_variable cv; // 条件变量
std::queue<int> buffer; // 共享缓冲区
const int BUFFER_SIZE = 10; // 缓冲区大小
const int PRODUCER_NUM = 2; // 生产者个数
const int CONSUMER_NUM = 5; // 消费者个数
int produce_finished_count; // 生产者完成计数
// std::atomic<int> produce_finished_count; // 也可以用原子操作

// 生产者函数
void producer(int id, int data_num){
for(int i = 1; i <= data_num; i ++ ){
std::unique_lock<std::mutex> lck(mtx);
// 等待 buffer 有空间可放入数据(循环检查,防止虚假唤醒)
while(buffer.size() >= BUFFER_SIZE){
cv.wait(lck); // 默认 false,解锁并进入等待
}
buffer.push(i);
std::cout << "producer " << id << " produced data: " << i << "\n";
cv.notify_all(); // 唤醒等待状态的消费者线程
}
{
// 当前生产者生成数据完毕,修改完成数据生成的生产者记数
std::lock_guard<std::mutex> lck(mtx);
produce_finished_count ++ ;
if(produce_finished_count == PRODUCER_NUM){
cv.notify_all(); // 唤醒等待的消费者线程
}
}
}

// 消费者函数
void consumer(int id){
while(true){
std::unique_lock<std::mutex> lck(mtx);
// 等待 buffer 中有数据可消费(循环检查,防止虚假唤醒)
while(buffer.empty() && produce_finished_count < PRODUCER_NUM){
cv.wait(lck); // 默认 false,解锁并进入等待
}
// 上面的检查中,可能因为 produce_finished_count == PRODUCER_NUM 而退出
// buffer 可能为空,说明没有新的数据再生产,这个时候从 while(true) 退出
// buffer 也可能还有数据,消费者继续执行下去消费数据。
if(buffer.empty() && produce_finished_count == PRODUCER_NUM){
break; // 退出
}
int data = buffer.front();
buffer.pop();
std::cout << "consumer " << id << " consumed data: " << data << "\n";
lck.unlock(); // 提前解锁
cv.notify_all(); // 唤醒等待状态的生产者线程
// 模拟与 cv 无关的、无需互斥量的数据操作,放在解锁和唤醒之后
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}

int main(){
std::vector<std::thread> producers(PRODUCER_NUM);
std::vector<std::thread> consumers(CONSUMER_NUM);
for(size_t i = 0; i < producers.size(); i ++ ){
producers[i] = std::thread(producer, i + 1, 100);
}
for(size_t i = 0; i < consumers.size(); i ++ ){
consumers[i] = std::thread(consumer, i + 1);
}
for(auto& t : producers) t.join();
for(auto& t : consumers) t.join();

return 0;
}

可能的运行结果如下:

可以看出,生产者-消费者顺利的执行,并且最后数据全部消费完后程序正常地退出了。

还有一个小小的可以优化的地方就是,对完成生产工作的生产者计数 produce_finished_count 的互斥访问和修改只有简单的自加操作,那么可以使用 std::atomic 原子操作来实现对 produce_finished_count 互斥,使得程序更加高效。不过,暂时还没有写到有关 原子操作 的部分,所以暂时不展开了。😉

参考

  1. std::condition_variable::wait

  2. std::condition_variable::notify_one

  3. std::condition_variable::notify_all


「C++ 多线程」生产者-消费者问题及 std::condition_variable
https://marisamagic.github.io/2025/01/01/20250101/
作者
MarisaMagic
发布于
2025年1月1日
许可协议