**标题:在 C++20 中实现通用的异步任务调度框架**

1. 需求背景

在现代 C++ 开发中,异步编程已成为提高并发性能的关键手段。常见的做法有 std::asyncstd::thread、第三方库 boost::asiolibuv 等。但它们往往只满足单一使用场景,缺乏灵活性、易用性和可扩展性。本文旨在展示如何使用 C++20 的协程(co_awaitco_return)和 `

` 库,构建一个轻量、可组合、通用的异步任务调度框架。 ## 2. 核心概念 | 概念 | 说明 | |——|——| | **任务(Task)** | 代表一次可异步执行的操作,返回值或异常。使用 `std::future` 或自定义 `Task ` 包装。 | | **调度器(Scheduler)** | 负责把任务投递到线程池、事件循环或特定线程。 | | **协程适配器(CoroutineAdapter)** | 把协程转化为 `Task`,允许在协程中使用 `co_await` 调用异步函数。 | | **线程池(ThreadPool)** | 基础执行单元,管理固定数量的工作线程。 | ## 3. 设计思路 1. **Task ** – 通过 `std::shared_ptr>` 与 `std::future` 组合实现。 – 提供 `co_await` 接口:内部实现 `operator co_await()`,返回 `Awaiter`,实现 `await_ready()`、`await_suspend()`、`await_resume()`。 2. **Scheduler** – 抽象基类 `IScheduler`,接口 `schedule(TaskBase& task)`。 – 具体实现 `ThreadPoolScheduler`:使用 `std::thread` 和 `std::queue` 管理任务。 – 另一个实现 `SingleThreadScheduler`:在单线程中顺序执行,适合 UI 线程。 3. **协程适配器** – `asyncify` 函数:将普通函数包装为 `Task `。 – 示例:`Task read_file_async(const std::string& path);` 4. **错误传播** – `Task` 内部捕获异常,并将其存入 `std::promise`,随后 `co_await` 时通过 `std::future::get()` 重新抛出。 ## 4. 核心代码 “`cpp // Task.hpp #pragma once #include #include #include template class Task { std::shared_ptr> promise_; std::future future_; public: Task() : promise_(std::make_shared>()), future_(promise_->get_future()) {} // 用于协程的 Awaiter struct Awaiter { std::future & fut_; bool await_ready() const noexcept { return false; } void await_suspend(std::coroutine_handle h) { std::thread([h]() { h.resume(); }).detach(); } T await_resume() { return fut_.get(); // 若异常会在此抛出 } }; Awaiter operator co_await() { return Awaiter{future_}; } // 让外部手动设置结果 void set_value(const T& val) { promise_->set_value(val); } void set_exception(std::exception_ptr eptr) { promise_->set_exception(eptr); } }; template class Task { std::shared_ptr> promise_; std::future future_; public: Task() : promise_(std::make_shared>()), future_(promise_->get_future()) {} struct Awaiter { std::future & fut_; bool await_ready() const noexcept { return false; } void await_suspend(std::coroutine_handle h) { std::thread([h]() { h.resume(); }).detach(); } void await_resume() { fut_.get(); } }; Awaiter operator co_await() { return Awaiter{future_}; } void set_value() { promise_->set_value(); } void set_exception(std::exception_ptr eptr) { promise_->set_exception(eptr); } }; “` “`cpp // Scheduler.hpp #pragma once #include #include #include #include #include #include class IScheduler { public: virtual void schedule(std::function job) = 0; virtual ~IScheduler() = default; }; class ThreadPoolScheduler : public IScheduler { std::queue> tasks_; std::vector workers_; std::mutex mtx_; std::condition_variable cv_; bool stop_ = false; void worker() { while (true) { std::function job; { std::unique_lock lock(mtx_); cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); }); if (stop_ && tasks_.empty()) return; job = std::move(tasks_.front()); tasks_.pop(); } job(); } } public: ThreadPoolScheduler(size_t threads = std::thread::hardware_concurrency()) { for (size_t i=0;i job) override { { std::lock_guard lock(mtx_); tasks_.push(std::move(job)); } cv_.notify_one(); } ~ThreadPoolScheduler() { { std::lock_guard lock(mtx_); stop_ = true; } cv_.notify_all(); for (auto& t : workers_) t.join(); } }; “` “`cpp // asyncify.hpp #pragma once #include “Task.hpp” #include “Scheduler.hpp” #include #include #include #include template auto asyncify(Func f, Args&&… args) -> Task { Task task; IScheduler* scheduler = new ThreadPoolScheduler(); // 简化示例,实际可注入 scheduler->schedule([task, f, args…]() mutable { try { auto res = f(args…); task.set_value(res); } catch (…) { task.set_exception(std::current_exception()); } }); return task; } inline Task read_file_async(const std::string& path) { return asyncify([](const std::string& p) { std::ifstream in(p); std::stringstream ss; ss Task compute_sum(int a, int b) { co_return a + b; // 直接返回即可 } Task demo() { std::string content = co_await read_file_async(“example.txt”); std::cout

发表评论