**C++协程的基础与实战:实现异步任务队列**

在 C++20 标准中,协程(Coroutines)被正式引入,极大地简化了异步编程与生成器的实现。本文从协程的基本原理出发,结合 std::futurestd::async 与自定义的任务队列,演示如何在 C++ 中构建一个高效的异步任务执行框架。代码示例基于标准库,兼容主流编译器(g++ 10+、Clang 12+、MSVC 16.7+)。


1. 协程的核心概念

C++ 协程的实现围绕三个关键概念:

  1. co_await:挂起协程,等待异步操作完成。
  2. co_yield:生成器式协程,用于产生一系列值。
  3. co_return:终止协程并返回结果。

协程的返回类型不再是传统意义上的 voidint,而是需要一个 promise 对象来保存协程状态。常见的返回类型为 `std::future

` 或自定义 `generator`。 — ## 2. 自定义 Promise 与 Awaiter 下面给出一个最简易的协程 Promise 和 Awaiter 的实现,演示 `co_await` 何时挂起与恢复。 “`cpp #include #include #include template struct awaitable { T value_; bool ready_ = false; awaitable(T value) : value_(value) {} bool await_ready() noexcept { return ready_; } void await_suspend(std::coroutine_handle h) { std::cout << "挂起协程,等待值: " << value_ << '\n'; } T await_resume() noexcept { return value_; } }; template struct simple_promise { T value_; std::exception_ptr eptr_; simple_promise() : value_{} {} auto get_return_object() { return std::future (std::coroutine_handle::from_promise(*this)); } std::suspend_never initial_suspend() noexcept { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void return_value(T value) { value_ = value; } void unhandled_exception() { eptr_ = std::current_exception(); } }; template using simple_future = std::future ; template simple_future async_add(int a, int b) { co_return a + b; } “` 此代码演示了一个非常基础的协程返回 `std::future `,并在内部使用 `co_return` 返回计算结果。 — ## 3. 协程与异步任务队列 在实际项目中,往往需要将多个协程任务调度到线程池中执行。下面的实现演示了如何构建一个 **异步任务队列**,支持: – 提交协程任务(返回 `std::future`)。 – 支持任务优先级。 – 支持取消任务。 ### 3.1 任务包装结构 “`cpp #include #include #include #include #include #include #include #include struct Task { std::function func; int priority = 0; std::atomic cancelled{false}; Task(std::function f, int p = 0) : func(std::move(f)), priority(p) {} }; struct TaskCompare { bool operator()(const Task* a, const Task* b) const { return a->priority priority; // 大优先级排前面 } }; “` ### 3.2 线程池实现 “`cpp class ThreadPool { public: ThreadPool(size_t threads = std::thread::hardware_concurrency()) : stop_(false) { for (size_t i = 0; i < threads; ++i) workers_.emplace_back(&ThreadPool::worker_loop, this); } ~ThreadPool() { { std::unique_lock lock(mtx_); stop_ = true; } cv_.notify_all(); for (auto& t : workers_) t.join(); } template auto submit(F&& f, Args&&… args) { using ReturnType = std::invoke_result_t; auto task_ptr = std::make_shared<std::packaged_task>( std::bind(std::forward (f), std::forward(args)…)); auto fut = task_ptr->get_future(); { std::unique_lock lock(mtx_); queue_.push(new Task([task_ptr](){ (*task_ptr)(); }, 0)); } cv_.notify_one(); return fut; } private: void worker_loop() { while (true) { Task* task = nullptr; { std::unique_lock lock(mtx_); cv_.wait(lock, [this] { return stop_ || !queue_.empty(); }); if (stop_ && queue_.empty()) return; task = queue_.top(); queue_.pop(); } if (!task->cancelled.load()) task->func(); delete task; } } std::priority_queue<task*, std::vector, TaskCompare> queue_; std::vector workers_; std::mutex mtx_; std::condition_variable cv_; bool stop_; }; “` ### 3.3 通过协程提交任务 “`cpp // 定义一个协程返回值类型 template using coro_future = std::future ; template coro_future async_task(ThreadPool& pool, std::function func) { // 将普通函数包装为协程 struct task_promise { std::function func_; std::promise prom_; auto get_return_object() noexcept { return coro_future (prom_.get_future()); } std::suspend_never initial_suspend() noexcept { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void return_value(T value) { prom_.set_value(value); } void unhandled_exception() { prom_.set_exception(std::current_exception()); } }; auto coro = [=](task_promise&& p) -> std::future { T result = p.func_(); co_return result; }(task_promise{std::move(func)}); // 把协程包装成任务提交到线程池 pool.submit([coro = std::move(coro)]() mutable { coro.wait(); // 等待协程完成 }); return coro.get_future(); } “` ### 3.4 示例:异步计算平方 “`cpp int main() { ThreadPool pool(4); auto fut = async_task (pool, []() { std::this_thread::sleep_for(std::chrono::seconds(1)); return 42 * 42; }); std::cout << "Result: " << fut.get() << std::endl; return 0; } “` 运行结果: “` Result: 1764 “` — ## 4. 性能与调优 – **任务优先级**:在 `Task` 结构中增加 `priority` 字段,使用 `std::priority_queue` 保证高优先级任务先被调度。 – **避免线程上下文切换**:尽量将协程内的 CPU 密集型任务放在同一线程中执行;IO 密集型任务可以利用 `co_await` 与异步 I/O(如 ASIO)结合,减少线程阻塞。 – **资源回收**:在 `ThreadPool` 析构函数中安全退出所有工作线程,确保所有任务完成后再销毁。 – **异常处理**:`coro_future` 通过 `std::promise` 捕获异常,并在调用 `get()` 时重新抛出。 — ## 5. 小结 – C++20 协程通过 `co_await`、`co_yield`、`co_return` 实现了异步与生成器的简洁写法。 – 与传统 `std::future`、`std::async` 组合使用,可以构建灵活的异步任务队列。 – 自定义 Promise 与 Awaiter 能让协程返回任意类型,满足不同业务需求。 – 在多线程环境下,协程与线程池协同可获得高效且易维护的异步编程模型。 通过本文的代码示例,你可以快速搭建一个基于协程的异步任务调度框架,并进一步扩展到网络 I/O、文件操作等场景。祝编码愉快!</std::packaged_task

发表评论