在现代 C++ 开发中,异步编程是提高程序性能与响应性的关键手段。C++17 为我们提供了强大的工具集,包括 std::async、std::future、std::packaged_task 等,让我们能够以最简洁的方式构建并发程序。本文将通过一个完整的示例,展示如何使用这些标准库功能实现一个高效的异步计算框架,并说明其工作原理和最佳实践。
1. 需求场景
假设我们需要对一大批数据进行计算密集型处理(例如对图片进行滤镜、对数值进行统计等)。若将所有计算同步执行,CPU 将在一次性完成所有任务后才进入下一阶段,导致明显的性能瓶颈。相反,使用异步计算可以在后台线程池中并行完成任务,同时主线程可以继续处理用户交互或其他轻量级任务,从而显著提升系统吞吐量。
2. 基础工具回顾
| 名称 | 作用 | 典型用法 |
|---|---|---|
std::async |
在后台线程启动一个函数,并返回 std::future |
auto f = std::async(std::launch::async, func, args...); |
std::future |
表示未来某时刻会返回的值,支持同步等待 (get()) 或非阻塞查询 (wait_for) |
f.get(); |
std::packaged_task |
将一个函数包装成可在任意线程中执行的对象,并提供 future 供结果获取 |
std::packaged_task<int()> task{func}; |
std::promise |
与 future 结合使用,手动设置结果 |
`std::promise |
| p; auto f = p.get_future(); p.set_value(42);` |
Tip:
std::async的默认启动策略是std::launch::async | std::launch::deferred。如果你想确保在新线程中执行,显式指定std::launch::async。
3. 设计思路
我们将实现一个简易的 异步任务调度器,核心功能包括:
- 任务提交:将任意可调用对象提交给调度器,返回一个
std::future供结果获取。 - 线程池:使用
std::thread动态维护一组工作线程,避免频繁创建销毁线程。 - 任务队列:使用线程安全的
std::queue结合std::condition_variable,实现生产者-消费者模式。 - 结果回调:支持在任务完成后执行回调函数(可选)。
4. 代码实现
下面给出完整的实现代码(C++17 兼容):
#include <iostream>
#include <thread>
#include <future>
#include <vector>
#include <queue>
#include <functional>
#include <condition_variable>
#include <atomic>
class AsyncTaskScheduler {
public:
explicit AsyncTaskScheduler(std::size_t thread_count = std::thread::hardware_concurrency())
: stop_flag(false)
{
for (std::size_t i = 0; i < thread_count; ++i) {
workers.emplace_back([this] { this->worker_thread(); });
}
}
~AsyncTaskScheduler() {
stop();
}
// 提交任务并返回 future
template <typename Func, typename... Args>
auto submit(Func&& f, Args&&... args)
-> std::future<decltype(f(args...))>
{
using RetType = decltype(f(args...));
auto task_ptr = std::make_shared<std::packaged_task<RetType()>>(
std::bind(std::forward <Func>(f), std::forward<Args>(args)...));
std::future <RetType> fut = task_ptr->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
tasks.emplace([task_ptr]() { (*task_ptr)(); });
}
queue_cond.notify_one();
return fut;
}
// 终止所有线程
void stop() {
if (!stop_flag.exchange(true)) {
queue_cond.notify_all();
for (auto& th : workers) {
if (th.joinable()) th.join();
}
}
}
private:
void worker_thread() {
while (!stop_flag) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
queue_cond.wait(lock, [this] { return stop_flag || !tasks.empty(); });
if (stop_flag && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
try {
task();
} catch (...) {
// 任务内部异常已由 packaged_task 处理
}
}
}
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable queue_cond;
std::atomic <bool> stop_flag;
};
关键点说明
packaged_task包装:将任意函数与其参数绑定后,转换为无参数的可调用对象,便于放入统一的任务队列。- 线程安全:使用
mutex+condition_variable保障对任务队列的同步访问。 - 自动销毁:在析构函数中调用
stop(),确保所有线程安全退出。
5. 使用示例
int heavy_computation(int x, int y) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return x * y;
}
int main() {
AsyncTaskScheduler scheduler(4); // 4 个工作线程
// 提交 10 个任务
std::vector<std::future<int>> results;
for (int i = 0; i < 10; ++i) {
results.emplace_back(scheduler.submit(heavy_computation, i, i + 1));
}
// 读取结果
for (auto& fut : results) {
std::cout << "Result: " << fut.get() << std::endl;
}
// Scheduler 自动销毁
return 0;
}
运行时,你会看到程序在约 300-400 毫秒内完成所有任务(相比串行约 1 秒),证明了并行优势。
6. 性能调优与注意事项
| 方向 | 建议 |
|---|---|
| 线程数量 | 通常设置为 std::thread::hardware_concurrency() 或 * 2,避免线程上下文切换过多。 |
| 任务粒度 | 任务应足够“重”,以抵消线程创建与调度开销。若任务过于轻量,考虑批量合并。 |
| 异常处理 | packaged_task 会捕获任务内部异常并在 future.get() 时重新抛出,保持异常安全。 |
| 回调机制 | 若需要在主线程更新 UI,可使用 std::promise 与主线程的事件循环结合。 |
| 内存占用 | 线程池固定大小避免内存膨胀;使用 reserve 预分配任务队列大小可降低动态分配成本。 |
7. 进一步探索
- C++20 协程:使用
co_await与std::future结合,可实现更直观的异步链式调用。 - 第三方库:如 Boost.Asio、TBB、folly 等提供更丰富的异步工具。
- 任务优先级:在队列中加入优先级字段,使用
std::priority_queue改进任务调度。
8. 结语
通过标准库中的 std::async、std::future 与自建的线程池,C++ 开发者可以轻松构建高性能的异步计算框架。上述实现展示了从基本概念到完整代码的完整链路,既满足了性能需求,又保持了代码可维护性。希望本文能帮助你在项目中更好地利用 C++17 的并发特性,提升程序效率与响应速度。