**如何在 C++20 中实现一个轻量级的线程池?**

在现代 C++(特别是 C++20)中,标准库已经提供了许多多线程原语,例如 std::threadstd::mutexstd::condition_variable 等。虽然这些工具足以构建线程池,但我们通常希望有一个更简洁、可复用且易于配置的线程池实现。下面给出一个基于 C++20 标准库的轻量级线程池示例,展示如何:

  1. 使用 std::jthread 代替 std::thread,以获得自动停止与 join 的便利。
  2. 利用 std::generator(C++23 以后) 的概念来实现任务队列。
  3. 提供简单的任务提交 API,支持返回 std::future 以获取结果。
  4. 支持可配置的线程数量,并在构造时自动创建线程。

注意:此实现仅用于学习和演示,生产环境建议使用成熟的线程池库(如 Intel TBB、Boost.Thread 或者 OpenMP)。


1. 头文件与命名空间

#include <iostream>
#include <vector>
#include <queue>
#include <future>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <optional>
#include <atomic>

2. 线程池类定义

class ThreadPool {
public:
    explicit ThreadPool(std::size_t threads = std::thread::hardware_concurrency());
    ~ThreadPool();

    // 通过模板实现任意可调用对象的提交
    template <typename Func, typename... Args>
    auto submit(Func&& f, Args&&... args)
        -> std::future<std::invoke_result_t<Func, Args...>>;

private:
    // 任务类型:包装为可调用的包装器
    using Task = std::function<void()>;

    // 线程循环
    void worker_loop();

    std::vector<std::jthread> workers_;
    std::queue <Task> task_queue_;
    std::mutex queue_mutex_;
    std::condition_variable cv_;
    std::atomic <bool> stop_{false};
};

3. 构造与析构

ThreadPool::ThreadPool(std::size_t threads) {
    workers_.reserve(threads);
    for (std::size_t i = 0; i < threads; ++i) {
        workers_.emplace_back(&ThreadPool::worker_loop, this);
    }
}

ThreadPool::~ThreadPool() {
    stop_.store(true);
    cv_.notify_all();  // 唤醒所有线程
    // std::jthread 的析构会自动 join 所有线程
}

4. worker_loop 实现

void ThreadPool::worker_loop() {
    while (true) {
        Task task;
        {
            std::unique_lock lock(queue_mutex_);
            cv_.wait(lock, [this] { return stop_.load() || !task_queue_.empty(); });

            if (stop_.load() && task_queue_.empty())
                return;  // 退出循环

            task = std::move(task_queue_.front());
            task_queue_.pop();
        }
        task();  // 执行任务
    }
}

5. submit 模板实现

template <typename Func, typename... Args>
auto ThreadPool::submit(Func&& f, Args&&... args)
    -> std::future<std::invoke_result_t<Func, Args...>> {
    using Ret = std::invoke_result_t<Func, Args...>;

    auto task_ptr = std::make_shared<std::packaged_task<Ret()>>(
        std::bind(std::forward <Func>(f), std::forward<Args>(args)...)
    );

    std::future <Ret> fut = task_ptr->get_future();

    {
        std::unique_lock lock(queue_mutex_);
        if (stop_.load())
            throw std::runtime_error("ThreadPool has been stopped");
        task_queue_.emplace([task_ptr](){ (*task_ptr)(); });
    }
    cv_.notify_one();
    return fut;
}

6. 使用示例

int main() {
    ThreadPool pool(4);  // 4 个工作线程

    // 提交一个返回值任务
    auto fut1 = pool.submit([](int a, int b){ return a + b; }, 3, 7);

    // 提交一个无返回值任务
    auto fut2 = pool.submit([](){ std::cout << "Hello from thread pool!\n"; });

    std::cout << "Result: " << fut1.get() << '\n';

    // 等待 fut2 结束(这里没有返回值,使用 dummy future)
    fut2.get();

    return 0;
}

7. 关键点回顾

  1. std::jthread:相较于 std::threadjthread 在析构时会自动调用 join,简化线程生命周期管理。
  2. 任务包装:使用 std::packaged_taskstd::future 组合,使提交任务时既能获得返回值,又能保证线程安全。
  3. 条件变量cv_ 用于阻塞线程,直到有新任务或线程池被停止。
  4. 异常安全:在 submit 中检查 stop_,防止向已停止的线程池提交任务。

小结

以上示例展示了如何使用 C++20 标准库的基本多线程原语实现一个功能完备的线程池。它既简洁又易于维护,适合作为学习和实验项目。若需更高性能或更丰富的功能(如任务优先级、动态扩容等),可以在此基础上进一步扩展或引入第三方成熟库。

发表评论