使用C++20实现异步任务调度器的设计与实现

在C++20中,协程(coroutines)被正式加入语言标准,提供了一种更为简洁、直观的方式来编写异步代码。本文将通过一个完整的示例,展示如何使用C++20协程实现一个轻量级的异步任务调度器(TaskScheduler)。该调度器可以注册普通函数、lambda表达式以及协程本身,并在事件循环中按需执行,支持任务之间的依赖关系、优先级调度以及错误处理。

1. 设计目标

  1. 轻量级:避免使用额外的线程池或操作系统级别的线程,所有任务均在单线程事件循环中执行。
  2. 通用性:既支持传统同步函数,也支持异步协程。
  3. 错误传播:任务异常应能被捕获并反馈给调用者。
  4. 优先级调度:支持任务优先级,最高优先级任务先执行。

2. 关键技术点

  • std::coroutine_handle:协程句柄,用于挂起与恢复协程。
  • std::experimental::generator:可被 co_yield 的生成器,示例中用作延时任务。
  • std::any / std::variant:统一不同任务返回类型。
  • 事件循环:采用 std::queuestd::priority_queue 存放待执行任务。

3. 代码实现

#include <iostream>
#include <coroutine>
#include <queue>
#include <functional>
#include <vector>
#include <chrono>
#include <future>
#include <exception>
#include <optional>
#include <thread>
#include <atomic>

namespace async {

// -------------------------------------
// 1. Task:包装可执行的单元
// -------------------------------------
class Task {
public:
    using PromiseType = std::coroutine_handle<>;
    using TaskFunc   = std::function<void()>;

    Task(TaskFunc f, int prio = 0)
        : func_(std::move(f)), priority_(prio), done_(false) {}

    void run() {
        if (func_ && !done_) {
            try {
                func_();
            } catch (const std::exception& e) {
                error_ = e.what();
            }
            done_ = true;
        }
    }

    bool done() const { return done_; }
    std::optional<std::string> error() const { return error_; }
    int priority() const { return priority_; }

private:
    TaskFunc func_;
    int priority_;
    bool done_;
    std::optional<std::string> error_;
};

// -------------------------------------
// 2. 任务比较器(优先级越高越先执行)
// -------------------------------------
struct TaskCmp {
    bool operator()(const Task* a, const Task* b) const {
        return a->priority() < b->priority();
    }
};

// -------------------------------------
// 3. Scheduler:事件循环
// -------------------------------------
class Scheduler {
public:
    Scheduler() : stop_(false) {}

    // 注册普通函数
    void post(const Task::TaskFunc& f, int priority = 0) {
        std::lock_guard<std::mutex> lk(mtx_);
        tasks_.emplace(new Task(f, priority));
    }

    // 注册协程
    template<typename Awaitable>
    void post_co(Awaitable&& awaitable, int priority = 0) {
        auto wrapper = [awaitable = std::forward <Awaitable>(awaitable), priority]() mutable {
            awaitable();
        };
        post(wrapper, priority);
    }

    // 事件循环
    void run() {
        while (!stop_) {
            Task* t = nullptr;
            {
                std::lock_guard<std::mutex> lk(mtx_);
                if (!tasks_.empty()) {
                    t = tasks_.top();
                    tasks_.pop();
                }
            }
            if (t) {
                t->run();
                if (t->error()) {
                    std::cerr << "Task error: " << *t->error() << std::endl;
                }
                delete t;
            } else {
                // 没有任务,睡眠一段时间
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
        }
        // 清理剩余任务
        std::lock_guard<std::mutex> lk(mtx_);
        while (!tasks_.empty()) {
            delete tasks_.top();
            tasks_.pop();
        }
    }

    void stop() { stop_ = true; }

private:
    std::priority_queue<Task*, std::vector<Task*>, TaskCmp> tasks_;
    std::mutex mtx_;
    std::atomic <bool> stop_;
};

// -------------------------------------
// 4. 简单协程示例:异步睡眠
// -------------------------------------
struct Sleep {
    struct promise_type {
        std::coroutine_handle<> next_;
        std::chrono::steady_clock::time_point wake_time_;

