在C++20中,协程(coroutines)被正式加入语言标准,提供了一种更为简洁、直观的方式来编写异步代码。本文将通过一个完整的示例,展示如何使用C++20协程实现一个轻量级的异步任务调度器(TaskScheduler)。该调度器可以注册普通函数、lambda表达式以及协程本身,并在事件循环中按需执行,支持任务之间的依赖关系、优先级调度以及错误处理。
1. 设计目标
- 轻量级:避免使用额外的线程池或操作系统级别的线程,所有任务均在单线程事件循环中执行。
- 通用性:既支持传统同步函数,也支持异步协程。
- 错误传播:任务异常应能被捕获并反馈给调用者。
- 优先级调度:支持任务优先级,最高优先级任务先执行。
2. 关键技术点
std::coroutine_handle:协程句柄,用于挂起与恢复协程。std::experimental::generator:可被co_yield的生成器,示例中用作延时任务。std::any/std::variant:统一不同任务返回类型。- 事件循环:采用
std::queue或std::priority_queue存放待执行任务。
3. 代码实现
#include <iostream>
#include <coroutine>
#include <queue>
#include <functional>
#include <vector>
#include <chrono>
#include <future>
#include <exception>
#include <optional>
#include <thread>
#include <atomic>
namespace async {
// -------------------------------------
// 1. Task:包装可执行的单元
// -------------------------------------
class Task {
public:
using PromiseType = std::coroutine_handle<>;
using TaskFunc = std::function<void()>;
Task(TaskFunc f, int prio = 0)
: func_(std::move(f)), priority_(prio), done_(false) {}
void run() {
if (func_ && !done_) {
try {
func_();
} catch (const std::exception& e) {
error_ = e.what();
}
done_ = true;
}
}
bool done() const { return done_; }
std::optional<std::string> error() const { return error_; }
int priority() const { return priority_; }
private:
TaskFunc func_;
int priority_;
bool done_;
std::optional<std::string> error_;
};
// -------------------------------------
// 2. 任务比较器(优先级越高越先执行)
// -------------------------------------
struct TaskCmp {
bool operator()(const Task* a, const Task* b) const {
return a->priority() < b->priority();
}
};
// -------------------------------------
// 3. Scheduler:事件循环
// -------------------------------------
class Scheduler {
public:
Scheduler() : stop_(false) {}
// 注册普通函数
void post(const Task::TaskFunc& f, int priority = 0) {
std::lock_guard<std::mutex> lk(mtx_);
tasks_.emplace(new Task(f, priority));
}
// 注册协程
template<typename Awaitable>
void post_co(Awaitable&& awaitable, int priority = 0) {
auto wrapper = [awaitable = std::forward <Awaitable>(awaitable), priority]() mutable {
awaitable();
};
post(wrapper, priority);
}
// 事件循环
void run() {
while (!stop_) {
Task* t = nullptr;
{
std::lock_guard<std::mutex> lk(mtx_);
if (!tasks_.empty()) {
t = tasks_.top();
tasks_.pop();
}
}
if (t) {
t->run();
if (t->error()) {
std::cerr << "Task error: " << *t->error() << std::endl;
}
delete t;
} else {
// 没有任务,睡眠一段时间
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
// 清理剩余任务
std::lock_guard<std::mutex> lk(mtx_);
while (!tasks_.empty()) {
delete tasks_.top();
tasks_.pop();
}
}
void stop() { stop_ = true; }
private:
std::priority_queue<Task*, std::vector<Task*>, TaskCmp> tasks_;
std::mutex mtx_;
std::atomic <bool> stop_;
};
// -------------------------------------
// 4. 简单协程示例:异步睡眠
// -------------------------------------
struct Sleep {
struct promise_type {
std::coroutine_handle<> next_;
std::chrono::steady_clock::time_point wake_time_;
Sleep get_return_object() {
return Sleep{std::coroutine_handle <promise_type>::from_promise(*this)};
}
std::suspend_always initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void unhandled_exception() { std::terminate(); }
void return_void() {}
};
std::coroutine_handle <promise_type> coro_;
Sleep(std::coroutine_handle <promise_type> h) : coro_(h) {}
~Sleep() { if (coro_) coro_.destroy(); }
bool await_ready() noexcept { return false; }
void await_suspend(std::coroutine_handle<> awaiting) {
coro_.promise().next_ = awaiting;
auto now = std::chrono::steady_clock::now();
auto until = now + std::chrono::milliseconds(1000);
coro_.promise().wake_time_ = until;
}
void await_resume() noexcept {}
};
Sleep async_sleep(std::chrono::milliseconds ms) {
struct Awaiter {
std::chrono::milliseconds ms_;
bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<> h) const {
auto until = std::chrono::steady_clock::now() + ms_;
h.promise().next_ = h;
h.promise().wake_time_ = until;
}
void await_resume() const noexcept {}
};
return Sleep{Awaiter{ms_}.await_suspend};
}
// -------------------------------------
// 5. 主程序
// -------------------------------------
int main() {
async::Scheduler sched;
// 线程1:运行调度器
std::thread eventLoop([&sched](){ sched.run(); });
// 线程2:提交任务
std::thread producer([&sched](){
// 普通任务
sched.post([](){ std::cout << "Hello, world!\n"; }, 1);
// lambda
int x = 42;
sched.post([x](){ std::cout << "Captured x = " << x << "\n"; }, 2);
// 协程任务:异步睡眠后打印
auto coroutineTask = []() -> std::generator <void> {
std::cout << "Coroutine start\n";
co_await async::async_sleep(std::chrono::milliseconds(500));
std::cout << "Coroutine after sleep\n";
};
sched.post_co(coroutineTask, 3);
// 延时任务
std::this_thread::sleep_for(std::chrono::seconds(1));
sched.post([](){ std::cout << "Delayed task\n"; });
// 停止事件循环
sched.stop();
});
producer.join();
eventLoop.join();
return 0;
}
代码说明
- Task
Task封装了一个可执行的std::function<void()>,并记录优先级和错误信息。 - Scheduler
采用std::priority_queue根据任务优先级排序;post()接收普通函数,post_co()接收协程对象。事件循环在单线程中轮询执行。 - Sleep
一个最简协程实现,用于演示异步等待。它在await_suspend时记录唤醒时间,事件循环在此时间点重新调度。 - 主程序
两个线程分别负责启动调度器和提交任务,演示了同步、异步以及延时任务的注册。
4. 性能与可扩展性
- 单线程事件循环保证了任务之间的同步性,避免了锁竞争。
- 通过
priority_queue可轻松实现优先级任务。 - 若需要支持多线程可在
Scheduler内部添加std::thread_pool或使用std::async。 - 错误处理通过
std::exception捕获并保存在Task对象中,主线程可自行决定是否记录、重试或终止。
5. 小结
本文通过一个完整示例,展示了如何在C++20中使用协程实现轻量级异步任务调度器。该调度器支持同步函数、协程以及优先级调度,并提供了错误传播机制。借助C++20的新特性,代码保持简洁,同时实现了高效、可维护的异步逻辑。