### 如何在 C++17 中实现一个简单的异步任务调度器

在现代 C++ 开发中,异步任务调度成为提高程序并发性和响应性的关键手段。本文将演示如何利用 C++17 标准库中的 <thread><future><chrono> 等组件,搭建一个轻量级的任务调度器。该调度器支持:

  1. 定时执行:按照设定的周期或固定时间点执行任务。
  2. 任务优先级:简单的基于整数的优先级排序。
  3. 线程池:共享固定数量的工作线程,避免频繁创建销毁线程带来的性能损耗。

1. 基础架构

#include <iostream>
#include <queue>
#include <functional>
#include <thread>
#include <future>
#include <chrono>
#include <condition_variable>
#include <vector>
#include <atomic>
#include <optional>
  • Task:封装要执行的函数、执行时间以及优先级。
  • TaskQueue:线程安全的任务优先队列。
  • Scheduler:负责调度、线程池管理与定时执行。

2. 任务结构

struct Task {
    std::function<void()> func;
    std::chrono::steady_clock::time_point execute_at;
    int priority; // 低值优先

    bool operator<(const Task& other) const {
        if (execute_at == other.execute_at)
            return priority > other.priority; // 低值优先
        return execute_at > other.execute_at; // 早先时间优先
    }
};

3. 线程安全优先队列

class TaskQueue {
public:
    void push(Task t) {
        std::lock_guard<std::mutex> lock(m_);
        q_.push(std::move(t));
        cv_.notify_one();
    }

    std::optional <Task> pop() {
        std::unique_lock<std::mutex> lock(m_);
        while (q_.empty() && !stop_) cv_.wait(lock);
        if (stop_ && q_.empty()) return std::nullopt;
        Task t = q_.top();
        q_.pop();
        return t;
    }

    void stop() {
        std::lock_guard<std::mutex> lock(m_);
        stop_ = true;
        cv_.notify_all();
    }

private:
    std::priority_queue <Task> q_;
    std::mutex m_;
    std::condition_variable cv_;
    bool stop_{false};
};

4. Scheduler 实现

class Scheduler {
public:
    explicit Scheduler(size_t worker_count = std::thread::hardware_concurrency())
        : workers_(worker_count) {
        for (auto& w : workers_) {
            w = std::thread([this] { worker_loop(); });
        }
    }

    ~Scheduler() {
        task_queue_.stop();
        for (auto& w : workers_) {
            if (w.joinable()) w.join();
        }
    }

    // 立即执行
    void schedule(std::function<void()> f, int priority = 0) {
        task_queue_.push(Task{std::move(f),
                              std::chrono::steady_clock::now(),
                              priority});
    }

    // 延迟执行
    void schedule_at(std::function<void()> f,
                     std::chrono::steady_clock::time_point when,
                     int priority = 0) {
        task_queue_.push(Task{std::move(f), when, priority});
    }

    // 周期性执行
    void schedule_every(std::function<void()> f,
                        std::chrono::milliseconds interval,
                        int priority = 0) {
        auto next = std::chrono::steady_clock::now() + interval;
        std::function<void()> wrapper;
        wrapper = [=, &wrapper, interval, f]() mutable {
            f();
            schedule_at(wrapper, next + interval, priority);
        };
        schedule_at(wrapper, next, priority);
    }

private:
    void worker_loop() {
        while (true) {
            auto opt_task = task_queue_.pop();
            if (!opt_task) break; // 队列已停止
            auto now = std::chrono::steady_clock::now();
            if (opt_task->execute_at > now) {
                std::this_thread::sleep_until(opt_task->execute_at);
            }
            opt_task->func();
        }
    }

    TaskQueue task_queue_;
    std::vector<std::thread> workers_;
};

5. 使用示例

int main() {
    Scheduler sched(4); // 4 个工作线程

    // 立即执行
    sched.schedule([] { std::cout << "立即任务 1\n"; }, 1);

    // 延迟 2 秒执行
    sched.schedule_at([] { std::cout << "延迟任务 2\n"; }, 
                      std::chrono::steady_clock::now() + std::chrono::seconds(2), 0);

    // 每隔 1 秒打印一次
    sched.schedule_every([] {
        static int cnt = 0;
        std::cout << "周期任务 " << ++cnt << "\n";
    }, std::chrono::milliseconds(1000), 2);

    // 主线程休眠 5 秒后结束,Scheduler 会在析构时等待线程退出
    std::this_thread::sleep_for(std::chrono::seconds(5));
    return 0;
}

6. 性能与扩展

  • 线程池大小:默认使用硬件线程数,若需要更多并发可自行调整。
  • 任务类型:当前仅支持无返回值的 void() 函数。若需要返回值,可结合 std::futurestd::packaged_task
  • 异常处理:当前实现忽略任务内部异常。可在 worker_loop 中使用 try-catch 捕获并记录日志。
  • 优先级实现:优先级仅是简单整数,实际项目中可结合工作负载特性细化。

7. 结语

通过上述代码,即可在 C++17 环境下快速搭建一个轻量级、可维护的异步任务调度器。它兼顾了定时、优先级与线程池三大核心需求,既可用于后台任务调度,也适合嵌入到实时系统或网络服务器中。你可以根据具体业务进一步扩展功能,例如添加任务取消、动态调整线程池大小或支持异步回调等。祝你编码愉快!

发表评论