        Sleep get_return_object() {
            return Sleep{std::coroutine_handle <promise_type>::from_promise(*this)};
        }
        std::suspend_always initial_suspend() noexcept { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void unhandled_exception() { std::terminate(); }
        void return_void() {}
    };

    std::coroutine_handle <promise_type> coro_;

    Sleep(std::coroutine_handle <promise_type> h) : coro_(h) {}
    ~Sleep() { if (coro_) coro_.destroy(); }

    bool await_ready() noexcept { return false; }
    void await_suspend(std::coroutine_handle<> awaiting) {
        coro_.promise().next_ = awaiting;
        auto now = std::chrono::steady_clock::now();
        auto until = now + std::chrono::milliseconds(1000);
        coro_.promise().wake_time_ = until;
    }
    void await_resume() noexcept {}
};

Sleep async_sleep(std::chrono::milliseconds ms) {
    struct Awaiter {
        std::chrono::milliseconds ms_;
        bool await_ready() const noexcept { return false; }
        void await_suspend(std::coroutine_handle<> h) const {
            auto until = std::chrono::steady_clock::now() + ms_;
            h.promise().next_ = h;
            h.promise().wake_time_ = until;
        }
        void await_resume() const noexcept {}
    };
    return Sleep{Awaiter{ms_}.await_suspend};
}

// -------------------------------------
// 5. 主程序
// -------------------------------------
int main() {
    async::Scheduler sched;

    // 线程1:运行调度器
    std::thread eventLoop([&sched](){ sched.run(); });

    // 线程2:提交任务
    std::thread producer([&sched](){
        // 普通任务
        sched.post([](){ std::cout << "Hello, world!\n"; }, 1);

        // lambda
        int x = 42;
        sched.post([x](){ std::cout << "Captured x = " << x << "\n"; }, 2);

        // 协程任务:异步睡眠后打印
        auto coroutineTask = []() -> std::generator <void> {
            std::cout << "Coroutine start\n";
            co_await async::async_sleep(std::chrono::milliseconds(500));
            std::cout << "Coroutine after sleep\n";
        };
        sched.post_co(coroutineTask, 3);

        // 延时任务
        std::this_thread::sleep_for(std::chrono::seconds(1));
        sched.post([](){ std::cout << "Delayed task\n"; });

        // 停止事件循环
        sched.stop();
    });

    producer.join();
    eventLoop.join();

    return 0;
}

代码说明

  1. Task
    Task 封装了一个可执行的 std::function<void()>,并记录优先级和错误信息。
  2. Scheduler
    采用 std::priority_queue 根据任务优先级排序;post() 接收普通函数,post_co() 接收协程对象。事件循环在单线程中轮询执行。
  3. Sleep
    一个最简协程实现,用于演示异步等待。它在 await_suspend 时记录唤醒时间,事件循环在此时间点重新调度。
  4. 主程序
    两个线程分别负责启动调度器和提交任务,演示了同步、异步以及延时任务的注册。

4. 性能与可扩展性

  • 单线程事件循环保证了任务之间的同步性,避免了锁竞争。
  • 通过 priority_queue 可轻松实现优先级任务。
  • 若需要支持多线程可在 Scheduler 内部添加 std::thread_pool 或使用 std::async
  • 错误处理通过 std::exception 捕获并保存在 Task 对象中,主线程可自行决定是否记录、重试或终止。

5. 小结

本文通过一个完整示例,展示了如何在C++20中使用协程实现轻量级异步任务调度器。该调度器支持同步函数、协程以及优先级调度,并提供了错误传播机制。借助C++20的新特性,代码保持简洁,同时实现了高效、可维护的异步逻辑。

发表评论