在现代 C++ 中,std::thread、std::async、std::future 等标准库组件已经提供了多线程编程的基础设施。若需要在单个应用程序中处理成百上千个短任务,最常见的做法是实现一个线程池(ThreadPool),即预先创建一定数量的工作线程,然后把任务放入一个线程安全的队列,让线程池中的线程按需取任务执行。下面给出一个最小可用且易于扩展的线程池实现示例,并讨论关键设计点。
1. 线程池的核心数据结构
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <atomic>
class ThreadPool {
public:
explicit ThreadPool(size_t threads);
~ThreadPool();
// 提交任务,返回一个 future
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
private:
// 工作线程集合
std::vector<std::thread> workers;
// 任务队列(包装为 std::function<void()>)
std::queue<std::function<void()>> tasks;
// 同步原语
std::mutex queue_mutex;
std::condition_variable condition;
std::atomic <bool> stop;
};
- tasks:使用
std::queue<std::function<void()>>存放所有待执行的任务,function<void()>让我们可以包装任何可调用对象(函数、lambda、bind、成员函数等)。 - stop:一个原子布尔值,用来标记线程池是否正在停止。所有工作线程在检测到
stop时会退出循环。 - condition:当任务队列为空时,工作线程会在此等待;当有新任务加入时,调用
notify_one()/notify_all()唤醒线程。
2. 构造与析构
ThreadPool::ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this]{
return this->stop || !this->tasks.empty();
});
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task(); // 执行任务
}
});
}
}
ThreadPool::~ThreadPool() {
stop.store(true);
condition.notify_all(); // 唤醒所有线程
for (std::thread &worker: workers)
worker.join(); // 等待线程结束
}
关键点说明
-
等待条件
wait(lock, predicate)让线程在条件不满足时自动释放锁并阻塞,直到条件满足后重新获得锁继续执行。这里的条件是stop == true或tasks不为空。 -
安全退出
在stop置为true后,所有工作线程会立即检查stop并退出。务必在析构中先设置stop,再notify_all()唤醒线程,最后join()让主线程等待所有子线程结束。
3. 提交任务
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward <F>(f), std::forward<Args>(args)...)
);
std::future <return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// 线程池正在关闭时拒绝新任务
if(stop.load())
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one(); // 唤醒一个等待的工作线程
return res;
}
std::packaged_task用来包装任务并关联一个future,便于调用者获得异步结果。std::bind把函数与参数绑定,返回一个无参可调用对象,随后存入任务队列。enqueue返回 `std::future `,让调用者可以像同步调用一样等待结果。
4. 使用示例
int main() {
ThreadPool pool(4); // 创建 4 个工作线程
// 提交 10 个计算任务
std::vector<std::future<int>> results;
for (int i = 0; i < 10; ++i) {
results.emplace_back(
pool.enqueue([i] {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return i * i;
})
);
}
// 收集结果
for (auto &&f : results)
std::cout << f.get() << " "; // 输出 0 1 4 9 16 25 36 49 64 81
return 0;
}
该程序演示了:
- 线程池在主线程完成任务提交后仍能继续工作。
future.get()会阻塞直到对应任务执行完毕。
5. 性能优化与高级特性
-
任务优先级
若想让高优先级任务先执行,可将任务队列改为std::priority_queue,或维护多个队列并在取任务时按优先级顺序检查。 -
自适应线程数
动态增减线程数以应对 CPU 负载变化。可以实现一个resize(size_t new_size)方法,使用join/detach或新建线程来调整线程池大小。 -
阻塞与非阻塞提交
对于高频率的任务提交,可能出现队列饱和。可以实现try_enqueue,当队列满时立即返回false,或者让调用者决定等待。 -
任务取消
标准future不支持取消。若需取消,可在任务内部检查共享状态,或使用自定义CancellationToken。 -
异常传播
packaged_task会捕获异常并存储到future,调用者可以通过future.get()捕获。若想在线程池内部处理异常,可在 worker 中使用try/catch打印日志。
6. 与 std::async 的比较
-
std::async
- 适合一次性启动异步任务,内部会自行决定是否使用线程。
- 不支持任务队列与线程复用,导致大量短任务会频繁创建/销毁线程,成本高。
-
ThreadPool
- 线程预先创建并复用,适合高并发、短任务。
- 可以统一控制线程数、实现任务优先级、批量调度等高级功能。
7. 结语
实现一个线程池是学习 C++ 并发编程的常见练手项目。上述代码在 C++11 及之后的标准库中即可直接编译使用。你可以根据自己的需求进一步扩展功能:比如集成 std::condition_variable_any、支持 std::execution::par 适配器、或者为任务提供时间限制与超时处理。掌握这些技术后,你就能在大型项目中高效地管理并行任务,提高程序吞吐量与资源利用率。