「C++ 多线程」std::promise 线程通信

文章大图来源:pixiv_id=104287282

1 问题背景

当我们需要实现线程之间的数据传递(线程通信)等操作时,我们通常需要考虑同步机制(互斥锁等)来确保传输数据的正确性。假如一类线程接收结果,另一类线程负责修改并传递结果,在多线程情况下需要互斥访问来确保数据正确,避免数据竞争。

还有一个关键的问题在于:接收结果的线程 往往需要等待 修改并传递结果的线程 完全执行完毕 才能正确地获取数据结果。

以下是一个简单的线程通信示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <iostream>
#include <thread>
#include <chrono>
#include <future>

void foo(int& res, int x){
std::cout << "foo(): starts running, thread id = ";
std::cout << std::this_thread::get_id() << "\n";

// 模拟数据操作所需时间
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
x ++ ;

// 将 res 置为 x,子线程执行完毕后再用于其他线程
res = x;

// 模拟线程函数其他操作
std::this_thread::sleep_for(std::chrono::milliseconds(2000));

std::cout << "foo(): ends running, thread id = ";
std::cout << std::this_thread::get_id() << "\n";
}

int main(){
std::cout << "main(): starts running, thread id = ";
std::cout << std::this_thread::get_id() << "\n";

// 通过 std::promise 实现子线程和主线程之间的线程通信
int res = 0;
std::thread t(foo, std::ref(res), 1); // 创建子线程

// 子线程数据操作需要一段时间,不知道什么时候可以获取正确的数据结果
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "may get the wrong res = " << res << "\n";

// 调用 join(),等待子线程的其他操作执行完毕
t.join();

// 等待子线程执行完毕,可以顺利获取正确的数据结果
std::cout << "finally get the res = " << res << "\n";

std::cout << "main(): ends running, thread id = ";
std::cout << std::this_thread::get_id() << "\n";

return 0;
}

在上面的代码中,我们创建一个子线程用于修改数据和传递数据,而主线程负责接收修改后的数据结果。

  • foo() 函数:

    foo() 子线程函数中,我们通过传递一个整形引用参数来实现子线程内部直接修改在 main() 中定义的 res 变量,用于将子线程处理的结果传递给主线程。

    foo() 子线程函数中,我们通过让线程休眠一段时间来模拟数据的长时间操作。然后对传入的参数 x 进行自增操作(x ++),之后将 x 的值赋给 res 变量。

    在后面,我们又通过让线程休眠一段时间模拟其他的一系列操作。

  • main() 函数:

    在主线程中,我们创建了一个子线程来执行 foo() 函数。

    在主线程调用 join() 之前,子线程还未完全执行完毕。我们可以尝试获取 res 的结果,但是有可能此时子线程还未执行完数据操作,我们也不知道具体什么时候可以执行完。因此 很有可能第一次获取 res 的结果是错误的

    在主线程调用 join() 之后,子线程 执行完毕,我们 此时获取 res 的结果是正确的。但是,子线程后面又执行了一段时间的其他操作,我们等待其执行完其他无关的操作后才获取了 res 的值,这就比较低效了。

可能的运行结果如下:

1
2
3
4
5
6
main(): starts running, thread id = 0xbc
foo(): starts running, thread id = 0xc0
may get the wrong res = 0
foo(): ends running, thread id = 0xc0
finally get the res = 2
main(): ends running, thread id = 0xbc

可以看出,第一次获取结果是不符合预期的。等 foo() 执行完毕后再进行第二次获取结果,此时的结果是正确的。

那么如何能够让主线程在子线程运行的中途顺利获取数据?为此 C++ 11 引入了 std::promise 实现线程之间的通信。

2 std::promise

2.1 基本概念

std::promise 是 C++ 11 引入的一个模板类,用于 在一个线程中设置一个值(或异常),然后 在另一个线程中获取这个值(或处理异常)。它主要用于线程间的同步通信,通常与 std::future 一起使用。

std::promise 提供了一种机制,使得一个线程可以 “承诺” 提供一个结果,而另一个线程可以通过 std::future 来等待这个结果。

std::promise 是不可复制的。如果是可复制的,那么就会出现多个对象可以设置同一个 std::future 关联的值的情况,这会导致语义的混乱。

2.2 基本用法

假如我们要在主线程中接收一个结果,在一个创建的子线程中修改并提供这个结果。

首先,在主线程中创建一个 std::promise 对象,我们需要传递的是一个 int 类型的结果,因此使用 std::promist<int> pro 创建一个传递该类型的对象。

然后,采用 std::future<int> res = pro.get_future(),通过调用 std::promise::get_future() 函数来获取与 pro 关联的 std::future 对象。

之后启动一个新的子线程,子线程函数需要传递 std::promise<int> 对象的引用。在子线程函数 foo() 中通过调用 std::promise::set_value() 来将 std::promise 设置为我们想要的结果。

