**C++协程的基础与实战:实现异步任务队列**

在 C++20 标准中,协程(Coroutines)被正式引入,极大地简化了异步编程与生成器的实现。本文从协程的基本原理出发,结合 std::futurestd::async 与自定义的任务队列,演示如何在 C++ 中构建一个高效的异步任务执行框架。代码示例基于标准库,兼容主流编译器(g++ 10+、Clang 12+、MSVC 16.7+)。


1. 协程的核心概念

C++ 协程的实现围绕三个关键概念:

  1. co_await:挂起协程,等待异步操作完成。
  2. co_yield:生成器式协程,用于产生一系列值。
  3. co_return:终止协程并返回结果。

协程的返回类型不再是传统意义上的 voidint,而是需要一个 promise 对象来保存协程状态。常见的返回类型为 `std::future

` 或自定义 `generator`。 — ## 2. 自定义 Promise 与 Awaiter 下面给出一个最简易的协程 Promise 和 Awaiter 的实现,演示 `co_await` 何时挂起与恢复。 “`cpp #include #include #include template struct awaitable { T value_; bool ready_ = false; awaitable(T value) : value_(value) {} bool await_ready() noexcept { return ready_; } void await_suspend(std::coroutine_handle h) { std::cout struct simple_promise { T value_; std::exception_ptr eptr_; simple_promise() : value_{} {} auto get_return_object() { return std::future (std::coroutine_handle::from_promise(*this)); } std::suspend_never initial_suspend() noexcept { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void return_value(T value) { value_ = value; } void unhandled_exception() { eptr_ = std::current_exception(); } }; template using simple_future = std::future ; template simple_future async_add(int a, int b) { co_return a + b; } “` 此代码演示了一个非常基础的协程返回 `std::future `,并在内部使用 `co_return` 返回计算结果。 — ## 3. 协程与异步任务队列 在实际项目中,往往需要将多个协程任务调度到线程池中执行。下面的实现演示了如何构建一个 **异步任务队列**,支持: – 提交协程任务(返回 `std::future`)。 – 支持任务优先级。 – 支持取消任务。 ### 3.1 任务包装结构 “`cpp #include #include #include #include #include #include #include #include struct Task { std::function func; int priority = 0; std::atomic cancelled{false}; Task(std::function f, int p = 0) : func(std::move(f)), priority(p) {} }; struct TaskCompare { bool operator()(const Task* a, const Task* b) const { return a->priority priority; // 大优先级排前面 } }; “` ### 3.2 线程池实现 “`cpp class ThreadPool { public: ThreadPool(size_t threads = std::thread::hardware_concurrency()) : stop_(false) { for (size_t i = 0; i lock(mtx_); stop_ = true; } cv_.notify_all(); for (auto& t : workers_) t.join(); } template auto submit(F&& f, Args&&… args) { using ReturnType = std::invoke_result_t; auto task_ptr = std::make_shared>( std::bind(std::forward (f), std::forward(args)…)); auto fut = task_ptr->get_future(); { std::unique_lock lock(mtx_); queue_.push(new Task([task_ptr](){ (*task_ptr)(); }, 0)); } cv_.notify_one(); return fut; } private: void worker_loop() { while (true) { Task* task = nullptr; { std::unique_lock lock(mtx_); cv_.wait(lock, [this] { return stop_ || !queue_.empty(); }); if (stop_ && queue_.empty()) return; task = queue_.top(); queue_.pop(); } if (!task->cancelled.load()) task->func(); delete task; } } std::priority_queue, TaskCompare> queue_; std::vector workers_; std::mutex mtx_; std::condition_variable cv_; bool stop_; }; “` ### 3.3 通过协程提交任务 “`cpp // 定义一个协程返回值类型 template using coro_future = std::future ; template coro_future async_task(ThreadPool& pool, std::function func) { // 将普通函数包装为协程 struct task_promise { std::function func_; std::promise prom_; auto get_return_object() noexcept { return coro_future (prom_.get_future()); } std::suspend_never initial_suspend() noexcept { return {}; } std::suspend_never final_suspend() noexcept { return {}; } void return_value(T value) { prom_.set_value(value); } void unhandled_exception() { prom_.set_exception(std::current_exception()); } }; auto coro = [=](task_promise&& p) -> std::future { T result = p.func_(); co_return result; }(task_promise{std::move(func)}); // 把协程包装成任务提交到线程池 pool.submit([coro = std::move(coro)]() mutable { coro.wait(); // 等待协程完成 }); return coro.get_future(); } “` ### 3.4 示例:异步计算平方 “`cpp int main() { ThreadPool pool(4); auto fut = async_task (pool, []() { std::this_thread::sleep_for(std::chrono::seconds(1)); return 42 * 42; }); std::cout

发表评论