使用C++20的协程实现异步任务调度

在现代C++中,协程为我们提供了一种轻量级的异步编程方式。通过标准库的 std::futurestd::promisestd::generator 等功能以及 C++20 的 co_awaitco_return,我们可以把原本复杂的回调链式代码简化为更接近同步代码的风格。下面,我们将从零开始实现一个基于协程的异步任务调度器,演示其使用场景、实现思路以及关键技术细节。

1. 需求与目标

  • 任务模型:每个任务是一个返回 `std::future ` 的协程函数,支持异步等待。
  • 调度器:管理任务队列、线程池,负责在合适的线程上执行任务。
  • 轻量级:不使用第三方库,完全基于 C++20 标准库。
  • 可扩展:后续可以加入超时、优先级、错误恢复等功能。

2. 设计思路

  1. Task:定义一个 `Task ` 类,包装协程句柄并提供 `std::future` 供外部等待。内部使用 `std::promise` 以将协程结果映射到 `future`。
  2. Scheduler:维护一个任务队列(如 std::queue<TaskBase*>)和若干工作线程。任务一旦可执行就推入队列,工作线程从队列中取任务并执行。
  3. 协程实现:使用 std::experimental::coroutine_handle 与自定义 awaiter sleep_for 等,模拟 I/O 等待。

3. 关键代码

3.1 Task 基础

#include <coroutine>
#include <future>
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

template<typename T>
struct Task {
    struct promise_type;
    using handle_type = std::coroutine_handle <promise_type>;

    struct promise_type {
        std::promise <T> prom;
        auto get_return_object() {
            return Task{handle_type::from_promise(*this)};
        }
        std::suspend_always initial_suspend() noexcept { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void unhandled_exception() { prom.set_exception(std::current_exception()); }
        void return_value(T value) { prom.set_value(value); }
    };

    handle_type coro;
    Task(handle_type h) : coro(h) {}
    ~Task() { if (coro) coro.destroy(); }
    std::future <T> get_future() { return coro.promise().prom.get_future(); }
};

3.2 简单 awaiter

struct sleep_for {
    std::chrono::milliseconds dur;
    std::thread::id id;

    bool await_ready() const noexcept { return false; }
    void await_suspend(std::coroutine_handle<> h) const {
        std::thread([h, this]() {
            std::this_thread::sleep_for(dur);
            h.resume();
        }).detach();
    }
    void await_resume() const noexcept {}
};

inline sleep_for operator""_ms(unsigned long long ms) {
    return sleep_for{std::chrono::milliseconds(ms)};
}

3.3 Scheduler

class Scheduler {
public:
    Scheduler(size_t n_threads = std::thread::hardware_concurrency())
        : stop_flag(false)
    {
        for (size_t i = 0; i < n_threads; ++i)
            workers.emplace_back([this]{ worker_loop(); });
    }

    ~Scheduler() {
        {
            std::unique_lock<std::mutex> lk(mtx);
            stop_flag = true;
        }
        cv.notify_all();
        for (auto &t : workers) t.join();
    }

    template<typename T>
    void schedule(Task <T> &&t) {
        std::unique_lock<std::mutex> lk(mtx);
        tasks.push([p = std::move(t)]() mutable {
            p.coro.resume(); // resume coroutine
        });
        cv.notify_one();
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex mtx;
    std::condition_variable cv;
    bool stop_flag;

    void worker_loop() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(mtx);
                cv.wait(lk, [&]{ return stop_flag || !tasks.empty(); });
                if (stop_flag && tasks.empty()) break;
                task = std::move(tasks.front());
                tasks.pop();
            }
            task();
        }
    }
};

3.4 示例任务

Task <int> compute(int a, int b) {
    std::cout << "开始计算 " << a << "+" << b << " 在线程 " << std::this_thread::get_id() << "\n";
    co_await 1000_ms;          // 模拟 I/O 延迟
    int res = a + b;
    std::cout << "计算完成: " << res << "\n";
    co_return res;
}

3.5 主程序

int main() {
    Scheduler sched(4); // 4 个工作线程

    auto fut1 = compute(3, 5).get_future();
    auto fut2 = compute(10, 20).get_future();
    auto fut3 = compute(7, 8).get_future();

    sched.schedule(Task <int>{}); // 将任务交给调度器
    // 实际上,调度器应该包装 coroutine,示例略

    std::cout << "等待结果...\n";
    std::cout << "结果1: " << fut1.get() << "\n";
    std::cout << "结果2: " << fut2.get() << "\n";
    std::cout << "结果3: " << fut3.get() << "\n";
    return 0;
}

说明:上面示例仅演示了协程与调度器的基本交互,真实实现需要把 Task 的 coroutine 句柄包装成可推入任务队列的 lambda。为了保持代码简洁,省略了一些细节。

4. 扩展思路

  • 优先级队列:将 std::queue 换成 std::priority_queue,支持任务优先级。
  • 超时处理:为 awaiter 增加超时检测,支持 co_await with_timeout(...)
  • 资源池:引入对象池或内存池,减少协程句柄创建销毁的开销。
  • 跨平台 I/O:结合 libuvio_uring,实现真正的异步 I/O,而非使用 std::this_thread::sleep_for

5. 结语

通过 C++20 的协程特性,我们能够用几行代码实现一个功能完整、线程安全的异步任务调度器。相比传统的回调或 std::future+std::async,协程在可读性和可维护性上都有显著提升。随着 C++ 标准库对协程的进一步完善,未来的异步编程将会变得更加直观、强大。

发表评论