在现代 C++ 开发中,线程池是处理并发任务的常用技术。相比直接创建和销毁大量 std::thread,线程池可以显著降低系统开销、提高任务吞吐量,并简化并发管理。本文将从设计思路、核心组件实现、性能优化以及常见问题三个层面,为你展示如何用 C++20 标准库构建一个轻量级但功能完善的线程池。
1. 设计思路
| 目标 | 说明 |
|---|---|
| 可复用性 | 线程池应支持动态调整线程数量,满足不同负载需求。 |
| 任务统一性 | 采用模板或 std::function 接收任意可调用对象,支持返回值。 |
| 安全性 | 所有内部状态必须线程安全,防止竞争条件与死锁。 |
| 易用性 | API 简洁,使用者只需提交任务即可获得 std::future。 |
核心组件:
- 任务队列:线程安全的 FIFO 队列,存放待执行的任务。
- 工作线程:池中的线程循环取任务并执行。
- 同步原语:
std::mutex+std::condition_variable用于线程间同步。 - 停止机制:标志位 + 条件变量,确保优雅退出。
2. 核心实现
#include <atomic>
#include <condition_variable>
#include <future>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
template<typename T = void>
class ThreadPool {
public:
using Task = std::function<T()>;
explicit ThreadPool(std::size_t threadCount = std::thread::hardware_concurrency())
: stopFlag(false) {
for (std::size_t i = 0; i < threadCount; ++i)
workers.emplace_back(&ThreadPool::workerThread, this);
}
~ThreadPool() {
shutdown();
}
// 禁止拷贝与移动
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
// 提交任务,返回 std::future
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<std::invoke_result_t<F, Args...>> {
using Ret = std::invoke_result_t<F, Args...>;
auto boundTask = std::bind(std::forward <F>(f), std::forward<Args>(args)...);
auto packagedTask = std::make_shared<std::packaged_task<Ret()>>(std::move(boundTask));
std::future <Ret> res = packagedTask->get_future();
{
std::lock_guard<std::mutex> lock(queueMutex);
if (stopFlag)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([packagedTask](){ (*packagedTask)(); });
}
queueCond.notify_one();
return res;
}
// 立即停止:等待已提交任务完成
void shutdown() {
{
std::lock_guard<std::mutex> lock(queueMutex);
stopFlag = true;
}
queueCond.notify_all();
for (auto& th : workers)
if (th.joinable())
th.join();
}
private:
// 工作线程主体
void workerThread() {
while (true) {
Task task;
{
std::unique_lock<std::mutex> lock(queueMutex);
queueCond.wait(lock, [this] { return stopFlag || !tasks.empty(); });
if (stopFlag && tasks.empty())
return; // 退出
task = std::move(tasks.front());
tasks.pop();
}
task();
}
}
std::vector<std::thread> workers;
std::queue <Task> tasks;
std::mutex queueMutex;
std::condition_variable queueCond;
std::atomic_bool stopFlag;
};
关键点说明
Task统一为std::function<T()>:模板参数T用来支持不同返回类型。若不需要返回值,可使用ThreadPool<>。enqueue:通过std::bind将函数与参数预先绑定,然后包装为std::packaged_task,最后将std::function形式的任务推入队列。调用者通过future获取结果或等待完成。- 线程安全:
queueMutex用来保护任务队列,queueCond用于等待与通知。stopFlag为原子布尔,避免在多线程间读写不一致。 - 优雅退出:
shutdown设置停止标志,通知所有线程,随后join等待线程结束。
3. 性能与优化
| 方向 | 实现细节 |
|---|---|
| 减少上下文切换 | 线程池线程数不宜过多,建议不超过硬件线程数 * 2。 |
| 任务批量执行 | 可在 workerThread 内部一次性取出若干任务,减少 wait/notify 次数。 |
| 自适应扩容 | 监控队列长度动态创建或销毁线程,但需注意锁竞争与线程创建成本。 |
使用 std::jthread |
C++20 的 jthread 支持自动停止,可简化停止逻辑。 |
4. 常见错误与解决方案
-
“enqueue on stopped ThreadPool” 异常
- 原因:在调用
shutdown()后继续提交任务。 - 解决:在提交前检查是否已停止,或者使用 `std::shared_ptr ` 管理生命周期。
- 原因:在调用
-
死锁或程序崩溃
- 原因:任务内部再提交任务导致无限递归或未捕获异常。
- 解决:在
workerThread里包裹try/catch,捕获异常并记录。
-
性能瓶颈
- 原因:使用
std::function包装导致堆分配。 - 解决:使用
std::packaged_task的operator()直接执行,或使用std::deque+std::function进行优化。
- 原因:使用
5. 示例:并行计算斐波那契
int fib(int n) {
return n < 2 ? n : fib(n-1) + fib(n-2);
}
int main() {
ThreadPool<> pool(4); // 4 个工作线程
std::vector<std::future<int>> futures;
for (int i = 30; i < 35; ++i) {
futures.emplace_back(pool.enqueue(fib, i));
}
for (auto& f : futures)
std::cout << "fib = " << f.get() << std::endl;
pool.shutdown(); // 可省略,析构会自动调用
}
运行结果(示例):
fib = 832040
fib = 1346269
fib = 2178309
fib = 3524578
fib = 5702887
6. 结语
自定义线程池在 C++20 生态下实现相对简单,却能为并发程序带来显著优势。上述实现仅为基础模板,实际项目中可根据业务场景进一步扩展,例如添加任务优先级、定时任务、监控接口等。希望这篇文章能帮助你快速上手并为你的项目增添高效的并发能力。