「Linux 系统编程」条件变量与生产者-消费者模型

1 条件变量

1.1 条件变量概述

在多线程编程中,条件变量(Condition Variable)是一种重要的同步机制,它允许线程在某个条件不满足时进入等待状态当条件满足时再由其他线程唤醒

条件变量本质上就是 “条件等待 + 通知” 的一个机制结合互斥量(Mutex)使用,条件变量能够高效地解决复杂的线程同步问题,特别是在生产者-消费者模型中。


1.2 Linux C 条件变量相关函数

  • 条件变量初始化与销毁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    #include <pthread.h>

    // 静态初始化条件变量
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

    // 动态初始化条件变量
    pthread_cond_init(&cond, NULL);

    // 条件变量销毁(静态初始化通常不用显式销毁)
    pthread_cond_destroy(&cond);
  • 等待条件满足

    1
    2
    3
    4
    5
    6
    // 等待条件变量,同时释放互斥锁
    int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

    // 带超时的等待
    int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
    const struct timespec *abstime);
    1. 阻塞等待条件变量满足
    2. 解锁已经加锁成功的信号量 (相当于 pthread_mutex_unlock(&mutex))。12两步为一个原子操作
    3. 当条件满足,函数返回时,解除阻塞并重新申请获取互斥锁。重新加锁信号量(相当于 pthread_mutex_lock(&mutex))
  • 唤醒等待的线程

    1
    2
    3
    4
    5
    // 唤醒至少一个等待该条件变量的线程
    int pthread_cond_signal(pthread_cond_t *cond);

    // 唤醒所有等待该条件变量的线程
    int pthread_cond_broadcast(pthread_cond_t *cond);


2 条件变量的生产者-消费者模型

2.1 实现代码

生产者-消费者模型是一个经典的多线程同步问题,其中生产者生成数据,消费者处理数据,两者通过共享缓冲区进行通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#define BUFFER_SIZE 10

int buffer[BUFFER_SIZE];
int count = 0; // 统计缓冲区内数量
int in = 0, out = 0; // 记录缓冲区取数据和放数据位置

pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;


void* producer(void* arg) {
int i, item;
for (i = 0; i < 30; i ++ ) {
item = i;

// 生产者加锁
pthread_mutex_lock(&mtx);

// 缓冲区已满,生产者阻塞等待,并暂时释放锁
if (count == BUFFER_SIZE) {
pthread_cond_wait(&not_full, &mtx);
}

// 条件满足,生产者重新获得锁,并执行后续放入缓冲区操作
buffer[in] = item;
in = (in + 1) % BUFFER_SIZE;
count ++ ;

printf("生产者生产: %d, 缓冲区数量: %d\n", item, count);

// 生产者释放锁
pthread_mutex_unlock(&mtx);

// 通知缓冲区已经有数据,唤醒阻塞等待的消费者
pthread_cond_signal(&not_empty);

// 模拟生产过程
usleep(10000);
}
return NULL;
}

void* consumer(void* arg) {
int i, item;
for (int i = 0; i < 30; i ++ ) {
item = i;

// 消费者加锁
pthread_mutex_lock(&mtx);

// 缓冲区无数据,消费者阻塞等待,并暂时释放锁
if (count == 0) {
pthread_cond_wait(&not_empty, &mtx);
}

// 条件满足,消费者重新获得锁,并执行后续缓冲区取数据操作
item = buffer[out];
out = (out + 1) % BUFFER_SIZE;
count -- ;

printf("消费者消费: %d, 缓冲区数量: %d\n", item, count);

// 消费者释放锁
pthread_mutex_unlock(&mtx);

// 通知缓冲区未满,唤醒阻塞等待的生产者
pthread_cond_signal(&not_full);

// 模拟消费过程
usleep(15000);
}
return NULL;
}

int main() {
pthread_t pro_t, con_t;
pthread_create(&pro_t, NULL, producer, NULL);
pthread_create(&con_t, NULL, consumer, NULL);

pthread_join(pro_t, NULL);
pthread_join(con_t, NULL);

return 0;
}


2.2 条件变量状态变化分析

在单生产者-单消费者模型中,两个条件变量 not_fullnot_empty 的状态变化可以用二进制状态(0和1)来简化理解。

