如何在C++中实现一个简易的多线程任务调度器

在现代 C++ 开发中,多线程已经成为提高程序性能的常用手段。然而,直接使用 std::thread 和互斥锁往往容易导致死锁、资源泄漏或调度失效。本文将通过一个可复用的 TaskScheduler 类来演示如何在 C++17 或更高版本中实现一个高效、易用的多线程任务调度器。

1. 设计目标

  1. 易于使用:用户只需调用 enqueue 添加任务。
  2. 线程安全:内部使用互斥锁和条件变量确保并发安全。
  3. 动态扩容:根据任务数量自动调整线程池大小。
  4. 优雅停止:支持平滑关闭,等待所有已提交任务完成。

2. 核心数据结构

class TaskScheduler {
public:
    using Task = std::function<void()>;
    TaskScheduler(size_t coreThreads = std::thread::hardware_concurrency());
    ~TaskScheduler();

    // 直接提交任务
    void enqueue(Task task);

    // 关闭调度器,等待已提交任务执行完毕
    void shutdown();

private:
    void workerLoop();
    void adjustPoolSize();

    std::vector<std::thread> workers_;
    std::queue <Task> tasks_;
    std::mutex mtx_;
    std::condition_variable cv_;
    bool stop_{false};
    size_t coreThreads_;
};
  • workers_ 存放工作线程。
  • tasks_ 队列保存待执行任务。
  • stop_ 标志用于停止线程。

3. 关键实现细节

3.1 构造函数

TaskScheduler::TaskScheduler(size_t coreThreads)
    : coreThreads_(coreThreads) {
    for (size_t i = 0; i < coreThreads_; ++i) {
        workers_.emplace_back(&TaskScheduler::workerLoop, this);
    }
}

在构造时直接创建核心线程数。

3.2 任务执行循环

void TaskScheduler::workerLoop() {
    while (true) {
        Task task;
        {
            std::unique_lock<std::mutex> lock(mtx_);
            cv_.wait(lock, [this]{ return stop_ || !tasks_.empty(); });
            if (stop_ && tasks_.empty()) return;
            task = std::move(tasks_.front());
            tasks_.pop();
        }
        task(); // 执行任务
    }
}

使用条件变量等待任务到来或停止信号。

3.3 动态扩容

我们可以在 enqueue 中检测任务队列长度与活跃线程数,按需创建新线程,或在 workerLoop 里检测空闲时间后退出,释放资源。下面给出一个简化实现:

void TaskScheduler::enqueue(Task task) {
    {
        std::lock_guard<std::mutex> lock(mtx_);
        tasks_.push(std::move(task));
    }
    cv_.notify_one();
}

如果需要动态扩容,可在 enqueue 中检查 tasks_.size() > workers_.size(),并创建额外线程。

3.4 停止与销毁

void TaskScheduler::shutdown() {
    {
        std::lock_guard<std::mutex> lock(mtx_);
        stop_ = true;
    }
    cv_.notify_all();
    for (auto& th : workers_) {
        if (th.joinable()) th.join();
    }
}
TaskScheduler::~TaskScheduler() {
    shutdown();
}

这样可以确保所有线程安全退出。

4. 使用示例

int main() {
    TaskScheduler scheduler(4); // 4 核心线程

    for (int i = 0; i < 10; ++i) {
        scheduler.enqueue([i]{
            std::cout << "任务 " << i << " 开始\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            std::cout << "任务 " << i << " 结束\n";
        });
    }

    scheduler.shutdown(); // 等待所有任务完成后退出
    return 0;
}

输出示例(并发执行):

任务 0 开始
任务 1 开始
任务 2 开始
任务 3 开始
任务 0 结束
任务 4 开始
...

5. 进一步优化

  1. 线程池最大线程数:通过计数器与条件变量限制最大线程数,防止过度创建。
  2. 优先级任务:改用 std::priority_queueboost::heap 来实现优先级调度。
  3. 任务返回值:改用 std::future 包装任务,使调用者能获取执行结果。
  4. 错误处理:在 workerLoop 捕获异常并记录日志,避免线程异常退出。

6. 结语

本文展示了一个基础但功能完整的 C++ 任务调度器实现,演示了线程池、任务队列、条件变量和互斥锁的协同工作方式。实际项目中,可以根据业务需求进一步扩展优先级调度、任务取消、限流等高级特性。掌握这些技术后,你就能在 C++ 项目中灵活地管理并发任务,显著提升程序性能与可维护性。

发表评论