1 条件变量
1.1 条件变量概述
在多线程编程中,条件变量 (Condition Variable)是一种重要的同步机制,它允许线程在某个条件不满足时进入等待状态 ,当条件满足时再由其他线程唤醒 。
条件变量本质上就是 “条件等待 + 通知” 的一个机制 ,结合互斥量(Mutex)使用 ,条件变量能够高效地解决复杂的线程同步问题,特别是在生产者-消费者模型中。
1.2 Linux C 条件变量相关函数
条件变量初始化与销毁
1 2 3 4 5 6 7 8 9 10 #include <pthread.h> pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_cond_init(&cond, NULL ); pthread_cond_destroy(&cond);
等待条件满足
1 2 3 4 5 6 int pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex) ;int pthread_cond_timedwait (pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime) ;
阻塞等待条件变量满足
解锁已经加锁成功的信号量 (相当于 pthread_mutex_unlock(&mutex))。12两步为一个原子操作
当条件满足,函数返回时,解除阻塞并重新申请获取互斥锁。重新加锁信号量(相当于 pthread_mutex_lock(&mutex))
唤醒等待的线程
1 2 3 4 5 int pthread_cond_signal (pthread_cond_t *cond) ;int pthread_cond_broadcast (pthread_cond_t *cond) ;
2 条件变量的生产者-消费者模型
2.1 实现代码
生产者-消费者模型是一个经典的多线程同步问题,其中生产者生成数据,消费者处理数据,两者通过共享缓冲区进行通信。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #define BUFFER_SIZE 10 int buffer[BUFFER_SIZE];int count = 0 ; int in = 0 , out = 0 ; pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;void * producer (void * arg) { int i, item; for (i = 0 ; i < 30 ; i ++ ) { item = i; pthread_mutex_lock(&mtx); if (count == BUFFER_SIZE) { pthread_cond_wait(¬_full, &mtx); } buffer[in] = item; in = (in + 1 ) % BUFFER_SIZE; count ++ ; printf ("生产者生产: %d, 缓冲区数量: %d\n" , item, count); pthread_mutex_unlock(&mtx); pthread_cond_signal(¬_empty); usleep(10000 ); } return NULL ; }void * consumer (void * arg) { int i, item; for (int i = 0 ; i < 30 ; i ++ ) { item = i; pthread_mutex_lock(&mtx); if (count == 0 ) { pthread_cond_wait(¬_empty, &mtx); } item = buffer[out]; out = (out + 1 ) % BUFFER_SIZE; count -- ; printf ("消费者消费: %d, 缓冲区数量: %d\n" , item, count); pthread_mutex_unlock(&mtx); pthread_cond_signal(¬_full); usleep(15000 ); } return NULL ; }int main () { pthread_t pro_t , con_t ; pthread_create(&pro_t , NULL , producer, NULL ); pthread_create(&con_t , NULL , consumer, NULL ); pthread_join(pro_t , NULL ); pthread_join(con_t , NULL ); return 0 ; }
2.2 条件变量状态变化分析
在单生产者-单消费者模型中,两个条件变量 not_full
和 not_empty
的状态变化可以用二进制状态(0和1)来简化理解。
需要明确的是 ,条件变量本身并不存储状态信息。它们只是线程同步的机制,用于:
让线程在某个条件不满足时进入等待状态
当条件可能满足时唤醒等待的线程
真正的状态信息存储在共享变量(如 count)中 ,条件变量只是基于这些共享变量来控制线程的等待和唤醒。
可以将条件变量关联的条件抽象为二进制状态:
not_full
关联的条件:count < BUFFER_SIZE
(缓冲区未满)
not_empty
关联的条件:count > 0
(缓冲区非空)
状态变化过程分析 :
初始状态
count = 0
(缓冲区为空)
not_full
状态: 1 (真,因为 0 < BUFFER_SIZE)
not_empty
状态: 0 (假,因为 0 == 0)
生产者生产一个项目
生产者获取互斥锁
检查 count == BUFFER_SIZE
? 否,所以不等待
将项目放入缓冲区,count++
现在 count = 1
发送 not_empty
信号
释放互斥锁
状态变化:
not_full
状态: 1 (真,因为 1 < BUFFER_SIZE)
not_empty
状态: 1 (真,因为 1 > 0)
消费者消费一个项目
消费者获取互斥锁
检查 count == 0
? 否,所以不等待
从缓冲区取出项目,count--
现在 count = 0
发送 not_full
信号
释放互斥锁
状态变化:
not_full
状态: 1 (真,因为 0 < BUFFER_SIZE)
not_empty
状态: 0 (假,因为 0 == 0)
缓冲区满时生产者尝试生产
假设缓冲区已满 (count == BUFFER_SIZE
)
生产者获取互斥锁
检查 count == BUFFER_SIZE
? 是,所以等待 not_full
生产者线程阻塞,释放互斥锁
状态变化:
not_full
状态: 0 (假,因为 count == BUFFER_SIZE)
not_empty
状态: 1 (真,因为 count > 0)
消费者消费后唤醒生产者
消费者获取互斥锁
消费一个项目,count--
现在 count = BUFFER_SIZE - 1
发送 not_full
信号,唤醒等待的生产者
释放互斥锁
状态变化:
not_full
状态: 1 (真,因为 count < BUFFER_SIZE)
not_empty
状态: 1 (真,因为 count > 0)
缓冲区空时消费者者尝试消费
假设缓冲区为空 (count == 0
)
消费者获取互斥锁
检查 count == 0
? 是,所以等待 not_empty
消费者线程阻塞,释放互斥锁
状态变化:
not_full
状态: 1 (真,因为 count < BUFFER_SIZE)
not_empty
状态: 0 (假,因为 count == 0)
条件变量特点 :
条件变量不是状态存储机制 :条件变量本身不存储状态,它们只是基于共享变量(如 count
)来控制线程的等待和唤醒。
二元状态是抽象的 :将条件变量视为具有二元状态是一种简化理解的方式,实际上它们反映的是共享变量的状态。
条件变量总是成对出现 :在生产者和消费者模型中,通常需要两个条件变量,一个用于生产者等待(缓冲区不满),一个用于消费者等待(缓冲区不空)。
状态转换是同步的 :条件变量的状态变化总是与共享变量的修改同步发生,并通过互斥锁保护。
唤醒操作可能没有等待者 :发送条件变量信号时,如果没有线程在等待该条件,信号会被忽略,不会产生任何效果。
3 条件变量实现多生产者-多消费者
3.1 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #define BUFFER_SIZE 10 #define PRODUCER_NUM 2 #define CONSUMER_NUM 4 int buffer[BUFFER_SIZE];int count = 0 ; int in = 0 , out = 0 ; pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;void * producer (void * arg) { int producer_id = *(int *)arg; int i, item; for (i = 0 ; i < 20 ; i ++ ) { item = i; pthread_mutex_lock(&mtx); while (count == BUFFER_SIZE) { pthread_cond_wait(¬_full, &mtx); } buffer[in] = item; in = (in + 1 ) % BUFFER_SIZE; count ++ ; printf ("生产者 %d 生产: %d, 缓冲区数量: %d\n" , producer_id, item, count); pthread_mutex_unlock(&mtx); pthread_cond_broadcast(¬_empty); usleep(10000 ); } return NULL ; }void * consumer (void * arg) { int consumer_id = *(int *)arg; int i, item; for (int i = 0 ; i < 10 ; i ++ ) { item = i; pthread_mutex_lock(&mtx); while (count == 0 ) { pthread_cond_wait(¬_empty, &mtx); } item = buffer[out]; out = (out + 1 ) % BUFFER_SIZE; count -- ; printf ("消费者 %d 消费: %d, 缓冲区数量: %d\n" , consumer_id, item, count); pthread_mutex_unlock(&mtx); pthread_cond_broadcast(¬_full); usleep(15000 ); } return NULL ; }int main () { pthread_t pro_t [PRODUCER_NUM]; pthread_t con_t [CONSUMER_NUM]; int pro_id[PRODUCER_NUM]; int con_id[CONSUMER_NUM]; for (int i = 0 ; i < PRODUCER_NUM; i ++ ) { pro_id[i] = i + 1 ; pthread_create(&pro_t [i], NULL , producer, &pro_id[i]); } for (int i = 0 ; i < CONSUMER_NUM; i ++ ) { con_id[i] = i + 1 ; pthread_create(&con_t [i], NULL , consumer, &con_id[i]); } for (int i = 0 ; i < PRODUCER_NUM; i ++ ) { pthread_join(pro_t [i], NULL ); } for (int i = 0 ; i < CONSUMER_NUM; i ++ ) { pthread_join(con_t [i], NULL ); } return 0 ; }
3.2 虚假唤醒
虚假唤醒(Spurious Wakeup)是多线程编程中一个重要的概念,指的是等待在条件变量上的线程 在没有收到明确信号的情况下被意外唤醒 的现象。通常 虚假唤醒 的问题是由于多线程环境下 系统调度、信号传输 等原因导致的。
这是 POSIX 标准允许的行为,主要出于以下原因:
系统调度/中断/信号干扰 (历史原因)
早期 Unix 系统中,当线程被信号(signal,如 SIGUSR1)中断时,可能从 wait 中提前返回。虽然现代系统大多已修复,但标准仍保留此行为以兼容。
多核 CPU 的内存一致性/缓存同步问题
在 SMP(对称多处理器)系统中,不同 CPU 核心的缓存状态可能暂时不一致,导致条件判断出现“幻读”,线程误以为条件满足而醒来。
操作系统实现的权衡
强制要求“绝对不虚假唤醒”会增加系统开销(如加更多锁、更多内存屏障)。POSIX 选择允许虚假唤醒,把正确性责任交给程序员 —— 你必须用 while 重检查条件。
POSIX 标准明确允许虚假唤醒的发生,但需要避免虚假唤醒的发生。通常使用while循环而不是if语句来检查条件:
1 2 3 4 5 6 pthread_mutex_lock(&mutex);while (condition_is_false) { pthread_cond_wait(&cond, &mutex); } pthread_mutex_unlock(&mutex);
使用 while 循环 防止虚假唤醒 (spurious wakeup),确保被 唤醒后再次检查条件是否满足 。
为什么while循环能解决虚假唤醒问题 :
这种方法确保了线程只有在条件真正满足时才会继续执行。
3.3 消费者退出机制
如果生产者生产的数据数量达不到消费者的总取数据数量,那么在所有生产者执行完毕后,剩余的没有执行完的消费者线程会一直等待下去。
因此,可以考虑添加一个消费者的退出机制:当没有生产者会再生产数据时,剩余的消费者直接退出 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #define BUFFER_SIZE 10 #define PRODUCER_NUM 3 #define CONSUMER_NUM 5 int buffer[BUFFER_SIZE];int count = 0 ; int in = 0 , out = 0 ; int production_done = 0 ; pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;void * producer (void * arg) { int producer_id = *(int *)arg; int i, item; for (i = 0 ; i < 10 ; i ++ ) { item = i; pthread_mutex_lock(&mtx); while (count == BUFFER_SIZE) { pthread_cond_wait(¬_full, &mtx); } buffer[in] = item; in = (in + 1 ) % BUFFER_SIZE; count ++ ; printf ("生产者 %d 生产: %d, 缓冲区数量: %d\n" , producer_id, item, count); pthread_mutex_unlock(&mtx); pthread_cond_broadcast(¬_empty); usleep(10000 ); } return NULL ; }void * consumer (void * arg) { int consumer_id = *(int *)arg; int item; for (int i = 0 ; i < 10 ; i ++ ) { pthread_mutex_lock(&mtx); while (count == 0 && !production_done) { pthread_cond_wait(¬_empty, &mtx); } if (count == 0 && production_done) { pthread_mutex_unlock(&mtx); break ; } item = buffer[out]; out = (out + 1 ) % BUFFER_SIZE; count -- ; printf ("消费者 %d 消费: %d, 缓冲区数量: %d\n" , consumer_id, item, count); pthread_mutex_unlock(&mtx); pthread_cond_broadcast(¬_full); usleep(15000 ); } return NULL ; }int main () { pthread_t pro_t [PRODUCER_NUM]; pthread_t con_t [CONSUMER_NUM]; int pro_id[PRODUCER_NUM]; int con_id[CONSUMER_NUM]; for (int i = 0 ; i < PRODUCER_NUM; i ++ ) { pro_id[i] = i + 1 ; pthread_create(&pro_t [i], NULL , producer, &pro_id[i]); } for (int i = 0 ; i < CONSUMER_NUM; i ++ ) { con_id[i] = i + 1 ; pthread_create(&con_t [i], NULL , consumer, &con_id[i]); } for (int i = 0 ; i < PRODUCER_NUM; i ++ ) { pthread_join(pro_t [i], NULL ); } pthread_mutex_lock(&mtx); production_done = 1 ; pthread_mutex_unlock(&mtx); pthread_cond_broadcast(¬_empty); for (int i = 0 ; i < CONSUMER_NUM; i ++ ) { pthread_join(con_t [i], NULL ); } return 0 ; }
添加 production_done
标志 :这是一个共享变量,用于指示所有生产者已经完成。
消费者检查终止条件 :在消费者中,等待条件改为 while (count == 0 && !production_done)
。如果缓冲区为空且生产者已完成,消费者则退出循环。
主线程设置标志 :在主线程中,等待所有生产者完成后,设置 production_done
为1,并广播 not_empty
条件以唤醒所有等待的消费者,让它们检查终止条件。
这样,即使消费者尝试消费的次数多于生产次数,它们也会在生产者结束后正常退出,避免程序卡住。
4 C++ 多线程 条件变量
可见 「C++ 多线程」生产者-消费者问题及 std::condition_variable