需要明确的是,条件变量本身并不存储状态信息。它们只是线程同步的机制,用于:

  1. 让线程在某个条件不满足时进入等待状态
  2. 当条件可能满足时唤醒等待的线程

真正的状态信息存储在共享变量(如 count)中,条件变量只是基于这些共享变量来控制线程的等待和唤醒。


可以将条件变量关联的条件抽象为二进制状态:

  • not_full 关联的条件:count < BUFFER_SIZE (缓冲区未满)
  • not_empty 关联的条件:count > 0 (缓冲区非空)

状态变化过程分析

  • 初始状态

    • count = 0 (缓冲区为空)
    • not_full 状态: 1 (真,因为 0 < BUFFER_SIZE)
    • not_empty 状态: 0 (假,因为 0 == 0)
  • 生产者生产一个项目

    1. 生产者获取互斥锁
    2. 检查 count == BUFFER_SIZE? 否,所以不等待
    3. 将项目放入缓冲区,count++
    4. 现在 count = 1
    5. 发送 not_empty 信号
    6. 释放互斥锁

    状态变化:

    • not_full 状态: 1 (真,因为 1 < BUFFER_SIZE)
    • not_empty 状态: 1 (真,因为 1 > 0)
  • 消费者消费一个项目

    1. 消费者获取互斥锁
    2. 检查 count == 0? 否,所以不等待
    3. 从缓冲区取出项目,count--
    4. 现在 count = 0
    5. 发送 not_full 信号
    6. 释放互斥锁

    状态变化:

    • not_full 状态: 1 (真,因为 0 < BUFFER_SIZE)
    • not_empty 状态: 0 (假,因为 0 == 0)
  • 缓冲区满时生产者尝试生产

    1. 假设缓冲区已满 (count == BUFFER_SIZE)
    2. 生产者获取互斥锁
    3. 检查 count == BUFFER_SIZE? 是,所以等待 not_full
    4. 生产者线程阻塞,释放互斥锁

    状态变化:

    • not_full 状态: 0 (假,因为 count == BUFFER_SIZE)
    • not_empty 状态: 1 (真,因为 count > 0)
  • 消费者消费后唤醒生产者

    1. 消费者获取互斥锁
    2. 消费一个项目,count--
    3. 现在 count = BUFFER_SIZE - 1
    4. 发送 not_full 信号,唤醒等待的生产者
    5. 释放互斥锁

    状态变化:

    • not_full 状态: 1 (真,因为 count < BUFFER_SIZE)
    • not_empty 状态: 1 (真,因为 count > 0)
  • 缓冲区空时消费者者尝试消费

    1. 假设缓冲区为空 (count == 0)
    2. 消费者获取互斥锁
    3. 检查 count == 0? 是,所以等待 not_empty
    4. 消费者线程阻塞,释放互斥锁

    状态变化:

    • not_full 状态: 1 (真,因为 count < BUFFER_SIZE)
    • not_empty 状态: 0 (假,因为 count == 0)

条件变量特点

  1. 条件变量不是状态存储机制:条件变量本身不存储状态,它们只是基于共享变量(如 count)来控制线程的等待和唤醒。

  2. 二元状态是抽象的:将条件变量视为具有二元状态是一种简化理解的方式,实际上它们反映的是共享变量的状态。

  3. 条件变量总是成对出现:在生产者和消费者模型中,通常需要两个条件变量,一个用于生产者等待(缓冲区不满),一个用于消费者等待(缓冲区不空)。

  4. 状态转换是同步的:条件变量的状态变化总是与共享变量的修改同步发生,并通过互斥锁保护。

  5. 唤醒操作可能没有等待者:发送条件变量信号时,如果没有线程在等待该条件,信号会被忽略,不会产生任何效果。



3 条件变量实现多生产者-多消费者

3.1 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#define BUFFER_SIZE 10
#define PRODUCER_NUM 2
#define CONSUMER_NUM 4

int buffer[BUFFER_SIZE];
int count = 0; // 统计缓冲区内数量
int in = 0, out = 0; // 记录缓冲区取数据和放数据位置

pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;


