在现代软件开发中,异步执行和任务并发已成为不可或缺的技术。C++ 标准库提供了 std::thread、std::future、std::async 等工具,但如果你需要更细粒度的控制,或者想要一个可插拔、可扩展的任务调度框架,就需要自己实现一个自定义的多线程任务调度器。下面将从设计原则、核心组件、实现细节以及性能调优等方面,系统性地阐述如何在 C++ 中实现一个轻量、易用且高效的任务调度器。
1. 设计目标
| 目标 | 说明 |
|---|---|
| 低开销 | 线程池大小可配置,避免频繁创建/销毁线程。 |
| 高并发 | 支持数千个任务的并发排队与执行。 |
| 可扩展性 | 任务可以通过回调、模板、函数对象等多种方式提交。 |
| 容错性 | 任务异常不影响调度器整体运行,异常信息可查询。 |
| 优先级调度 | 支持任务优先级或延迟执行。 |
| 生命周期管理 | 线程池启动、停止、等待全部任务完成等功能。 |
2. 核心组件
-
Task(任务)
任务是一个可调用对象,包装了要执行的函数、参数以及元数据(如优先级、延迟时间、提交时间)。struct Task { std::function<void()> func; std::chrono::steady_clock::time_point execute_at; int priority; // 可选 }; -
TaskQueue(任务队列)
用于存放待执行的任务。典型实现是使用std::priority_queue或std::deque。为支持延迟执行,需要根据execute_at字段进行排序。struct TaskCompare { bool operator()(const Task& a, const Task& b) const { return a.execute_at > b.execute_at; // 未来时间靠前 } }; std::priority_queue<Task, std::vector<Task>, TaskCompare> queue_; -
Worker(工作线程)
线程池中的工作线程循环从TaskQueue取出任务并执行。线程在空闲时阻塞,使用std::condition_variable等待新的任务或停止信号。 -
Scheduler(调度器)
提供接口:submit(),start(),shutdown(),wait_for_all()等。管理线程池、任务队列和同步原语。
3. 关键实现细节
3.1 线程安全
TaskQueue需要在多线程环境下操作,必须使用互斥量保护:std::mutex queue_mutex_;- 条件变量
std::condition_variable cv_;用于通知工作线程有新任务或需要停止。
3.2 延迟任务与优先级
- 任务队列按
execute_at排序,工作线程在取任务前检查时间:若任务未到执行时间,线程cv_.wait_until()等待到execute_at或出现新的更早任务。 - 若需要多级优先级,可在
Task结构中加入priority字段,并在TaskCompare中结合execute_at进行复合比较。
3.3 异常处理
- 每个任务执行时使用
try-catch捕获异常,并记录日志或回调给外部。 - 可以提供
std::future或std::promise来返回任务结果,让调用者通过get()获取异常。
3.4 生命周期管理
start():创建指定数量的工作线程,线程循环从队列取任务。shutdown():设置停止标志,唤醒所有线程,等待它们退出。wait_for_all():阻塞直到队列为空且所有线程已完成当前任务。
3.5 资源回收
- 使用 RAII 方式管理线程对象,防止泄露。
std::unique_lock<std::mutex>用于锁定,确保在异常时也能正确解锁。
4. 代码示例
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <functional>
#include <chrono>
#include <mutex>
#include <condition_variable>
class Scheduler {
public:
using Clock = std::chrono::steady_clock;
explicit Scheduler(size_t threads = std::thread::hardware_concurrency())
: shutdown_(false), threads_(threads) {}
~Scheduler() { stop(); }
// 启动线程池
void start() {
for (size_t i = 0; i < threads_; ++i) {
workers_.emplace_back([this]() { this->worker_loop(); });
}
}
// 提交普通任务
void submit(const std::function<void()>& func,
const Clock::time_point& at = Clock::now()) {
{
std::lock_guard<std::mutex> lock(mutex_);
queue_.emplace(Task{func, at, 0});
}
cv_.notify_one();
}
// 停止调度器,等待所有线程结束
void stop() {
{
std::lock_guard<std::mutex> lock(mutex_);
shutdown_ = true;
}
cv_.notify_all();
for (auto& t : workers_) {
if (t.joinable())
t.join();
}
}
// 等待所有任务完成
void wait_for_all() {
std::unique_lock<std::mutex> lock(mutex_);
cv_all_done_.wait(lock, [this]() { return queue_.empty() && !running_; });
}
private:
struct Task {
std::function<void()> func;
Clock::time_point execute_at;
int priority;
};
struct TaskCompare {
bool operator()(const Task& a, const Task& b) const {
return a.execute_at > b.execute_at; // 先执行时间近的
}
};
void worker_loop() {
while (true) {
Task task;
{
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() { return shutdown_ || !queue_.empty(); });
if (shutdown_ && queue_.empty())
break;
// 取最早执行的任务
task = queue_.top();
if (Clock::now() < task.execute_at) {
cv_.wait_until(lock, task.execute_at);
continue; // 重新检查
}
queue_.pop();
running_ = true;
}
try {
task.func();
} catch (const std::exception& e) {
std::cerr << "Task exception: " << e.what() << std::endl;
} catch (...) {
std::cerr << "Task unknown exception" << std::endl;
}
{
std::lock_guard<std::mutex> lock(mutex_);
running_ = false;
if (queue_.empty())
cv_all_done_.notify_all();
}
}
}
std::vector<std::thread> workers_;
std::priority_queue<Task, std::vector<Task>, TaskCompare> queue_;
std::mutex mutex_;
std::condition_variable cv_;
std::condition_variable cv_all_done_;
bool shutdown_;
bool running_{false};
size_t threads_;
};
int main() {
Scheduler scheduler(4);
scheduler.start();
scheduler.submit([]() {
std::cout << "Immediate task executed\n";
});
scheduler.submit([]() {
std::cout << "Delayed task executed after 1s\n";
}, Scheduler::Clock::now() + std::chrono::seconds(1));
scheduler.wait_for_all();
scheduler.stop();
return 0;
}
5. 性能调优技巧
-
减少锁竞争
- 将
queue_和cv_的锁粒度拆分:仅对queue_进行短时锁定;把等待条件放在外层。 - 使用
std::atomic标志代替布尔变量,减少锁。
- 将
-
线程池大小
- 经验值是
CPU 核数 * 2,或根据任务 IO 阻塞比例调整。 - 可以动态扩容/缩容,但需注意线程创建销毁的成本。
- 经验值是
-
批量调度
- 当提交大量任务时,可一次性将多个任务放入队列,减少
notify_one()的调用次数。
- 当提交大量任务时,可一次性将多个任务放入队列,减少
-
延迟任务
- 对于大量延迟任务,考虑使用时间轮(Time Wheel)或多级时间量化技术,避免
std::condition_variable每次都需要比较时间。
- 对于大量延迟任务,考虑使用时间轮(Time Wheel)或多级时间量化技术,避免
-
内存分配
- 为
Task对象预留池化(如std::pmr::monotonic_buffer_resource),降低new/delete的频率。
- 为
6. 结语
通过上述设计与实现,你可以得到一个功能完整、可维护、易于集成的 C++ 多线程任务调度器。它既能满足轻量级应用的需求,也足以在高并发服务器或实时系统中发挥作用。根据具体业务场景,你可以进一步扩展调度策略(如基于权重的负载均衡、可重复任务、超时机制等),使调度器更贴合真实需求。祝你编码愉快!