C++20 中协程的实现与应用

协程(Coroutine)是 C++20 标准新增的一项特性,它为异步编程提供了更直观、可读性更强的语法。与传统的回调、Promise/Future 机制相比,协程可以让开发者像写同步代码一样书写异步逻辑,从而降低错误率并提升代码可维护性。本文将从协程的基本原理、关键关键字、实现细节以及典型应用场景进行阐述,并给出实战代码示例。

1. 协程基础

协程本质上是能够在执行过程中被“挂起”和“恢复”的函数。其实现依赖于两大概念:

  • promise:协程体外的状态容器,负责管理协程的生命周期、返回值、异常等。
  • handle:协程内部的控制入口,用于挂起、恢复、检查结束状态。

C++20 对协程的底层实现做了透明化,开发者只需使用 co_awaitco_yieldco_return 等关键字即可。

2. 关键关键字

关键字 作用 典型用法
co_await 让协程等待一个 awaitable 对象完成 auto result = co_await asyncTask();
co_yield 产生一个值并挂起协程 co_yield value;
co_return 结束协程并返回值 co_return finalValue;
co_spawn (需配合协程库)启动协程 auto handle = co_spawn(asyncFunc(), ...);

3. 典型实现:异步文件读取

以下示例演示如何使用协程实现异步文件读取,借助 C++20 标准库 std::future 以及自定义 awaitable。

#include <coroutine>
#include <future>
#include <fstream>
#include <iostream>

struct async_file_reader {
    struct awaiter {
        std::ifstream& file;
        std::string& buffer;
        bool await_ready() { return !file.eof(); }
        void await_suspend(std::coroutine_handle<> h) {
            // 在后台线程读取文件
            std::async(std::launch::async, [&](){
                buffer.clear();
                if (file >> buffer) {
                    std::cout << "Read: " << buffer << "\n";
                }
                h.resume(); // 读取完毕恢复协程
            });
        }
        void await_resume() {}
    };
    awaiter operator co_await() { return {file, buffer}; }
    std::ifstream file;
    std::string buffer;
};

auto async_read_file(const std::string& path) -> std::future<std::string> {
    async_file_reader reader{std::ifstream(path), std::string{}};
    co_await reader;
    co_return reader.buffer;
}

int main() {
    auto fut = async_read_file("example.txt");
    std::cout << "Result: " << fut.get() << "\n";
}

4. 与线程池结合

协程可以轻松与线程池协同工作。下面展示一个简易的线程池与协程调度示例:

#include <thread>
#include <vector>
#include <queue>
#include <condition_variable>
#include <coroutine>

class thread_pool {
public:
    thread_pool(size_t n) : stop_(false) {
        workers_.reserve(n);
        for(size_t i = 0; i < n; ++i)
            workers_.emplace_back([this]{
                while(true){
                    std::function<void()> task;
                    {
                        std::unique_lock lock(m_);
                        cv_.wait(lock, [this]{ return stop_ || !tasks_.empty(); });
                        if(stop_ && tasks_.empty()) return;
                        task = std::move(tasks_.front());
                        tasks_.pop();
                    }
                    task();
                }
            });
    }
    ~thread_pool(){ stop(); }

    template<typename F>
    void enqueue(F&& f){
        {
            std::unique_lock lock(m_);
            tasks_.emplace(std::forward <F>(f));
        }
        cv_.notify_one();
    }
private:
    void stop(){
        {
            std::unique_lock lock(m_);
            stop_ = true;
        }
        cv_.notify_all();
        for(auto& w: workers_) w.join();
    }
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex m_;
    std::condition_variable cv_;
    bool stop_;
};

struct coroutine_task {
    struct promise_type;
    using handle_type = std::coroutine_handle <promise_type>;
    handle_type h;

    coroutine_task(handle_type h) : h(h) {}
    ~coroutine_task() { if(h) h.destroy(); }

    struct promise_type {
        coroutine_task get_return_object() {
            return {handle_type::from_promise(*this)};
        }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() { std::rethrow_exception(std::current_exception()); }
    };
};

void example_task(thread_pool& pool){
    coroutine_task ct([]() -> coroutine_task {
        std::cout << "Task started\n";
        co_await std::suspend_always{};
        std::cout << "Task resumed\n";
    }());
    pool.enqueue([ct = std::move(ct)](){ /* 线程池会执行此 lambda,进而调度协程 */ });
}

int main(){
    thread_pool pool(4);
    example_task(pool);
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

5. 性能与注意事项

  • 栈使用:协程的状态保存在堆(promise)中,减少了栈空间占用,适合高并发场景。
  • 异常传播:协程支持异常链,unhandled_exception 会将异常提升到外层。
  • 生命周期管理:协程句柄若不及时销毁会导致内存泄漏,建议使用 RAII 包装。

6. 典型应用场景

  1. 高性能网络服务器:使用协程 + IOCP / epoll,单线程即可处理数千连接。
  2. 数据流处理co_yield 可以实现可组合的数据流,类似 Rx 的 observable。
  3. 游戏引擎:协程用于实现非阻塞任务调度、状态机、动画系统。
  4. 异步数据库访问:将查询过程拆分为协程,避免线程阻塞。

7. 结语

C++20 协程为异步编程提供了更简洁、更直观的语法。掌握其基本原理、关键字以及与线程池、事件循环等传统技术的融合方式,可以帮助开发者构建高并发、低延迟的应用。随着标准化进程的深入,协程将成为 C++ 生态中不可或缺的一部分。

发表评论