void* producer(void* arg) {
int producer_id = *(int*)arg;
int i, item;
for (i = 0; i < 20; i ++ ) {
item = i;

// 生产者加锁
pthread_mutex_lock(&mtx);

// 缓冲区已满,生产者阻塞等待,并暂时释放锁
while (count == BUFFER_SIZE) { // 循环检查,防止虚假唤醒
pthread_cond_wait(&not_full, &mtx);
}

// 条件满足,生产者重新获得锁,并执行后续放入缓冲区操作
buffer[in] = item;
in = (in + 1) % BUFFER_SIZE;
count ++ ;

printf("生产者 %d 生产: %d, 缓冲区数量: %d\n", producer_id, item, count);

// 生产者释放锁
pthread_mutex_unlock(&mtx);

// 通知缓冲区已经有数据,唤醒阻塞等待的消费者
pthread_cond_broadcast(&not_empty);

// 模拟生产过程
usleep(10000);
}
return NULL;
}

void* consumer(void* arg) {
int consumer_id = *(int*)arg;
int i, item;
for (int i = 0; i < 10; i ++ ) {
item = i;

// 消费者加锁
pthread_mutex_lock(&mtx);

// 缓冲区无数据,消费者阻塞等待,并暂时释放锁
while (count == 0) { // 循环检查,防止虚假唤醒
pthread_cond_wait(&not_empty, &mtx);
}

// 条件满足,消费者重新获得锁,并执行后续缓冲区取数据操作
item = buffer[out];
out = (out + 1) % BUFFER_SIZE;
count -- ;

printf("消费者 %d 消费: %d, 缓冲区数量: %d\n", consumer_id, item, count);

// 消费者释放锁
pthread_mutex_unlock(&mtx);

// 通知缓冲区未满,唤醒阻塞等待的生产者
pthread_cond_broadcast(&not_full);

// 模拟消费过程
usleep(15000);
}
return NULL;
}

int main() {
pthread_t pro_t[PRODUCER_NUM];
pthread_t con_t[CONSUMER_NUM];
int pro_id[PRODUCER_NUM];
int con_id[CONSUMER_NUM];

for (int i = 0; i < PRODUCER_NUM; i ++ ) {
pro_id[i] = i + 1;
pthread_create(&pro_t[i], NULL, producer, &pro_id[i]);
}
for (int i = 0; i < CONSUMER_NUM; i ++ ) {
con_id[i] = i + 1;
pthread_create(&con_t[i], NULL, consumer, &con_id[i]);
}

for (int i = 0; i < PRODUCER_NUM; i ++ ) {
pthread_join(pro_t[i], NULL);
}
for (int i = 0; i < CONSUMER_NUM; i ++ ) {
pthread_join(con_t[i], NULL);
}

return 0;
}


3.2 虚假唤醒

虚假唤醒(Spurious Wakeup)是多线程编程中一个重要的概念,指的是等待在条件变量上的线程 在没有收到明确信号的情况下被意外唤醒 的现象。通常 虚假唤醒 的问题是由于多线程环境下 系统调度、信号传输 等原因导致的。

这是 POSIX 标准允许的行为,主要出于以下原因:

  1. 系统调度/中断/信号干扰(历史原因)

    早期 Unix 系统中,当线程被信号(signal,如 SIGUSR1)中断时,可能从 wait 中提前返回。虽然现代系统大多已修复,但标准仍保留此行为以兼容。

  2. 多核 CPU 的内存一致性/缓存同步问题

    在 SMP(对称多处理器)系统中,不同 CPU 核心的缓存状态可能暂时不一致,导致条件判断出现“幻读”,线程误以为条件满足而醒来。

  3. 操作系统实现的权衡

    强制要求“绝对不虚假唤醒”会增加系统开销(如加更多锁、更多内存屏障)。POSIX 选择允许虚假唤醒,把正确性责任交给程序员 —— 你必须用 while 重检查条件。


POSIX 标准明确允许虚假唤醒的发生,但需要避免虚假唤醒的发生。通常使用while循环而不是if语句来检查条件:

1
2
3
4
5
6
pthread_mutex_lock(&mutex);
while (condition_is_false) { // 循环检查
pthread_cond_wait(&cond, &mutex);
}
// 处理条件满足的情况
pthread_mutex_unlock(&mutex);

使用 while 循环 防止虚假唤醒(spurious wakeup),确保被 唤醒后再次检查条件是否满足


为什么while循环能解决虚假唤醒问题

  • 重新检查条件:当线程从 pthread_cond_wait() 返回时,它会重新获取互斥锁,然后 while 循环会再次检查条件。

  • 确保条件真正满足:如果发生虚假唤醒,条件仍然不满足,线程会再次进入等待状态