注意: std::promise 对象只能成功地 set_value 一次。通过 set_value 之后,其内部状态就从 “未完成” 变为 “已完成”,这种状态的改变是不可逆的,并且会同步更新与之关联的 std::future 对象的状态。

最后,在主线程中我们通过调用 std::future::get() 来获取结果。

以下是一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <iostream>
#include <thread>
#include <future>

void foo(std::promise<int>& pro) {
int result = 1314;
pro.set_value(result); // 设置我们想要的结果
// pro.set_value(233); // set_value twice, ERROR
}

int main() {
std::promise<int> pro; // 创建 promise 对象
std::future<int> res = pro.get_future(); // 获取 pro 关联的 future 对象
std::thread t(foo, std::ref(pro)); // 开启子线程,传入 pro

// 调用 future::get() 等待获取结果
int data = res.get();
std::cout << "finally the res = " << data << std::endl;

t.join();

return 0;
}

运行结果如下:

1
finally the res = 1314

上面的示例缺乏一个场景,看起来比较抽象。接下来我们可以对比 1 问题背景 中的示例,并实现一个使用 std::promise 的线程通信代码。

2.3 实现线程通信

1 问题背景 中的示例中,当子线程还没有处理完数据,我们进行第一次尝试获取结果时,结果是不符合预期的。如果子线程完整地执行完毕,我们方可获取正确的结果。

但是,子线程中数据处理完之后的操作和我们想要获取结果的部分没有什么关系。我们或许可以尝试在主线程中等待一段时间再中途获取结果,但是 无法确切地知道什么时候可以获取正确的结果

这时候 std::promise 配合 std::future 的优势就凸显出来了。

我们可以像 2.2 std::promise 基本用法 中类似地做法:

  • 创建一个 std::promise<int> 类型的对象 pro,用于保存传递给当前主线程的数据;
  • 通过 std::future<int> res = pro.get_future() 来获取与 pro 关联的 std::future 对象,后续主线程可以通过这个 std::future 对象来获取子线程传递过来的数据;
  • 创建一个子线程 t,线程接口函数为 foo() ,也就是处理数据并设置数据值的函数。同时,通过 std::ref(pro) 来传递 pro 对象的引用,使得子线程可以通过调用 std::promise::set_value() 来设置这个对象中保存的值。

在主线程中调用 std::future::get() 函数获取结果时:

  • 如果子线程已经处理数据结束了,那么就可以立即顺利地获得结果;
  • 如果子线程还没有处理数据结束(还没有运行到 set_value() 设置数值的步骤),那么 主线程会被阻塞,等待子线程运行到 set_value() 设置数值,然后获取结果。

最后调用 t.join() 等待子线程完整地执行完毕(子线程可能还有一些其他操作未完成)。

这样我们就实现了线程间的通信,而且主线程可以在中途恰当的时机获取想要的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <thread>
#include <chrono>
#include <future>

void foo(std::promise<int> &pro, int x){
std::cout << "foo(): starts running, thread id = ";
std::cout << std::this_thread::get_id() << "\n";

// 模拟数据操作所需时间
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
x ++ ;

// 中途 set_value 来传递操作后的数据给其他线程
pro.set_value(x);

// 模拟线程函数其他操作
std::this_thread::sleep_for(std::chrono::milliseconds(2000));

std::cout << "foo(): ends running, thread id = ";
std::cout << std::this_thread::get_id() << "\n";
}

int main(){
std::cout << "main(): starts running, thread id = ";
std::cout << std::this_thread::get_id() << "\n";

// 通过 std::promise 实现子线程和主线程之间的线程通信
std::promise<int> pro; // 声明一个 promise 对象,保存值类型为 int
std::future<int> res = pro.get_future(); // 创建 std::future 获取数据
std::thread t(foo, std::ref(pro), 1); // 创建子线程

// 主线程调用 get() 获取结果。若子线程内部还未 set_value,会阻塞当前主线程
int data = res.get();
std::cout << "finally get the res = " << data << "\n";

// 获取结果才 join(),等待子线程的其他操作执行完毕
t.join();

std::cout << "main(): ends running, thread id = ";
std::cout << std::this_thread::get_id() << "\n";

return 0;
}

可能的运行结果如下:

1
2
3
4
5
main(): starts running, thread id = 0xac
foo(): starts running, thread id = 0xb4
finally get the res = 2
foo(): ends running, thread id = 0xb4
main(): ends running, thread id = 0xac

可以看出,在子线程还没完全运行结束时,主线程中已经顺利地获取了想要的结果。主线程和子线程之间的线程通信完美实现了⭐

参考

  1. std::promise::get_future

  2. std::promise::set_value


「C++ 多线程」std::promise 线程通信
https://marisamagic.github.io/2025/01/03/20250103/
作者
MarisaMagic
发布于
2025年1月3日
许可协议