在现代 C++ 开发中,任务调度器是并发框架的核心组件,它负责将工作拆分成可并行执行的任务,并在多核系统上高效利用资源。C++20 提供了大量语言和库特性,使我们能够在标准库层面构建一个轻量且可扩展的任务调度器。本文将从设计目标、关键技术到完整示例,系统地展示如何在 C++20 中实现一个基于线程池的任务调度器。
1. 设计目标
- 可扩展性:支持任意数量的任务,并能够根据系统负载动态调节线程数。
- 高效性:任务调度和线程同步的开销最小化,避免频繁的上下文切换。
- 易用性:API 友好,类似
std::async、std::future的使用方式。 - 安全性:线程安全,避免数据竞争和死锁。
2. 关键技术点
2.1 线程池(ThreadPool)
- 工作线程:每个线程循环从任务队列中取任务执行,直到被终止。
- 任务队列:使用
std::queue<std::function<void()>>并配合std::condition_variable控制生产/消费。
2.2 std::future 与 std::promise
- 通过
std::packaged_task把任意可调用对象包装成任务,并生成std::future供调用方等待结果。 std::future与线程池解耦,支持同步和异步两种模式。
2.3 std::atomic 与 std::latch
- 使用 `std::atomic ` 控制线程池是否关闭。
- 在
shutdown()时使用std::latch等待所有工作线程退出,保证资源安全释放。
2.4 模板与完美转发
- 采用可变模板参数和
std::forward,支持任意函数签名和参数。
3. 代码实现
#pragma once
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <atomic>
#include <latch>
#include <optional>
class ThreadPool {
public:
explicit ThreadPool(std::size_t threads = std::thread::hardware_concurrency())
: stop_(false), latch_(threads) {
for (std::size_t i = 0; i < threads; ++i)
workers_.emplace_back(&ThreadPool::worker, this);
}
~ThreadPool() { shutdown(); }
// 阻塞提交任务,返回未来对象
template<class F, class... Args>
auto submit(F&& f, Args&&... args)
-> std::future<std::invoke_result_t<F, Args...>> {
using Ret = std::invoke_result_t<F, Args...>;
auto task = std::make_shared<std::packaged_task<Ret()>>(
std::bind(std::forward <F>(f), std::forward<Args>(args)...));
std::future <Ret> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mtx_);
if (stop_)
throw std::runtime_error("submit on stopped ThreadPool");
tasks_.emplace([task](){ (*task)(); });
}
queue_cv_.notify_one();
return res;
}
// 非阻塞提交,返回 std::optional <future>
template<class F, class... Args>
auto try_submit(F&& f, Args&&... args)
-> std::optional<std::future<std::invoke_result_t<F, Args...>>> {
using Ret = std::invoke_result_t<F, Args...>;
std::unique_lock<std::mutex> lock(queue_mtx_, std::try_to_lock);
if (!lock || stop_)
return std::nullopt;
auto task = std::make_shared<std::packaged_task<Ret()>>(
std::bind(std::forward <F>(f), std::forward<Args>(args)...));
std::future <Ret> res = task->get_future();
tasks_.emplace([task](){ (*task)(); });
queue_cv_.notify_one();
return res;
}
// 关闭线程池,等待所有线程退出
void shutdown() {
{
std::unique_lock<std::mutex> lock(queue_mtx_);
stop_ = true;
}
queue_cv_.notify_all();
for (auto &t : workers_)
if (t.joinable())
t.join();
latch_.arrive_and_wait(); // 只在构造时使用
}
private:
void worker() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mtx_);
queue_cv_.wait(lock, [this]{
return stop_ || !tasks_.empty();
});
if (stop_ && tasks_.empty())
break;
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
latch_.arrive_and_wait(); // 线程结束时同步
}
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex queue_mtx_;
std::condition_variable queue_cv_;
std::atomic <bool> stop_;
std::latch latch_;
};
说明
- 构造:默认使用 CPU 核数,构造时创建对应数量的工作线程,并让
latch_记录线程总数。 - submit:将可调用对象包装为
std::packaged_task,生成future并推入任务队列。生产者与消费者通过condition_variable同步。 - try_submit:尝试非阻塞提交,若锁未能及时获取或线程池已关闭,返回
std::nullopt。 - shutdown:设置停止标志,唤醒所有线程,随后
join等待线程结束。latch_用于确保所有工作线程在shutdown期间完整退出。
4. 使用示例
#include <iostream>
#include "ThreadPool.h"
int add(int a, int b) { return a + b; }
int main() {
ThreadPool pool(4); // 4 条工作线程
// 1. 同步任务
auto f1 = pool.submit(add, 3, 5);
std::cout << "add result: " << f1.get() << '\n'; // 8
// 2. 异步任务
auto f2 = pool.submit([]{
std::this_thread::sleep_for(std::chrono::seconds(1));
return std::string("Async done");
});
std::cout << "async: " << f2.get() << '\n';
// 3. 异步任务 + 参数
auto f3 = pool.submit([](const std::string& msg){
return msg + " processed";
}, "Message");
std::cout << f3.get() << '\n';
pool.shutdown();
return 0;
}
5. 性能优化建议
- 任务合并:对小任务进行批量合并,减少线程上下文切换。
- 自适应线程数:根据任务队列长度动态调整线程数,例如使用
std::barrier或 `std::atomic ` 计数。 - 锁自由队列:如果任务量极大,可使用 lock-free queue(如
boost::lockfree::queue)进一步提升吞吐量。
6. 结语
借助 C++20 的 std::latch、std::packaged_task、std::future 等现代特性,我们可以在标准库层面轻松实现一个功能完备、易用且高效的多线程任务调度器。以上代码仅为基础实现,实际生产环境中仍需结合具体业务逻辑进行调优。希望本文能为你在 C++ 并发编程中提供有价值的参考。