在 C++20 里,协程(coroutine)被引入为语言级别的异步编程工具。C++23 对其进行了进一步的完善,尤其是对协程返回类型(awaitable)、异步迭代器(async_iterator)和更细粒度的异常处理进行了优化。本文以一个简单的“异步数据流水线”为例,演示如何使用这些新特性构建一个高效、可组合的异步系统。
一、设计目标
- 数据源:模拟读取文件或网络数据,返回
uint8_t数组。 - 处理器:对每个字节执行简单变换(如 XOR 或压缩)。
- 消费者:最终将处理后的数据写入磁盘或打印。
- 流水线:每个阶段都是协程,利用
co_yield、co_await进行异步传递。 - 错误传播:在任意阶段出现错误,应能通过协程链向上传递。
二、关键技术
-
`,是 `co_yield` 的标准化包装,类似于 C# 的 `yield return`。std::generator与co_yield
C++23 新增 `std::generator -
std::future与co_await
std::future在 C++23 中改进为可以被co_await的异步结果。我们可以直接在协程里等待一个std::future。 -
自定义
awaiter
为了在协程内部自定义等待行为,可以实现await_ready、await_suspend、await_resume。 -
异常安全
co_return后会自动把异常包装到返回的std::future,可以在上层统一捕获。
三、示例代码
下面给出完整可编译的示例,演示上述流水线。
#include <iostream>
#include <vector>
#include <future>
#include <coroutine>
#include <thread>
#include <chrono>
#include <optional>
#include <stdexcept>
#include <cstring>
using namespace std::literals::chrono_literals;
// 简易异步等待器:模拟 I/O 延迟
struct async_sleep {
std::chrono::milliseconds dur;
std::promise <void> prom;
bool await_ready() const noexcept { return dur.count() == 0; }
void await_suspend(std::coroutine_handle<> h) {
std::thread([this, h](){
std::this_thread::sleep_for(dur);
prom.set_value();
h.resume();
}).detach();
}
void await_resume() noexcept {}
};
// 模拟异步文件读取(返回 vector <uint8_t>)
std::future<std::vector<uint8_t>> async_read_file(const std::string& path) {
return std::async(std::launch::async, [path](){
// 简单模拟:每 50 ms 读取一个字节
std::vector <uint8_t> data;
for (int i = 0; i < 10; ++i) {
data.push_back(static_cast <uint8_t>(i + 1));
std::this_thread::sleep_for(50ms);
}
return data;
});
}
// 处理器协程:对每个字节做 XOR
std::generator <uint8_t> process_bytes(std::vector<uint8_t> input, uint8_t key) {
for (auto byte : input) {
co_yield byte ^ key; // 立即返回下一个字节
co_await async_sleep{10ms}; // 模拟处理延迟
}
}
// 消费者协程:收集所有字节并打印
std::future <void> consume_bytes(std::generator<uint8_t> gen) {
return std::async(std::launch::async, [&gen](){
std::vector <uint8_t> output;
for (auto val : gen) { // 迭代协程生成器
output.push_back(val);
std::this_thread::sleep_for(5ms); // 模拟写入延迟
}
// 打印结果
std::cout << "Processed data: ";
for (auto b : output) std::cout << std::hex << static_cast<int>(b) << ' ';
std::cout << std::dec << '\n';
});
}
int main() {
try {
// 1. 读取文件
auto read_fut = async_read_file("dummy.txt");
// 2. 等待读取完成
std::vector <uint8_t> data = read_fut.get();
// 3. 处理数据
auto gen = process_bytes(data, 0xAA);
// 4. 消费处理结果
auto consume_fut = consume_bytes(gen);
consume_fut.get(); // 等待消费者完成
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << '\n';
}
return 0;
}
代码说明
async_sleep:一个自定义的 awaiter,包装std::this_thread::sleep_for,演示如何在协程里等待非 I/O 操作。async_read_file:使用std::async异步读取文件,返回std::future<std::vector<uint8_t>>。process_bytes:使用std::generator,对每个字节做 XOR,并在每步后co_await async_sleep{10ms},模拟 CPU 或网络延迟。consume_bytes:异步消费生成器中的所有字节,最后输出。
四、可扩展思路
-
链式协程
可以将process_bytes直接返回std::future<std::generator<uint8_t>>,再直接co_await消费者。 -
错误注入
在process_bytes内部抛出异常,示例中会自动包装进std::future,上层可统一处理。 -
并行流水线
将读取、处理、消费分别放在不同线程或协程池中,使用std::channel(C++23 提供)实现无锁通信。 -
资源回收
通过std::unique_ptr或std::shared_ptr管理协程上下文,避免内存泄漏。
五、结语
C++23 的协程为异步编程带来了更直观、更安全的语法。通过 std::generator、awaitable 等工具,开发者可以像编写同步代码一样组织异步流程。本文演示的异步数据流水线仅是一个入门示例,实际项目中可以结合 I/O、网络、数据库等多种来源,构建复杂且高效的异步系统。希望能为你在 C++23 时代的异步编程之路提供一点参考。