「C++ 多线程」std::shared_future 多线程等待结果

文章大图来源:pixiv_id=73212619

1 std::shared_future

1.1 基本概念

std::shared_future 是 C++11 引入的一个模板类,位于 <future> 头文件中,用于异步任务结果的 共享获取

std::future 对象在获取关联的异步操作结果后就不能再被复用,因为获取结果后其内部的状态改变,且状态是不可逆的。如果我们需要多个线程获取同一个异步任务地结果,那么仅仅使用 std::future 就不可行了。

std::shared_future 可以被多个线程安全地共享,以 并发地获取相同的异步结果

1.2 基本用法

std::shared_future 是从 std::future 转换而来,转换的方法是调用 std::future::share() 从而获取相同存储类型的一个 std::shared_future 对象:

1
2
3
4
// std::future 对象
std::future<int> res = std::async(std::launch::async, 1, 2);
// 通过 std::future::share() 获取共享的对象
std::shared_future<int> shared_res = res.share(); // 涉及 std::move

注意: 当调用 std::future::share() 函数来获取 std::shared_future 时,内部涉及了 移动语义 std::movestd::future 只可移动,不可复制。

在调用 std::future::share(),原本的 std::future 对象也 不再有对异步任务结果的访问权 了。我们也可以用移动语义 std::move 来获取共享的 std::shared_future 对象:

1
2
// 或者采用移动语义
std::shared_future<int> shared_res(std::move(res));

2 std::shared_future 与 std::async

2.1 示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <iostream>
#include <thread>
#include <future>
#include <chrono>

int add(int a, int b){ return a + b; }

void foo(std::shared_future<int>& shared_res){
std::cout << "foo(): thread id = " << std::this_thread::get_id() << "\n";
std::cout << "foo(): get the shared res = " << shared_res.get() << "\n";
}

int main(){
std::cout << "main(): thread id = " << std::this_thread::get_id() << "\n";

// std::async 创建一个异步任务,并获取关联的 std::future 对象
std::future<int> res = std::async(std::launch::async, add, 1, 2);
// 通过 share 获取共享的异步任务 std::future 对象
std::shared_future<int> shared_res = res.share(); // 内部 std::move

// (1) 开启一个子线程 t1 等待获取共享结果
std::thread t1(foo, std::ref(shared_res)); // 调用引用

// (2) 开启另一个子线程 t2 等待获取共享结果
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 休息一会,便于打印
std::thread t2([&](){
std::cout << "lambda(): thread id = " << std::this_thread::get_id() << "\n";
auto data = shared_res.get();
std::cout << "lambda(): get the shared res = " << data << "\n";
});

// (3) 主线程中等待获取共享结果
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 休息一会,便于打印
auto data = shared_res.get();
std::cout << "main(): get the shared res = " << data << "\n";

t1.join();
t2.join();

return 0;
}

在上面的代码中,我们通过 std::async 创建了一个异步任务,执行 add 函数并返回一个两数之和的结果。

然后,通过 std::future::share() 来获取此异步任务关联共享的 std::shared_future 对象 shared_res

接下来分别创建一个子线程 t1(参数传入 shared_res 的引用)、一个子线程 t2(执行 lambda 函数并与主函数共享作用域内的变量)以及主线程中调用 shared_res.get() 来等待共享的异步任务结果。

可能的运行结果如下:

1
2
3
4
5
6
main(): thread id = 0x5c
foo(): thread id = 0xb8
foo(): get the shared res = 3
lambda(): thread id = 0xc4
lambda(): get the shared res = 3
main(): get the shared res = 3

可以看出,三个线程都能够访问并获得结果,且所获得的共享的异步任务的结果是一样的。

2.2 获取结果时间测试

我们可以通过 std::chrono::high_resolution_clock::now() 测试一下三个线程获得共享结果的时间点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <iostream>
#include <thread>
#include <future>
#include <chrono>

int add(int a, int b){ return a + b; }

void foo(std::shared_future<int>& shared_res,
std::chrono::time_point<std::chrono::high_resolution_clock>& start){
auto data = shared_res.get();
// 输出获取结果的时间
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::milli> dura = end - start;
std::cout << "foo(): duration = " << dura.count() << "ms" << "\n";
}

int main(){
std::future<int> res = std::async(std::launch::async, add, 1, 2);
std::shared_future<int> shared_res = res.share(); // 内部 std::move

// 获取起始时间点
auto start = std::chrono::high_resolution_clock::now();

// (1) 开启一个子线程 t1 等待获取共享结果
std::thread t1(foo, std::ref(shared_res), std::ref(start)); // 调用引用

// (2) 开启另一个子线程 t2 等待获取共享结果
std::thread t2([&](){
auto data = shared_res.get();
// 输出获取结果的时间
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::milli> dura = end - start;
std::cout << "lambda(): duration = " << dura.count() << "ms" << "\n";
});

// (3) 主线程中等待获取共享结果
auto data = shared_res.get();
// 输出获取结果的时间
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::milli> dura = end - start;
std::cout << "main(): duration = " << dura.count() << "ms" << "\n";

t1.join();
t2.join();

return 0;
}

