**题目:在 C++20 中利用协程实现异步管道(Async Pipeline)**

在现代 C++ 中,协程(Coroutines)为实现轻量级异步和流式编程提供了强大的工具。本文将通过一个完整的示例,演示如何使用 C++20 协程(co_yieldco_return)构建一个可组合的异步管道系统,支持数据的异步生产、过滤、映射以及最终消费。通过逐步拆解代码,帮助读者深入理解协程的工作原理及其在实际编程中的优势。


1. 需求与设计思路

我们希望实现一个可以链式调用的管道 API,类似于流式操作符(|)或 LINQ。其核心功能包括:

  1. 数据生成:从外部异步源(例如文件、网络)读取数据。
  2. 转换:支持 mapfilter 等操作。
  3. 消费:将最终结果写入目标(例如文件、控制台)。

实现思路:

  • 使用 `generator `(自定义协程返回类型)来表示可异步迭代的数据流。
  • 通过成员函数 mapfilter 返回新的 generator,实现链式调用。
  • for co_await 循环消费生成器,完成真正的异步迭代。

2. 必要的 C++20 头文件与辅助结构

#include <coroutine>
#include <iostream>
#include <vector>
#include <string>
#include <functional>
#include <chrono>
#include <thread>

2.1 协程返回类型:`generator

` “`cpp template struct generator { struct promise_type { T current_value; std::suspend_always yield_value(T value) { current_value = std::move(value); return {}; } std::suspend_always initial_suspend() { return {}; } std::suspend_always final_suspend() noexcept { return {}; } void unhandled_exception() { std::terminate(); } generator get_return_object() { return generator{std::coroutine_handle ::from_promise(*this)}; } void return_void() {} }; std::coroutine_handle coro; explicit generator(std::coroutine_handle h) : coro(h) {} generator(const generator&) = delete; generator(generator&& other) noexcept : coro(other.coro) { other.coro = nullptr; } ~generator() { if (coro) coro.destroy(); } bool next() { return coro.resume(), !coro.done(); } T value() const { return coro.promise().current_value; } }; “` — ## 3. 数据源:异步生成整数 “`cpp generator async_range(int start, int count, std::chrono::milliseconds delay) { for (int i = start; i generator()(std::declval()))> map(generator src, Func f) { while (src.next()) { co_yield f(src.value()); } } template generator filter(generator src, Predicate p) { while (src.next()) { if (p(src.value())) co_yield src.value(); } } “` — ## 5. 消费者:打印结果 “`cpp void consume(generator src) { while (src.next()) { std::cout ” 20 -> 40 -> 60 -> 80 -> 100 “` — ## 7. 进一步扩展 – **错误处理**:在协程内部抛出异常后,可在 `promise_type::unhandled_exception` 中捕获并转化为错误码。 – **并行化**:将 `async_range` 改为使用多线程或网络 I/O,真正实现异步 I/O。 – **管道组合器**:提供 `pipe` 函数,使得 `auto final = src | filter | map;` 的语法更简洁。 — ## 8. 小结 本文展示了如何利用 C++20 协程实现一个可组合的异步管道。通过自定义 `generator `、`map`、`filter` 等工具函数,开发者可以像使用标准库容器一样,灵活地对数据流进行异步处理。协程的轻量级特性使得异步代码更易读、易写,显著提升了 C++ 在异步编程领域的竞争力。 —

发表评论