在现代 C++ 开发中,多线程已经成为提高程序性能的常用手段。然而,直接使用 std::thread 和互斥锁往往容易导致死锁、资源泄漏或调度失效。本文将通过一个可复用的 TaskScheduler 类来演示如何在 C++17 或更高版本中实现一个高效、易用的多线程任务调度器。
1. 设计目标
- 易于使用:用户只需调用
enqueue添加任务。 - 线程安全:内部使用互斥锁和条件变量确保并发安全。
- 动态扩容:根据任务数量自动调整线程池大小。
- 优雅停止:支持平滑关闭,等待所有已提交任务完成。
2. 核心数据结构
class TaskScheduler {
public:
using Task = std::function<void()>;
TaskScheduler(size_t coreThreads = std::thread::hardware_concurrency());
~TaskScheduler();
// 直接提交任务
void enqueue(Task task);
// 关闭调度器,等待已提交任务执行完毕
void shutdown();
private:
void workerLoop();
void adjustPoolSize();
std::vector<std::thread> workers_;
std::queue <Task> tasks_;
std::mutex mtx_;
std::condition_variable cv_;
bool stop_{false};
size_t coreThreads_;
};
workers_存放工作线程。tasks_队列保存待执行任务。stop_标志用于停止线程。
3. 关键实现细节
3.1 构造函数
TaskScheduler::TaskScheduler(size_t coreThreads)
: coreThreads_(coreThreads) {
for (size_t i = 0; i < coreThreads_; ++i) {
workers_.emplace_back(&TaskScheduler::workerLoop, this);
}
}
在构造时直接创建核心线程数。
3.2 任务执行循环
void TaskScheduler::workerLoop() {
while (true) {
Task task;
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this]{ return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
}
task(); // 执行任务
}
}
使用条件变量等待任务到来或停止信号。
3.3 动态扩容
我们可以在 enqueue 中检测任务队列长度与活跃线程数,按需创建新线程,或在 workerLoop 里检测空闲时间后退出,释放资源。下面给出一个简化实现:
void TaskScheduler::enqueue(Task task) {
{
std::lock_guard<std::mutex> lock(mtx_);
tasks_.push(std::move(task));
}
cv_.notify_one();
}
如果需要动态扩容,可在 enqueue 中检查 tasks_.size() > workers_.size(),并创建额外线程。
3.4 停止与销毁
void TaskScheduler::shutdown() {
{
std::lock_guard<std::mutex> lock(mtx_);
stop_ = true;
}
cv_.notify_all();
for (auto& th : workers_) {
if (th.joinable()) th.join();
}
}
TaskScheduler::~TaskScheduler() {
shutdown();
}
这样可以确保所有线程安全退出。
4. 使用示例
int main() {
TaskScheduler scheduler(4); // 4 核心线程
for (int i = 0; i < 10; ++i) {
scheduler.enqueue([i]{
std::cout << "任务 " << i << " 开始\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "任务 " << i << " 结束\n";
});
}
scheduler.shutdown(); // 等待所有任务完成后退出
return 0;
}
输出示例(并发执行):
任务 0 开始
任务 1 开始
任务 2 开始
任务 3 开始
任务 0 结束
任务 4 开始
...
5. 进一步优化
- 线程池最大线程数:通过计数器与条件变量限制最大线程数,防止过度创建。
- 优先级任务:改用
std::priority_queue或boost::heap来实现优先级调度。 - 任务返回值:改用
std::future包装任务,使调用者能获取执行结果。 - 错误处理:在
workerLoop捕获异常并记录日志,避免线程异常退出。
6. 结语
本文展示了一个基础但功能完整的 C++ 任务调度器实现,演示了线程池、任务队列、条件变量和互斥锁的协同工作方式。实际项目中,可以根据业务需求进一步扩展优先级调度、任务取消、限流等高级特性。掌握这些技术后,你就能在 C++ 项目中灵活地管理并发任务,显著提升程序性能与可维护性。