如何在C++中实现一个高效的多线程任务调度器?

在现代 C++ 中,std::threadstd::asyncstd::future 等标准库组件已经提供了多线程编程的基础设施。若需要在单个应用程序中处理成百上千个短任务,最常见的做法是实现一个线程池(ThreadPool),即预先创建一定数量的工作线程,然后把任务放入一个线程安全的队列,让线程池中的线程按需取任务执行。下面给出一个最小可用且易于扩展的线程池实现示例,并讨论关键设计点。


1. 线程池的核心数据结构

#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <atomic>

class ThreadPool {
public:
    explicit ThreadPool(size_t threads);
    ~ThreadPool();

    // 提交任务,返回一个 future
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>;

private:
    // 工作线程集合
    std::vector<std::thread> workers;
    // 任务队列(包装为 std::function<void()>)
    std::queue<std::function<void()>> tasks;

    // 同步原语
    std::mutex queue_mutex;
    std::condition_variable condition;
    std::atomic <bool> stop;
};
  • tasks:使用 std::queue<std::function<void()>> 存放所有待执行的任务,function<void()> 让我们可以包装任何可调用对象(函数、lambda、bind、成员函数等)。
  • stop:一个原子布尔值,用来标记线程池是否正在停止。所有工作线程在检测到 stop 时会退出循环。
  • condition:当任务队列为空时,工作线程会在此等待;当有新任务加入时,调用 notify_one()/notify_all() 唤醒线程。

2. 构造与析构

ThreadPool::ThreadPool(size_t threads) : stop(false) {
    for (size_t i = 0; i < threads; ++i) {
        workers.emplace_back([this] {
            while (true) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(this->queue_mutex);
                    this->condition.wait(lock, [this]{
                        return this->stop || !this->tasks.empty();
                    });
                    if (this->stop && this->tasks.empty())
                        return;
                    task = std::move(this->tasks.front());
                    this->tasks.pop();
                }
                task(); // 执行任务
            }
        });
    }
}

ThreadPool::~ThreadPool() {
    stop.store(true);
    condition.notify_all();   // 唤醒所有线程
    for (std::thread &worker: workers)
        worker.join();        // 等待线程结束
}

关键点说明

  1. 等待条件
    wait(lock, predicate) 让线程在条件不满足时自动释放锁并阻塞,直到条件满足后重新获得锁继续执行。这里的条件是 stop == truetasks 不为空。

  2. 安全退出
    stop 置为 true 后,所有工作线程会立即检查 stop 并退出。务必在析构中先设置 stop,再 notify_all() 唤醒线程,最后 join() 让主线程等待所有子线程结束。


3. 提交任务

template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type> {

    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward <F>(f), std::forward<Args>(args)...)
    );

    std::future <return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // 线程池正在关闭时拒绝新任务
        if(stop.load())
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one(); // 唤醒一个等待的工作线程
    return res;
}
  • std::packaged_task 用来包装任务并关联一个 future,便于调用者获得异步结果。
  • std::bind 把函数与参数绑定,返回一个无参可调用对象,随后存入任务队列。
  • enqueue 返回 `std::future `,让调用者可以像同步调用一样等待结果。

4. 使用示例

int main() {
    ThreadPool pool(4); // 创建 4 个工作线程

    // 提交 10 个计算任务
    std::vector<std::future<int>> results;
    for (int i = 0; i < 10; ++i) {
        results.emplace_back(
            pool.enqueue([i] {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                return i * i;
            })
        );
    }

    // 收集结果
    for (auto &&f : results)
        std::cout << f.get() << " "; // 输出 0 1 4 9 16 25 36 49 64 81

    return 0;
}

该程序演示了:

  • 线程池在主线程完成任务提交后仍能继续工作。
  • future.get() 会阻塞直到对应任务执行完毕。

5. 性能优化与高级特性

  1. 任务优先级
    若想让高优先级任务先执行,可将任务队列改为 std::priority_queue,或维护多个队列并在取任务时按优先级顺序检查。

  2. 自适应线程数
    动态增减线程数以应对 CPU 负载变化。可以实现一个 resize(size_t new_size) 方法,使用 join/detach 或新建线程来调整线程池大小。

  3. 阻塞与非阻塞提交
    对于高频率的任务提交,可能出现队列饱和。可以实现 try_enqueue,当队列满时立即返回 false,或者让调用者决定等待。

  4. 任务取消
    标准 future 不支持取消。若需取消,可在任务内部检查共享状态,或使用自定义 CancellationToken

  5. 异常传播
    packaged_task 会捕获异常并存储到 future,调用者可以通过 future.get() 捕获。若想在线程池内部处理异常,可在 worker 中使用 try/catch 打印日志。


6. 与 std::async 的比较

  • std::async

    • 适合一次性启动异步任务,内部会自行决定是否使用线程。
    • 不支持任务队列与线程复用,导致大量短任务会频繁创建/销毁线程,成本高。
  • ThreadPool

    • 线程预先创建并复用,适合高并发、短任务。
    • 可以统一控制线程数、实现任务优先级、批量调度等高级功能。

7. 结语

实现一个线程池是学习 C++ 并发编程的常见练手项目。上述代码在 C++11 及之后的标准库中即可直接编译使用。你可以根据自己的需求进一步扩展功能:比如集成 std::condition_variable_any、支持 std::execution::par 适配器、或者为任务提供时间限制与超时处理。掌握这些技术后,你就能在大型项目中高效地管理并行任务,提高程序吞吐量与资源利用率。

发表评论