如何在 C++20 中实现多线程任务调度器?

在现代 C++ 开发中,任务调度器是并发框架的核心组件,它负责将工作拆分成可并行执行的任务,并在多核系统上高效利用资源。C++20 提供了大量语言和库特性,使我们能够在标准库层面构建一个轻量且可扩展的任务调度器。本文将从设计目标、关键技术到完整示例,系统地展示如何在 C++20 中实现一个基于线程池的任务调度器。


1. 设计目标

  1. 可扩展性:支持任意数量的任务,并能够根据系统负载动态调节线程数。
  2. 高效性:任务调度和线程同步的开销最小化,避免频繁的上下文切换。
  3. 易用性:API 友好,类似 std::asyncstd::future 的使用方式。
  4. 安全性:线程安全,避免数据竞争和死锁。

2. 关键技术点

2.1 线程池(ThreadPool)

  • 工作线程:每个线程循环从任务队列中取任务执行,直到被终止。
  • 任务队列:使用 std::queue<std::function<void()>> 并配合 std::condition_variable 控制生产/消费。

2.2 std::futurestd::promise

  • 通过 std::packaged_task 把任意可调用对象包装成任务,并生成 std::future 供调用方等待结果。
  • std::future 与线程池解耦,支持同步和异步两种模式。

2.3 std::atomicstd::latch

  • 使用 `std::atomic ` 控制线程池是否关闭。
  • shutdown() 时使用 std::latch 等待所有工作线程退出,保证资源安全释放。

2.4 模板与完美转发

  • 采用可变模板参数和 std::forward,支持任意函数签名和参数。

3. 代码实现

#pragma once
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <atomic>
#include <latch>
#include <optional>

class ThreadPool {
public:
    explicit ThreadPool(std::size_t threads = std::thread::hardware_concurrency())
        : stop_(false), latch_(threads) {
        for (std::size_t i = 0; i < threads; ++i)
            workers_.emplace_back(&ThreadPool::worker, this);
    }

    ~ThreadPool() { shutdown(); }

    // 阻塞提交任务,返回未来对象
    template<class F, class... Args>
    auto submit(F&& f, Args&&... args)
        -> std::future<std::invoke_result_t<F, Args...>> {
        using Ret = std::invoke_result_t<F, Args...>;
        auto task = std::make_shared<std::packaged_task<Ret()>>(
            std::bind(std::forward <F>(f), std::forward<Args>(args)...));

        std::future <Ret> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mtx_);
            if (stop_)
                throw std::runtime_error("submit on stopped ThreadPool");
            tasks_.emplace([task](){ (*task)(); });
        }
        queue_cv_.notify_one();
        return res;
    }

    // 非阻塞提交,返回 std::optional <future>
    template<class F, class... Args>
    auto try_submit(F&& f, Args&&... args)
        -> std::optional<std::future<std::invoke_result_t<F, Args...>>> {
        using Ret = std::invoke_result_t<F, Args...>;
        std::unique_lock<std::mutex> lock(queue_mtx_, std::try_to_lock);
        if (!lock || stop_)
            return std::nullopt;
        auto task = std::make_shared<std::packaged_task<Ret()>>(
            std::bind(std::forward <F>(f), std::forward<Args>(args)...));
        std::future <Ret> res = task->get_future();
        tasks_.emplace([task](){ (*task)(); });
        queue_cv_.notify_one();
        return res;
    }

    // 关闭线程池,等待所有线程退出
    void shutdown() {
        {
            std::unique_lock<std::mutex> lock(queue_mtx_);
            stop_ = true;
        }
        queue_cv_.notify_all();
        for (auto &t : workers_)
            if (t.joinable())
                t.join();
        latch_.arrive_and_wait(); // 只在构造时使用
    }

private:
    void worker() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mtx_);
                queue_cv_.wait(lock, [this]{
                    return stop_ || !tasks_.empty();
                });
                if (stop_ && tasks_.empty())
                    break;
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();
        }
        latch_.arrive_and_wait(); // 线程结束时同步
    }

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex queue_mtx_;
    std::condition_variable queue_cv_;
    std::atomic <bool> stop_;
    std::latch latch_;
};

说明

  1. 构造:默认使用 CPU 核数,构造时创建对应数量的工作线程,并让 latch_ 记录线程总数。
  2. submit:将可调用对象包装为 std::packaged_task,生成 future 并推入任务队列。生产者与消费者通过 condition_variable 同步。
  3. try_submit:尝试非阻塞提交,若锁未能及时获取或线程池已关闭,返回 std::nullopt
  4. shutdown:设置停止标志,唤醒所有线程,随后 join 等待线程结束。latch_ 用于确保所有工作线程在 shutdown 期间完整退出。

4. 使用示例

#include <iostream>
#include "ThreadPool.h"

int add(int a, int b) { return a + b; }

int main() {
    ThreadPool pool(4); // 4 条工作线程

    // 1. 同步任务
    auto f1 = pool.submit(add, 3, 5);
    std::cout << "add result: " << f1.get() << '\n'; // 8

    // 2. 异步任务
    auto f2 = pool.submit([]{
        std::this_thread::sleep_for(std::chrono::seconds(1));
        return std::string("Async done");
    });
    std::cout << "async: " << f2.get() << '\n';

    // 3. 异步任务 + 参数
    auto f3 = pool.submit([](const std::string& msg){
        return msg + " processed";
    }, "Message");
    std::cout << f3.get() << '\n';

    pool.shutdown();
    return 0;
}

5. 性能优化建议

  1. 任务合并:对小任务进行批量合并,减少线程上下文切换。
  2. 自适应线程数:根据任务队列长度动态调整线程数,例如使用 std::barrier 或 `std::atomic ` 计数。
  3. 锁自由队列:如果任务量极大,可使用 lock-free queue(如 boost::lockfree::queue)进一步提升吞吐量。

6. 结语

借助 C++20 的 std::latchstd::packaged_taskstd::future 等现代特性,我们可以在标准库层面轻松实现一个功能完备、易用且高效的多线程任务调度器。以上代码仅为基础实现,实际生产环境中仍需结合具体业务逻辑进行调优。希望本文能为你在 C++ 并发编程中提供有价值的参考。

发表评论