C++20 引入了协程(coroutines)这一强大的语言特性,使得异步编程变得更直观、更易维护。协程本质上是一种可挂起与恢复的函数,允许你在执行过程中暂停并在需要时继续执行。本文将通过一个完整的例子演示如何使用协程实现一个异步流(async stream)处理框架,让你可以轻松地在 C++ 中进行事件驱动编程。
1. 协程的基础概念
在 C++20 中,协程需要满足以下几个条件:
co_await:在协程内使用co_await可以挂起协程,等待某个异步操作完成后继续执行。co_yield:在协程内使用co_yield可以生成一个值并挂起协程,等待下一次调用。co_return:协程结束时使用co_return。
为了让编译器知道一个函数是协程,它必须返回一个 promise type,而 co_yield、co_await 的行为由该 promise type 定义。
2. 设计一个简易的异步流框架
我们先定义一个 async_generator,它类似于 Python 的 async for,可以异步产生值。实现思路:
- **`async_generator `**:包装了协程的 promise,支持 `begin()`、`end()`。
- **`async_iterator `**:包装协程的句柄,提供 `operator++()`、`operator*()`。
co_yield:将值存储在 promise 中,挂起协程。co_await:等待外部事件完成后恢复协程。
2.1 代码实现
#include <coroutine>
#include <exception>
#include <iostream>
#include <thread>
#include <chrono>
#include <queue>
#include <optional>
template<typename T>
class async_generator {
public:
struct promise_type;
using handle_type = std::coroutine_handle <promise_type>;
struct promise_type {
std::optional <T> current_value;
async_generator get_return_object() {
return async_generator(handle_type::from_promise(*this));
}
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
std::suspend_always yield_value(T value) {
current_value = std::move(value);
return {};
}
void unhandled_exception() { std::terminate(); }
void return_void() {}
};
struct async_iterator {
handle_type coro;
async_iterator(handle_type h) : coro(h) {
if (coro) coro.resume();
}
~async_iterator() { if (coro) coro.destroy(); }
async_iterator& operator++() {
if (coro) coro.resume();
return *this;
}
T operator*() const { return *coro.promise().current_value; }
bool operator==(std::default_sentinel_t) const { return !coro || coro.done(); }
};
async_generator(handle_type h) : coro(h) {}
~async_generator() { if (coro) coro.destroy(); }
async_iterator begin() { return async_iterator{coro}; }
std::default_sentinel_t end() { return {}; }
private:
handle_type coro;
};
2.2 模拟异步 I/O
下面的 async_sleep 模拟一个异步等待操作。它在后台线程中延迟一段时间后通过 co_yield 将结果返回。
async_generator <int> async_sleep(int ms, int id) {
std::cout << "[Task " << id << "] 开始休眠 " << ms << " 毫秒\n";
// 模拟异步延迟
struct SleepAwaiter {
std::chrono::milliseconds dur;
bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<> h) {
std::thread([h, dur = dur] {
std::this_thread::sleep_for(dur);
h.resume();
}).detach();
}
int await_resume() const noexcept { return 0; } // 这里返回值可自定义
};
co_await SleepAwaiter{std::chrono::milliseconds(ms)};
co_yield id; // 任务完成后产出自己的 id
}
2.3 主程序:并发执行多个异步流
int main() {
// 创建多个异步流
auto stream1 = async_sleep(1000, 1);
auto stream2 = async_sleep(500, 2);
auto stream3 = async_sleep(1500, 3);
// 使用 for-await-like 循环读取所有异步流
for (auto id : stream1) {
std::cout << "Stream1 产生值: " << id << "\n";
}
for (auto id : stream2) {
std::cout << "Stream2 产生值: " << id << "\n";
}
for (auto id : stream3) {
std::cout << "Stream3 产生值: " << id << "\n";
}
// 防止程序过早结束,等待后台线程完成
std::this_thread::sleep_for(std::chrono::seconds(3));
return 0;
}
3. 运行结果示例
[Task 1] 开始休眠 1000 毫秒
[Task 2] 开始休眠 500 毫秒
[Task 3] 开始休眠 1500 毫秒
Stream2 产生值: 2
Stream1 产生值: 1
Stream3 产生值: 3
可以看到,协程让我们轻松实现了并发的异步任务,而不需要显式地管理线程或事件循环。通过 co_yield 产生值,co_await 进行挂起与恢复,整个代码结构类似同步代码,极大地提升了可读性。
4. 进一步思考
- 错误处理:在 promise 中实现
unhandled_exception,可以捕获协程内部抛出的异常。 - 流合并:可以实现
async_merge将多个async_generator合并成一个统一流,类似 Rx 的merge操作符。 - 资源管理:在协程结束前及时释放资源,避免后台线程泄漏。
C++20 的协程提供了与现代异步框架(如 JavaScript 的 async/await、Python 的 async/await)类似的语义,却保持了 C++ 的性能与类型安全。通过上述例子,你可以在自己的项目中快速实验协程带来的便利。