如何在C++中实现自定义的多线程任务调度器

在现代软件开发中,异步执行和任务并发已成为不可或缺的技术。C++ 标准库提供了 std::threadstd::futurestd::async 等工具,但如果你需要更细粒度的控制,或者想要一个可插拔、可扩展的任务调度框架,就需要自己实现一个自定义的多线程任务调度器。下面将从设计原则、核心组件、实现细节以及性能调优等方面,系统性地阐述如何在 C++ 中实现一个轻量、易用且高效的任务调度器。


1. 设计目标

目标 说明
低开销 线程池大小可配置,避免频繁创建/销毁线程。
高并发 支持数千个任务的并发排队与执行。
可扩展性 任务可以通过回调、模板、函数对象等多种方式提交。
容错性 任务异常不影响调度器整体运行,异常信息可查询。
优先级调度 支持任务优先级或延迟执行。
生命周期管理 线程池启动、停止、等待全部任务完成等功能。

2. 核心组件

  1. Task(任务)
    任务是一个可调用对象,包装了要执行的函数、参数以及元数据(如优先级、延迟时间、提交时间)。

    struct Task {
        std::function<void()> func;
        std::chrono::steady_clock::time_point execute_at;
        int priority; // 可选
    };
  2. TaskQueue(任务队列)
    用于存放待执行的任务。典型实现是使用 std::priority_queuestd::deque。为支持延迟执行,需要根据 execute_at 字段进行排序。

    struct TaskCompare {
        bool operator()(const Task& a, const Task& b) const {
            return a.execute_at > b.execute_at; // 未来时间靠前
        }
    };
    std::priority_queue<Task, std::vector<Task>, TaskCompare> queue_;
  3. Worker(工作线程)
    线程池中的工作线程循环从 TaskQueue 取出任务并执行。线程在空闲时阻塞,使用 std::condition_variable 等待新的任务或停止信号。

  4. Scheduler(调度器)
    提供接口:submit(), start(), shutdown(), wait_for_all() 等。管理线程池、任务队列和同步原语。


3. 关键实现细节

3.1 线程安全

  • TaskQueue 需要在多线程环境下操作,必须使用互斥量保护:std::mutex queue_mutex_;
  • 条件变量 std::condition_variable cv_; 用于通知工作线程有新任务或需要停止。

3.2 延迟任务与优先级

  • 任务队列按 execute_at 排序,工作线程在取任务前检查时间:若任务未到执行时间,线程 cv_.wait_until() 等待到 execute_at 或出现新的更早任务。
  • 若需要多级优先级,可在 Task 结构中加入 priority 字段,并在 TaskCompare 中结合 execute_at 进行复合比较。

3.3 异常处理

  • 每个任务执行时使用 try-catch 捕获异常,并记录日志或回调给外部。
  • 可以提供 std::futurestd::promise 来返回任务结果,让调用者通过 get() 获取异常。

3.4 生命周期管理

  • start():创建指定数量的工作线程,线程循环从队列取任务。
  • shutdown():设置停止标志,唤醒所有线程,等待它们退出。
  • wait_for_all():阻塞直到队列为空且所有线程已完成当前任务。

3.5 资源回收

  • 使用 RAII 方式管理线程对象,防止泄露。
  • std::unique_lock<std::mutex> 用于锁定,确保在异常时也能正确解锁。

4. 代码示例

#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <functional>
#include <chrono>
#include <mutex>
#include <condition_variable>

class Scheduler {
public:
    using Clock = std::chrono::steady_clock;

    explicit Scheduler(size_t threads = std::thread::hardware_concurrency())
        : shutdown_(false), threads_(threads) {}

    ~Scheduler() { stop(); }

    // 启动线程池
    void start() {
        for (size_t i = 0; i < threads_; ++i) {
            workers_.emplace_back([this]() { this->worker_loop(); });
        }
    }

    // 提交普通任务
    void submit(const std::function<void()>& func,
                const Clock::time_point& at = Clock::now()) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.emplace(Task{func, at, 0});
        }
        cv_.notify_one();
    }

    // 停止调度器,等待所有线程结束
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            shutdown_ = true;
        }
        cv_.notify_all();
        for (auto& t : workers_) {
            if (t.joinable())
                t.join();
        }
    }

    // 等待所有任务完成
    void wait_for_all() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_all_done_.wait(lock, [this]() { return queue_.empty() && !running_; });
    }

private:
    struct Task {
        std::function<void()> func;
        Clock::time_point execute_at;
        int priority;
    };

    struct TaskCompare {
        bool operator()(const Task& a, const Task& b) const {
            return a.execute_at > b.execute_at; // 先执行时间近的
        }
    };

    void worker_loop() {
        while (true) {
            Task task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this]() { return shutdown_ || !queue_.empty(); });

                if (shutdown_ && queue_.empty())
                    break;

                // 取最早执行的任务
                task = queue_.top();
                if (Clock::now() < task.execute_at) {
                    cv_.wait_until(lock, task.execute_at);
                    continue; // 重新检查
                }
                queue_.pop();
                running_ = true;
            }

            try {
                task.func();
            } catch (const std::exception& e) {
                std::cerr << "Task exception: " << e.what() << std::endl;
            } catch (...) {
                std::cerr << "Task unknown exception" << std::endl;
            }

            {
                std::lock_guard<std::mutex> lock(mutex_);
                running_ = false;
                if (queue_.empty())
                    cv_all_done_.notify_all();
            }
        }
    }

    std::vector<std::thread> workers_;
    std::priority_queue<Task, std::vector<Task>, TaskCompare> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    std::condition_variable cv_all_done_;
    bool shutdown_;
    bool running_{false};
    size_t threads_;
};

int main() {
    Scheduler scheduler(4);
    scheduler.start();

    scheduler.submit([]() {
        std::cout << "Immediate task executed\n";
    });

    scheduler.submit([]() {
        std::cout << "Delayed task executed after 1s\n";
    }, Scheduler::Clock::now() + std::chrono::seconds(1));

    scheduler.wait_for_all();
    scheduler.stop();
    return 0;
}

5. 性能调优技巧

  1. 减少锁竞争

    • queue_cv_ 的锁粒度拆分:仅对 queue_ 进行短时锁定;把等待条件放在外层。
    • 使用 std::atomic 标志代替布尔变量,减少锁。
  2. 线程池大小

    • 经验值是 CPU 核数 * 2,或根据任务 IO 阻塞比例调整。
    • 可以动态扩容/缩容,但需注意线程创建销毁的成本。
  3. 批量调度

    • 当提交大量任务时,可一次性将多个任务放入队列,减少 notify_one() 的调用次数。
  4. 延迟任务

    • 对于大量延迟任务,考虑使用时间轮(Time Wheel)或多级时间量化技术,避免 std::condition_variable 每次都需要比较时间。
  5. 内存分配

    • Task 对象预留池化(如 std::pmr::monotonic_buffer_resource),降低 new/delete 的频率。

6. 结语

通过上述设计与实现,你可以得到一个功能完整、可维护、易于集成的 C++ 多线程任务调度器。它既能满足轻量级应用的需求,也足以在高并发服务器或实时系统中发挥作用。根据具体业务场景,你可以进一步扩展调度策略(如基于权重的负载均衡、可重复任务、超时机制等),使调度器更贴合真实需求。祝你编码愉快!

发表评论