**C++20 中的 std::future 与 async:如何避免竞态条件并实现高效并发?**

在现代 C++(尤其是 C++11 以后)中,标准库提供了强大的并发工具。最常用的组合之一是 std::asyncstd::future,它们可以让你在不同线程上异步执行函数,并在需要时获得结果。本文将从 线程安全错误传播、以及 性能优化 三个方面深入剖析,帮助你在项目中高效、安全地使用这两者。


1. 基础用法回顾

#include <future>
#include <iostream>

int heavy_task(int x) {
    // 例如计算阶乘
    int result = 1;
    for (int i = 1; i <= x; ++i) result *= i;
    return result;
}

int main() {
    // std::launch::async 强制在新线程中执行
    std::future <int> f = std::async(std::launch::async, heavy_task, 10);

    // 主线程做其他工作
    std::cout << "主线程继续工作...\n";

    // 等待结果(如果任务已完成则立即返回)
    int res = f.get();
    std::cout << "结果: " << res << std::endl;
}

上述代码非常直观,但如果要在多线程环境下安全使用,还需要注意以下细节。


2. 竞态条件与 std::future 的安全性

2.1 std::future 只能被一次取值

  • f.get() 调用后,future 被标记为已获取,后续再次调用会抛出 std::future_error
  • 这意味着 只能在一个线程 调用 get(),除非你显式复制 std::futurestd::shared_future 解决此问题)。

2.2 std::async 的调度策略

  • 默认策略:std::launch::async | std::launch::deferred,根据实现决定。
  • 在高并发情况下,默认策略可能导致任务被延迟执行(deferred),这会导致你误以为任务已完成但实际上没有跑。

建议:显式指定 std::launch::async,确保任务立即在新线程执行,避免调度不确定性。

2.3 共享结果:std::shared_future

std::future <int> f = std::async(std::launch::async, heavy_task, 20);
std::shared_future <int> sf = f.share(); // 现在可多次 get()
  • shared_future 允许多个线程并行读取结果,内部使用引用计数实现安全。

3. 异常传播与错误处理

3.1 异常在 async 里如何传递?

  • 如果异步函数抛出异常,std::future::get() 会重新抛出该异常。
  • 在主线程中调用 get() 前,最好使用 try-catch 包裹。
try {
    int val = f.get(); // 若异步函数抛异常,将在此捕获
} catch(const std::exception& e) {
    std::cerr << "异步错误: " << e.what() << '\n';
}

3.2 超时控制

C++ 标准库不提供直接的超时 future::get(),但可以结合 std::future_statuswait_for

if (f.wait_for(std::chrono::seconds(2)) == std::future_status::ready) {
    int val = f.get();
} else {
    std::cerr << "任务超时!\n";
}

这可以避免 get() 阻塞过久。


4. 性能优化技巧

4.1 减少上下文切换

  • 批量提交:如果你有一组需要并行计算的小任务,最好使用线程池(如 std::async 与自定义线程池结合),避免频繁创建销毁线程。
  • 线程亲和性:对高性能计算,使用 std::thread::native_handle() 设置 CPU 亲和性,可提升缓存局部性。

4.2 使用 std::packaged_task

  • 当你需要在运行时决定是否异步执行时,std::packaged_taskstd::future 组合更灵活。
  • packaged_task 可以在任何线程上调用 operator(),而 future 则保持同步访问结果。
std::packaged_task<int()> task(heavy_task);
std::future <int> f = task.get_future();
std::thread(std::move(task), 30).detach();

4.3 避免 std::async 过度使用

  • std::async 内部会根据实现创建线程,使用过多可能导致系统线程数飙升。
  • 对于频繁调用的轻量任务,建议使用同步调用或自定义线程池。

5. 实战案例:并行计算矩阵乘法

#include <vector>
#include <future>
#include <iostream>

using Matrix = std::vector<std::vector<int>>;

// 单行乘法
int row_multiply(const Matrix& A, const Matrix& B, int row, int col, int width) {
    int sum = 0;
    for (int k = 0; k < width; ++k)
        sum += A[row][k] * B[k][col];
    return sum;
}

Matrix parallel_matrix_mul(const Matrix& A, const Matrix& B) {
    int n = A.size();
    Matrix C(n, std::vector <int>(n, 0));

    std::vector<std::future<int>> futures;
    for (int i = 0; i < n; ++i) {
        for (int j = 0; j < n; ++j) {
            futures.push_back(std::async(std::launch::async,
                                          row_multiply, std::cref(A), std::cref(B),
                                          i, j, n));
        }
    }

    int idx = 0;
    for (int i = 0; i < n; ++i) {
        for (int j = 0; j < n; ++j) {
            C[i][j] = futures[idx++].get(); // 线程安全,单线程获取
        }
    }
    return C;
}

int main() {
    Matrix A = {{1,2,3},{4,5,6},{7,8,9}};
    Matrix B = {{9,8,7},{6,5,4},{3,2,1}};
    Matrix C = parallel_matrix_mul(A,B);

    for (auto& row : C) {
        for (int v : row) std::cout << v << ' ';
        std::cout << '\n';
    }
}

此例演示如何把每个矩阵元素的计算交给一个异步任务,并通过 future::get() 安全收集结果。


6. 结语

  • 显式指定 std::launch::async:保证任务立即执行,避免被推迟。
  • 使用 std::shared_future:多线程共享同一结果时避免竞争。
  • 异常传播:利用 future::get() 自动抛出,结合 try-catch 处理。
  • 性能考量:避免过度创建线程,必要时使用线程池。

只要遵循这些原则,std::asyncstd::future 可以成为你 C++ 并发编程的强大助手,既能保持代码简洁,又能确保线程安全。祝编码愉快!

发表评论