在现代 C++ 开发中,异步任务调度成为提高程序并发性和响应性的关键手段。本文将演示如何利用 C++17 标准库中的 <thread>、<future>、<chrono> 等组件,搭建一个轻量级的任务调度器。该调度器支持:
- 定时执行:按照设定的周期或固定时间点执行任务。
- 任务优先级:简单的基于整数的优先级排序。
- 线程池:共享固定数量的工作线程,避免频繁创建销毁线程带来的性能损耗。
1. 基础架构
#include <iostream>
#include <queue>
#include <functional>
#include <thread>
#include <future>
#include <chrono>
#include <condition_variable>
#include <vector>
#include <atomic>
#include <optional>
Task:封装要执行的函数、执行时间以及优先级。TaskQueue:线程安全的任务优先队列。Scheduler:负责调度、线程池管理与定时执行。
2. 任务结构
struct Task {
std::function<void()> func;
std::chrono::steady_clock::time_point execute_at;
int priority; // 低值优先
bool operator<(const Task& other) const {
if (execute_at == other.execute_at)
return priority > other.priority; // 低值优先
return execute_at > other.execute_at; // 早先时间优先
}
};
3. 线程安全优先队列
class TaskQueue {
public:
void push(Task t) {
std::lock_guard<std::mutex> lock(m_);
q_.push(std::move(t));
cv_.notify_one();
}
std::optional <Task> pop() {
std::unique_lock<std::mutex> lock(m_);
while (q_.empty() && !stop_) cv_.wait(lock);
if (stop_ && q_.empty()) return std::nullopt;
Task t = q_.top();
q_.pop();
return t;
}
void stop() {
std::lock_guard<std::mutex> lock(m_);
stop_ = true;
cv_.notify_all();
}
private:
std::priority_queue <Task> q_;
std::mutex m_;
std::condition_variable cv_;
bool stop_{false};
};
4. Scheduler 实现
class Scheduler {
public:
explicit Scheduler(size_t worker_count = std::thread::hardware_concurrency())
: workers_(worker_count) {
for (auto& w : workers_) {
w = std::thread([this] { worker_loop(); });
}
}
~Scheduler() {
task_queue_.stop();
for (auto& w : workers_) {
if (w.joinable()) w.join();
}
}
// 立即执行
void schedule(std::function<void()> f, int priority = 0) {
task_queue_.push(Task{std::move(f),
std::chrono::steady_clock::now(),
priority});
}
// 延迟执行
void schedule_at(std::function<void()> f,
std::chrono::steady_clock::time_point when,
int priority = 0) {
task_queue_.push(Task{std::move(f), when, priority});
}
// 周期性执行
void schedule_every(std::function<void()> f,
std::chrono::milliseconds interval,
int priority = 0) {
auto next = std::chrono::steady_clock::now() + interval;
std::function<void()> wrapper;
wrapper = [=, &wrapper, interval, f]() mutable {
f();
schedule_at(wrapper, next + interval, priority);
};
schedule_at(wrapper, next, priority);
}
private:
void worker_loop() {
while (true) {
auto opt_task = task_queue_.pop();
if (!opt_task) break; // 队列已停止
auto now = std::chrono::steady_clock::now();
if (opt_task->execute_at > now) {
std::this_thread::sleep_until(opt_task->execute_at);
}
opt_task->func();
}
}
TaskQueue task_queue_;
std::vector<std::thread> workers_;
};
5. 使用示例
int main() {
Scheduler sched(4); // 4 个工作线程
// 立即执行
sched.schedule([] { std::cout << "立即任务 1\n"; }, 1);
// 延迟 2 秒执行
sched.schedule_at([] { std::cout << "延迟任务 2\n"; },
std::chrono::steady_clock::now() + std::chrono::seconds(2), 0);
// 每隔 1 秒打印一次
sched.schedule_every([] {
static int cnt = 0;
std::cout << "周期任务 " << ++cnt << "\n";
}, std::chrono::milliseconds(1000), 2);
// 主线程休眠 5 秒后结束,Scheduler 会在析构时等待线程退出
std::this_thread::sleep_for(std::chrono::seconds(5));
return 0;
}
6. 性能与扩展
- 线程池大小:默认使用硬件线程数,若需要更多并发可自行调整。
- 任务类型:当前仅支持无返回值的
void()函数。若需要返回值,可结合std::future与std::packaged_task。 - 异常处理:当前实现忽略任务内部异常。可在
worker_loop中使用try-catch捕获并记录日志。 - 优先级实现:优先级仅是简单整数,实际项目中可结合工作负载特性细化。
7. 结语
通过上述代码,即可在 C++17 环境下快速搭建一个轻量级、可维护的异步任务调度器。它兼顾了定时、优先级与线程池三大核心需求,既可用于后台任务调度,也适合嵌入到实时系统或网络服务器中。你可以根据具体业务进一步扩展功能,例如添加任务取消、动态调整线程池大小或支持异步回调等。祝你编码愉快!