在现代C++中,协程为我们提供了一种轻量级的异步编程方式。通过标准库的 std::future、std::promise、std::generator 等功能以及 C++20 的 co_await、co_return,我们可以把原本复杂的回调链式代码简化为更接近同步代码的风格。下面,我们将从零开始实现一个基于协程的异步任务调度器,演示其使用场景、实现思路以及关键技术细节。
1. 需求与目标
- 任务模型:每个任务是一个返回 `std::future ` 的协程函数,支持异步等待。
- 调度器:管理任务队列、线程池,负责在合适的线程上执行任务。
- 轻量级:不使用第三方库,完全基于 C++20 标准库。
- 可扩展:后续可以加入超时、优先级、错误恢复等功能。
2. 设计思路
- Task:定义一个 `Task ` 类,包装协程句柄并提供 `std::future` 供外部等待。内部使用 `std::promise` 以将协程结果映射到 `future`。
- Scheduler:维护一个任务队列(如
std::queue<TaskBase*>)和若干工作线程。任务一旦可执行就推入队列,工作线程从队列中取任务并执行。 - 协程实现:使用
std::experimental::coroutine_handle与自定义 awaitersleep_for等,模拟 I/O 等待。
3. 关键代码
3.1 Task 基础
#include <coroutine>
#include <future>
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
template<typename T>
struct Task {
struct promise_type;
using handle_type = std::coroutine_handle <promise_type>;
struct promise_type {
std::promise <T> prom;
auto get_return_object() {
return Task{handle_type::from_promise(*this)};
}
std::suspend_always initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void unhandled_exception() { prom.set_exception(std::current_exception()); }
void return_value(T value) { prom.set_value(value); }
};
handle_type coro;
Task(handle_type h) : coro(h) {}
~Task() { if (coro) coro.destroy(); }
std::future <T> get_future() { return coro.promise().prom.get_future(); }
};
3.2 简单 awaiter
struct sleep_for {
std::chrono::milliseconds dur;
std::thread::id id;
bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<> h) const {
std::thread([h, this]() {
std::this_thread::sleep_for(dur);
h.resume();
}).detach();
}
void await_resume() const noexcept {}
};
inline sleep_for operator""_ms(unsigned long long ms) {
return sleep_for{std::chrono::milliseconds(ms)};
}
3.3 Scheduler
class Scheduler {
public:
Scheduler(size_t n_threads = std::thread::hardware_concurrency())
: stop_flag(false)
{
for (size_t i = 0; i < n_threads; ++i)
workers.emplace_back([this]{ worker_loop(); });
}
~Scheduler() {
{
std::unique_lock<std::mutex> lk(mtx);
stop_flag = true;
}
cv.notify_all();
for (auto &t : workers) t.join();
}
template<typename T>
void schedule(Task <T> &&t) {
std::unique_lock<std::mutex> lk(mtx);
tasks.push([p = std::move(t)]() mutable {
p.coro.resume(); // resume coroutine
});
cv.notify_one();
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex mtx;
std::condition_variable cv;
bool stop_flag;
void worker_loop() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, [&]{ return stop_flag || !tasks.empty(); });
if (stop_flag && tasks.empty()) break;
task = std::move(tasks.front());
tasks.pop();
}
task();
}
}
};
3.4 示例任务
Task <int> compute(int a, int b) {
std::cout << "开始计算 " << a << "+" << b << " 在线程 " << std::this_thread::get_id() << "\n";
co_await 1000_ms; // 模拟 I/O 延迟
int res = a + b;
std::cout << "计算完成: " << res << "\n";
co_return res;
}
3.5 主程序
int main() {
Scheduler sched(4); // 4 个工作线程
auto fut1 = compute(3, 5).get_future();
auto fut2 = compute(10, 20).get_future();
auto fut3 = compute(7, 8).get_future();
sched.schedule(Task <int>{}); // 将任务交给调度器
// 实际上,调度器应该包装 coroutine,示例略
std::cout << "等待结果...\n";
std::cout << "结果1: " << fut1.get() << "\n";
std::cout << "结果2: " << fut2.get() << "\n";
std::cout << "结果3: " << fut3.get() << "\n";
return 0;
}
说明:上面示例仅演示了协程与调度器的基本交互,真实实现需要把
Task的 coroutine 句柄包装成可推入任务队列的 lambda。为了保持代码简洁,省略了一些细节。
4. 扩展思路
- 优先级队列:将
std::queue换成std::priority_queue,支持任务优先级。 - 超时处理:为 awaiter 增加超时检测,支持
co_await with_timeout(...)。 - 资源池:引入对象池或内存池,减少协程句柄创建销毁的开销。
- 跨平台 I/O:结合
libuv或io_uring,实现真正的异步 I/O,而非使用std::this_thread::sleep_for。
5. 结语
通过 C++20 的协程特性,我们能够用几行代码实现一个功能完整、线程安全的异步任务调度器。相比传统的回调或 std::future+std::async,协程在可读性和可维护性上都有显著提升。随着 C++ 标准库对协程的进一步完善,未来的异步编程将会变得更加直观、强大。