设计与实现一个现代 C++ 线程池

在高性能计算和网络服务中,线程池已成为不可或缺的技术。与传统的逐个创建与销毁线程相比,线程池通过复用线程资源,显著降低了上下文切换成本,并且可以更好地控制并发量。本文将演示如何在 C++17/20 代码中实现一个轻量、可扩展的线程池,并讨论其内部工作原理、常见错误与优化技巧。

1. 设计目标

  1. 线程复用:固定数量的工作线程持续等待任务,避免频繁创建和销毁线程的开销。
  2. 任务排队:采用 FIFO 队列,保证任务按提交顺序执行。
  3. 线程安全:使用互斥锁、条件变量等同步原语确保并发访问安全。
  4. 优雅关闭:支持立即停止与平滑停止两种模式,保证已提交任务能够完成。
  5. 灵活接口:支持 lambda、函数对象、std::packaged_task 等多种任务类型。

2. 关键数据结构

class ThreadPool {
public:
    explicit ThreadPool(size_t thread_count);
    ~ThreadPool();

    // 提交普通任务
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::invoke_result_t<F, Args...>>;

    // 停止线程池
    void shutdown(bool immediate = false);

private:
    std::vector<std::thread> workers_;
    std::deque<std::function<void()>> tasks_;

    std::mutex queue_mutex_;
    std::condition_variable condition_;
    bool stop_;
};
  • workers_ 存放实际工作线程。
  • tasks_ 是一个任务队列,元素类型为 std::function<void()>,便于统一包装不同的 callable。
  • stop_ 标记线程池是否已关闭。

3. 构造函数

ThreadPool::ThreadPool(size_t thread_count) : stop_(false) {
    for(size_t i = 0; i < thread_count; ++i) {
        workers_.emplace_back([this] {
            for(;;) {
                std::function<void()> task;

                {
                    std::unique_lock<std::mutex> lock(this->queue_mutex_);
                    this->condition_.wait(lock, [this]{
                        return this->stop_ || !this->tasks_.empty();
                    });

                    if(this->stop_ && this->tasks_.empty())
                        return; // 退出线程

                    task = std::move(this->tasks_.front());
                    this->tasks_.pop_front();
                }

                task(); // 执行任务
            }
        });
    }
}
  • 线程循环中先锁住队列,等待 condition_
  • 条件判断 stop_ && tasks_.empty() 用于安全退出。
  • 通过 std::move 将任务移入局部变量后释放锁,减少锁持有时间。

4. 任务提交

template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::invoke_result_t<F, Args...>>
{
    using return_type = typename std::invoke_result_t<F, Args...>;
    auto task_ptr = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward <F>(f), std::forward<Args>(args)...));

    std::future <return_type> res = task_ptr->get_future();

    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        if(stop_)
            throw std::runtime_error("enqueue on stopped ThreadPool");
        tasks_.emplace_back([task_ptr](){ (*task_ptr)(); });
    }
    condition_.notify_one();
    return res;
}
  • 通过 std::packaged_task 让用户可以拿到 future
  • 任务包装成 void() 以便统一存储。
  • 在提交后立即 notify_one() 唤醒至少一个等待线程。

5. 关闭线程池

void ThreadPool::shutdown(bool immediate) {
    {
        std::unique_lock<std::mutex> lock(queue_mutex_);
        if(stop_)
            return; // 已关闭
        if(immediate) {
            tasks_.clear(); // 丢弃未执行任务
        }
        stop_ = true;
    }
    condition_.notify_all(); // 唤醒所有线程
    for(std::thread &worker : workers_)
        if(worker.joinable())
            worker.join();
}
  • 立即停止 (immediate=true):清空任务队列,丢弃尚未开始的任务。
  • 平滑停止 (immediate=false):等待队列为空后退出,保证已提交任务完成。
  • join_all 确保线程池析构前所有工作线程已退出。

6. 使用示例

int main() {
    ThreadPool pool(4); // 4 个工作线程

    // 提交普通 lambda
    auto f1 = pool.enqueue([]{ std::cout << "Task 1\n"; });

    // 提交带参数的函数
    auto f2 = pool.enqueue([](int x){ std::cout << "Task 2: " << x << "\n"; }, 42);

    // 通过 future 获取结果
    auto f3 = pool.enqueue([]() { return 5 + 7; });
    std::cout << "Result: " << f3.get() << "\n";

    // 立即停止线程池
    pool.shutdown(true);
}

输出示例(线程顺序不确定):

Task 1
Task 2: 42
Result: 12

7. 常见陷阱

  1. 死锁:不要在任务内部锁住与 enqueue 同一把锁,导致线程无法进入队列。
  2. 条件变量失效:使用 while 或 lambda 作为等待条件,避免 spurious wakeup。
  3. 任务泄露:若用户提交了异常抛出的任务,packaged_task 会捕获异常并设置到 future,但不要在工作线程中直接 throw
  4. 析构时阻塞:在析构前一定要调用 shutdown(),否则线程可能会在 std::terminate 之前被强制终止。

8. 性能优化

  • 任务池化:使用对象池存储 std::function,减少内存分配。
  • 预分配线程:根据硬件线程数预设线程池大小,避免频繁伸缩。
  • 线程亲和性:在多核系统上将工作线程绑定到不同 CPU,减少 cache 销毁。
  • 非阻塞队列:使用 lock-free 结构(如 concurrent_queue)提升高并发下的吞吐量。

9. 结语

一个稳健的线程池不仅能提升程序的响应速度,还能让并发代码更易于维护。通过上述实现,你可以快速在自己的 C++ 项目中嵌入线程池,并根据实际需求进一步扩展功能,例如加入任务优先级、工作 stealing 或动态扩容。希望本文对你在 C++ 并发编程中的实践有所帮助。

发表评论