虚假唤醒示意图

这种方法确保了线程只有在条件真正满足时才会继续执行。


3.3 消费者退出机制

如果生产者生产的数据数量达不到消费者的总取数据数量,那么在所有生产者执行完毕后,剩余的没有执行完的消费者线程会一直等待下去。

因此,可以考虑添加一个消费者的退出机制:当没有生产者会再生产数据时,剩余的消费者直接退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#define BUFFER_SIZE 10
#define PRODUCER_NUM 3
#define CONSUMER_NUM 5

int buffer[BUFFER_SIZE];
int count = 0; // 统计缓冲区内数量
int in = 0, out = 0; // 记录缓冲区取数据和放数据位置
int production_done = 0; // 标志:生产者是否完成

pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;

void* producer(void* arg) {
int producer_id = *(int*)arg;
int i, item;
for (i = 0; i < 10; i ++ ) {
item = i;

pthread_mutex_lock(&mtx);

while (count == BUFFER_SIZE) {
pthread_cond_wait(&not_full, &mtx);
}

buffer[in] = item;
in = (in + 1) % BUFFER_SIZE;
count ++ ;

printf("生产者 %d 生产: %d, 缓冲区数量: %d\n", producer_id, item, count);

pthread_mutex_unlock(&mtx);
pthread_cond_broadcast(&not_empty);
usleep(10000);
}
return NULL;
}

void* consumer(void* arg) {
int consumer_id = *(int*)arg;
int item;
for (int i = 0; i < 10; i ++ ) {
pthread_mutex_lock(&mtx);

// 等待条件:缓冲区不为空或生产者已完成且缓冲区为空(则退出)
while (count == 0 && !production_done) {
pthread_cond_wait(&not_empty, &mtx);
}

// 如果缓冲区为空且生产者已完成,则退出循环
if (count == 0 && production_done) {
pthread_mutex_unlock(&mtx);
break;
}

item = buffer[out];
out = (out + 1) % BUFFER_SIZE;
count -- ;

printf("消费者 %d 消费: %d, 缓冲区数量: %d\n", consumer_id, item, count);

pthread_mutex_unlock(&mtx);
pthread_cond_broadcast(&not_full);
usleep(15000);
}
return NULL;
}

int main() {
pthread_t pro_t[PRODUCER_NUM];
pthread_t con_t[CONSUMER_NUM];
int pro_id[PRODUCER_NUM];
int con_id[CONSUMER_NUM];

for (int i = 0; i < PRODUCER_NUM; i ++ ) {
pro_id[i] = i + 1;
pthread_create(&pro_t[i], NULL, producer, &pro_id[i]);
}
for (int i = 0; i < CONSUMER_NUM; i ++ ) {
con_id[i] = i + 1;
pthread_create(&con_t[i], NULL, consumer, &con_id[i]);
}

// 等待所有生产者完成
for (int i = 0; i < PRODUCER_NUM; i ++ ) {
pthread_join(pro_t[i], NULL);
}

// 设置生产者完成标志
pthread_mutex_lock(&mtx);
production_done = 1;
pthread_mutex_unlock(&mtx);
pthread_cond_broadcast(&not_empty); // 唤醒所有等待的消费者

// 等待所有消费者完成
for (int i = 0; i < CONSUMER_NUM; i ++ ) {
pthread_join(con_t[i], NULL);
}

return 0;
}
  1. 添加 production_done 标志:这是一个共享变量,用于指示所有生产者已经完成。

  2. 消费者检查终止条件:在消费者中,等待条件改为 while (count == 0 && !production_done)。如果缓冲区为空且生产者已完成,消费者则退出循环。

  3. 主线程设置标志:在主线程中,等待所有生产者完成后,设置 production_done 为1,并广播 not_empty 条件以唤醒所有等待的消费者,让它们检查终止条件。

这样,即使消费者尝试消费的次数多于生产次数,它们也会在生产者结束后正常退出,避免程序卡住。



4 C++ 多线程 条件变量

可见 「C++ 多线程」生产者-消费者问题及 std::condition_variable


「Linux 系统编程」条件变量与生产者-消费者模型
https://marisamagic.github.io/2025/09/20/20250920/
作者
MarisaMagic
发布于
2025年9月20日
许可协议