C++20 协程(Coroutines)实现异步流处理

C++20 引入了协程(coroutines)这一强大的语言特性,使得异步编程变得更直观、更易维护。协程本质上是一种可挂起与恢复的函数,允许你在执行过程中暂停并在需要时继续执行。本文将通过一个完整的例子演示如何使用协程实现一个异步流(async stream)处理框架,让你可以轻松地在 C++ 中进行事件驱动编程。

1. 协程的基础概念

在 C++20 中,协程需要满足以下几个条件:

  1. co_await:在协程内使用 co_await 可以挂起协程,等待某个异步操作完成后继续执行。
  2. co_yield:在协程内使用 co_yield 可以生成一个值并挂起协程,等待下一次调用。
  3. co_return:协程结束时使用 co_return

为了让编译器知道一个函数是协程,它必须返回一个 promise type,而 co_yieldco_await 的行为由该 promise type 定义。

2. 设计一个简易的异步流框架

我们先定义一个 async_generator,它类似于 Python 的 async for,可以异步产生值。实现思路:

  • **`async_generator `**:包装了协程的 promise,支持 `begin()`、`end()`。
  • **`async_iterator `**:包装协程的句柄,提供 `operator++()`、`operator*()`。
  • co_yield:将值存储在 promise 中,挂起协程。
  • co_await:等待外部事件完成后恢复协程。

2.1 代码实现

#include <coroutine>
#include <exception>
#include <iostream>
#include <thread>
#include <chrono>
#include <queue>
#include <optional>

template<typename T>
class async_generator {
public:
    struct promise_type;
    using handle_type = std::coroutine_handle <promise_type>;

    struct promise_type {
        std::optional <T> current_value;

        async_generator get_return_object() {
            return async_generator(handle_type::from_promise(*this));
        }
        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        std::suspend_always yield_value(T value) {
            current_value = std::move(value);
            return {};
        }
        void unhandled_exception() { std::terminate(); }
        void return_void() {}
    };

    struct async_iterator {
        handle_type coro;

        async_iterator(handle_type h) : coro(h) {
            if (coro) coro.resume();
        }
        ~async_iterator() { if (coro) coro.destroy(); }

        async_iterator& operator++() {
            if (coro) coro.resume();
            return *this;
        }
        T operator*() const { return *coro.promise().current_value; }
        bool operator==(std::default_sentinel_t) const { return !coro || coro.done(); }
    };

    async_generator(handle_type h) : coro(h) {}
    ~async_generator() { if (coro) coro.destroy(); }

    async_iterator begin() { return async_iterator{coro}; }
    std::default_sentinel_t end() { return {}; }

private:
    handle_type coro;
};

2.2 模拟异步 I/O

下面的 async_sleep 模拟一个异步等待操作。它在后台线程中延迟一段时间后通过 co_yield 将结果返回。

async_generator <int> async_sleep(int ms, int id) {
    std::cout << "[Task " << id << "] 开始休眠 " << ms << " 毫秒\n";
    // 模拟异步延迟
    struct SleepAwaiter {
        std::chrono::milliseconds dur;
        bool await_ready() const noexcept { return false; }
        void await_suspend(std::coroutine_handle<> h) {
            std::thread([h, dur = dur] {
                std::this_thread::sleep_for(dur);
                h.resume();
            }).detach();
        }
        int await_resume() const noexcept { return 0; } // 这里返回值可自定义
    };

    co_await SleepAwaiter{std::chrono::milliseconds(ms)};
    co_yield id; // 任务完成后产出自己的 id
}

2.3 主程序:并发执行多个异步流

int main() {
    // 创建多个异步流
    auto stream1 = async_sleep(1000, 1);
    auto stream2 = async_sleep(500, 2);
    auto stream3 = async_sleep(1500, 3);

    // 使用 for-await-like 循环读取所有异步流
    for (auto id : stream1) {
        std::cout << "Stream1 产生值: " << id << "\n";
    }
    for (auto id : stream2) {
        std::cout << "Stream2 产生值: " << id << "\n";
    }
    for (auto id : stream3) {
        std::cout << "Stream3 产生值: " << id << "\n";
    }

    // 防止程序过早结束,等待后台线程完成
    std::this_thread::sleep_for(std::chrono::seconds(3));
    return 0;
}

3. 运行结果示例

[Task 1] 开始休眠 1000 毫秒
[Task 2] 开始休眠 500 毫秒
[Task 3] 开始休眠 1500 毫秒
Stream2 产生值: 2
Stream1 产生值: 1
Stream3 产生值: 3

可以看到,协程让我们轻松实现了并发的异步任务,而不需要显式地管理线程或事件循环。通过 co_yield 产生值,co_await 进行挂起与恢复,整个代码结构类似同步代码,极大地提升了可读性。

4. 进一步思考

  • 错误处理:在 promise 中实现 unhandled_exception,可以捕获协程内部抛出的异常。
  • 流合并:可以实现 async_merge 将多个 async_generator 合并成一个统一流,类似 Rx 的 merge 操作符。
  • 资源管理:在协程结束前及时释放资源,避免后台线程泄漏。

C++20 的协程提供了与现代异步框架(如 JavaScript 的 async/await、Python 的 async/await)类似的语义,却保持了 C++ 的性能与类型安全。通过上述例子,你可以在自己的项目中快速实验协程带来的便利。

发表评论