1. 需求背景
在现代 C++ 开发中,异步编程已成为提高并发性能的关键手段。常见的做法有 std::async、std::thread、第三方库 boost::asio、libuv 等。但它们往往只满足单一使用场景,缺乏灵活性、易用性和可扩展性。本文旨在展示如何使用 C++20 的协程(co_await、co_return)和 `
` 库,构建一个轻量、可组合、通用的异步任务调度框架。
## 2. 核心概念
| 概念 | 说明 |
|——|——|
| **任务(Task)** | 代表一次可异步执行的操作,返回值或异常。使用 `std::future` 或自定义 `Task
` 包装。 |
| **调度器(Scheduler)** | 负责把任务投递到线程池、事件循环或特定线程。 |
| **协程适配器(CoroutineAdapter)** | 把协程转化为 `Task`,允许在协程中使用 `co_await` 调用异步函数。 |
| **线程池(ThreadPool)** | 基础执行单元,管理固定数量的工作线程。 |
## 3. 设计思路
1. **Task
**
– 通过 `std::shared_ptr>` 与 `std::future` 组合实现。
– 提供 `co_await` 接口:内部实现 `operator co_await()`,返回 `Awaiter`,实现 `await_ready()`、`await_suspend()`、`await_resume()`。
2. **Scheduler**
– 抽象基类 `IScheduler`,接口 `schedule(TaskBase& task)`。
– 具体实现 `ThreadPoolScheduler`:使用 `std::thread` 和 `std::queue` 管理任务。
– 另一个实现 `SingleThreadScheduler`:在单线程中顺序执行,适合 UI 线程。
3. **协程适配器**
– `asyncify` 函数:将普通函数包装为 `Task
`。
– 示例:`Task
read_file_async(const std::string& path);`
4. **错误传播**
– `Task` 内部捕获异常,并将其存入 `std::promise`,随后 `co_await` 时通过 `std::future::get()` 重新抛出。
## 4. 核心代码
“`cpp
// Task.hpp
#pragma once
#include
#include
#include
template
class Task {
std::shared_ptr> promise_;
std::future
future_;
public:
Task() : promise_(std::make_shared>()),
future_(promise_->get_future()) {}
// 用于协程的 Awaiter
struct Awaiter {
std::future
& fut_;
bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle h) {
std::thread([h]() {
h.resume();
}).detach();
}
T await_resume() {
return fut_.get(); // 若异常会在此抛出
}
};
Awaiter operator co_await() { return Awaiter{future_}; }
// 让外部手动设置结果
void set_value(const T& val) { promise_->set_value(val); }
void set_exception(std::exception_ptr eptr) { promise_->set_exception(eptr); }
};
template
class Task
{
std::shared_ptr> promise_;
std::future
future_;
public:
Task() : promise_(std::make_shared>()),
future_(promise_->get_future()) {}
struct Awaiter {
std::future
& fut_;
bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle h) {
std::thread([h]() { h.resume(); }).detach();
}
void await_resume() { fut_.get(); }
};
Awaiter operator co_await() { return Awaiter{future_}; }
void set_value() { promise_->set_value(); }
void set_exception(std::exception_ptr eptr) { promise_->set_exception(eptr); }
};
“`
“`cpp
// Scheduler.hpp
#pragma once
#include
#include
#include
#include
#include
#include
class IScheduler {
public:
virtual void schedule(std::function job) = 0;
virtual ~IScheduler() = default;
};
class ThreadPoolScheduler : public IScheduler {
std::queue> tasks_;
std::vector workers_;
std::mutex mtx_;
std::condition_variable cv_;
bool stop_ = false;
void worker() {
while (true) {
std::function job;
{
std::unique_lock lock(mtx_);
cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) return;
job = std::move(tasks_.front());
tasks_.pop();
}
job();
}
}
public:
ThreadPoolScheduler(size_t threads = std::thread::hardware_concurrency()) {
for (size_t i=0;i job) override {
{
std::lock_guard lock(mtx_);
tasks_.push(std::move(job));
}
cv_.notify_one();
}
~ThreadPoolScheduler() {
{
std::lock_guard lock(mtx_);
stop_ = true;
}
cv_.notify_all();
for (auto& t : workers_) t.join();
}
};
“`
“`cpp
// asyncify.hpp
#pragma once
#include “Task.hpp”
#include “Scheduler.hpp”
#include
#include
#include
#include
template
auto asyncify(Func f, Args&&… args) -> Task {
Task task;
IScheduler* scheduler = new ThreadPoolScheduler(); // 简化示例,实际可注入
scheduler->schedule([task, f, args…]() mutable {
try {
auto res = f(args…);
task.set_value(res);
} catch (…) {
task.set_exception(std::current_exception());
}
});
return task;
}
inline Task read_file_async(const std::string& path) {
return asyncify([](const std::string& p) {
std::ifstream in(p);
std::stringstream ss;
ss
Task
compute_sum(int a, int b) {
co_return a + b; // 直接返回即可
}
Task
demo() {
std::string content = co_await read_file_async(“example.txt”);
std::cout