可能的运行结果如下:

1
2
3
lambda(): duration = foo(): duration = main(): duration = 0.127ms
0.127ms
0.127ms

大致可以看出,三个线程获取共享任务的时间点几乎一样。如果函数中执行了其他的操作,可能会有细微的延迟。

3 std::shared_future 与 std::promise

也可以写一个 std::promise 来进行测试,创建一个可以中途传递数据的异步任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include <iostream>
#include <thread>
#include <future>

void foo_set_value(std::promise<int>& pro, int x){
std::cout << "foo_set_value(): starts running, thread id = ";
std::cout << std::this_thread::get_id() << "\n";

// 模拟数据操作需要的时间
std::this_thread::sleep_for(std::chrono::milliseconds(2000));

// 中途 set_value 来传递操作后的数据给其他线程
pro.set_value(x);

// 线程函数模拟进行其他操作
std::this_thread::sleep_for(std::chrono::milliseconds(2000));

std::cout << "foo_set_value(): ends running, thread id = ";
std::cout << std::this_thread::get_id() << "\n";
}

void foo_get_res(std::shared_future<int>& shared_res){
std::cout << "foo_get_res(): starts running, thread id = ";
std::cout << std::this_thread::get_id() << "\n";

// 等待获取结果
int data = shared_res.get();
std::cout << "foo_get_res(): get the res = " << data << "\n";

std::cout << "foo_get_res(): ends running, thread id = ";
std::cout << std::this_thread::get_id() << "\n";
}

int main() {
std::cout << "main(): starts running, thread id = ";
std::cout << std::this_thread::get_id() << "\n";

// 通过 std::promise 实现子线程和主线程之间的线程通信
std::promise<int> pro; // 声明一个 promise 对象,保存值类型为 int
std::future<int> res = pro.get_future(); // 创建 std::future 获取数据
std::shared_future<int> shared_res = res.share(); // 共享 future 对象
std::thread t_set_value(foo_set_value, std::ref(pro), 1314); // 创建子线程

// 主线程调用 get() 等待获取结果。若子线程内部还未 set_value,会阻塞当前主线程
int data = shared_res.get();
std::cout << "main(): get the res = " << data << "\n";

// 开启一个新的子线程,也等待获取结果
std::thread t_get_res(foo_get_res, std::ref(shared_res));

// 主线程模拟进行其他操作
std::cout << "main(): starts doing something else..." << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
std::cout << "main(): ends doing something else..." << "\n";

// join(),等待子线程的其他操作执行完毕
t_get_res.join();
t_set_value.join();

std::cout << "main(): ends running, thread id = ";
std::cout << std::this_thread::get_id() << "\n";

return 0;
}

在上面的代码中,foo_set_value 函数是异步任务执行的函数,传入一个 std::promise 参数用于在函数中设置结果的值并中途传递给其他线程。同时我们开启一个 t_set_value 子线程来执行异步任务。

foo_get_res 函数由一个子线程 t_get_res 执行,我们将开启一个子线程来获取 std::promise 所关联的 std::shared_future 的异步任务结果值。

在主函数中,我们同样也进行一个来获取异步任务结果值,同时,之后在进行一些其他操作。

可能的运行结果如下:

1
2
3
4
5
6
7
8
9
10
main(): starts running, thread id = 0xac
foo_set_value(): starts running, thread id = 0xb8
main(): get the res = 1314
main(): starts doing something else...
foo_get_res(): starts running, thread id = 0xc8
foo_get_res(): get the res = 1314
foo_get_res(): ends running, thread id = 0xc8
foo_set_value(): ends running, thread id = 0xb8
main(): ends doing something else...
main(): ends running, thread id = 0xac

可以看出,由于主线程中获取结果的步骤放在了开启一个子线程获取结果这步的上面,所以主线程先获得结果了。

然后主线程将开始做一些其他事情,在进行其他事情的操作期间,开启的子线程 t_get_res 获得异步任务的结果。

foo_set_value 中,调用 std::promise::set_value() 之后还进行了其他操作。故当主线程和子线程先后都获得结果后的一段时间,执行 foo_set_value() 的异步任务子线程 t_set_value 才结束。

参考

  1. std::shared_future

「C++ 多线程」std::shared_future 多线程等待结果
https://marisamagic.github.io/2025/01/05/20250105/
作者
MarisaMagic
发布于
2025年1月5日
许可协议