题目:C++23 新特性协程:实现简易的异步数据流水线

在 C++20 里,协程(coroutine)被引入为语言级别的异步编程工具。C++23 对其进行了进一步的完善,尤其是对协程返回类型(awaitable)、异步迭代器(async_iterator)和更细粒度的异常处理进行了优化。本文以一个简单的“异步数据流水线”为例,演示如何使用这些新特性构建一个高效、可组合的异步系统。

一、设计目标

  • 数据源:模拟读取文件或网络数据,返回 uint8_t 数组。
  • 处理器:对每个字节执行简单变换(如 XOR 或压缩)。
  • 消费者:最终将处理后的数据写入磁盘或打印。
  • 流水线:每个阶段都是协程,利用 co_yieldco_await 进行异步传递。
  • 错误传播:在任意阶段出现错误,应能通过协程链向上传递。

二、关键技术

  1. std::generatorco_yield
    C++23 新增 `std::generator

    `,是 `co_yield` 的标准化包装,类似于 C# 的 `yield return`。
  2. std::futureco_await
    std::future 在 C++23 中改进为可以被 co_await 的异步结果。我们可以直接在协程里等待一个 std::future

  3. 自定义 awaiter
    为了在协程内部自定义等待行为,可以实现 await_readyawait_suspendawait_resume

  4. 异常安全
    co_return 后会自动把异常包装到返回的 std::future,可以在上层统一捕获。

三、示例代码

下面给出完整可编译的示例,演示上述流水线。

#include <iostream>
#include <vector>
#include <future>
#include <coroutine>
#include <thread>
#include <chrono>
#include <optional>
#include <stdexcept>
#include <cstring>

using namespace std::literals::chrono_literals;

// 简易异步等待器:模拟 I/O 延迟
struct async_sleep {
    std::chrono::milliseconds dur;
    std::promise <void> prom;

    bool await_ready() const noexcept { return dur.count() == 0; }

    void await_suspend(std::coroutine_handle<> h) {
        std::thread([this, h](){
            std::this_thread::sleep_for(dur);
            prom.set_value();
            h.resume();
        }).detach();
    }

    void await_resume() noexcept {}
};

// 模拟异步文件读取(返回 vector <uint8_t>)
std::future<std::vector<uint8_t>> async_read_file(const std::string& path) {
    return std::async(std::launch::async, [path](){
        // 简单模拟:每 50 ms 读取一个字节
        std::vector <uint8_t> data;
        for (int i = 0; i < 10; ++i) {
            data.push_back(static_cast <uint8_t>(i + 1));
            std::this_thread::sleep_for(50ms);
        }
        return data;
    });
}

// 处理器协程:对每个字节做 XOR
std::generator <uint8_t> process_bytes(std::vector<uint8_t> input, uint8_t key) {
    for (auto byte : input) {
        co_yield byte ^ key;          // 立即返回下一个字节
        co_await async_sleep{10ms};   // 模拟处理延迟
    }
}

// 消费者协程:收集所有字节并打印
std::future <void> consume_bytes(std::generator<uint8_t> gen) {
    return std::async(std::launch::async, [&gen](){
        std::vector <uint8_t> output;
        for (auto val : gen) {        // 迭代协程生成器
            output.push_back(val);
            std::this_thread::sleep_for(5ms); // 模拟写入延迟
        }
        // 打印结果
        std::cout << "Processed data: ";
        for (auto b : output) std::cout << std::hex << static_cast<int>(b) << ' ';
        std::cout << std::dec << '\n';
    });
}

int main() {
    try {
        // 1. 读取文件
        auto read_fut = async_read_file("dummy.txt");
        // 2. 等待读取完成
        std::vector <uint8_t> data = read_fut.get();
        // 3. 处理数据
        auto gen = process_bytes(data, 0xAA);
        // 4. 消费处理结果
        auto consume_fut = consume_bytes(gen);
        consume_fut.get();  // 等待消费者完成
    } catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << '\n';
    }
    return 0;
}

代码说明

  • async_sleep:一个自定义的 awaiter,包装 std::this_thread::sleep_for,演示如何在协程里等待非 I/O 操作。
  • async_read_file:使用 std::async 异步读取文件,返回 std::future<std::vector<uint8_t>>
  • process_bytes:使用 std::generator,对每个字节做 XOR,并在每步后 co_await async_sleep{10ms},模拟 CPU 或网络延迟。
  • consume_bytes:异步消费生成器中的所有字节,最后输出。

四、可扩展思路

  1. 链式协程
    可以将 process_bytes 直接返回 std::future<std::generator<uint8_t>>,再直接 co_await 消费者。

  2. 错误注入
    process_bytes 内部抛出异常,示例中会自动包装进 std::future,上层可统一处理。

  3. 并行流水线
    将读取、处理、消费分别放在不同线程或协程池中,使用 std::channel(C++23 提供)实现无锁通信。

  4. 资源回收
    通过 std::unique_ptrstd::shared_ptr 管理协程上下文,避免内存泄漏。

五、结语

C++23 的协程为异步编程带来了更直观、更安全的语法。通过 std::generatorawaitable 等工具,开发者可以像编写同步代码一样组织异步流程。本文演示的异步数据流水线仅是一个入门示例,实际项目中可以结合 I/O、网络、数据库等多种来源,构建复杂且高效的异步系统。希望能为你在 C++23 时代的异步编程之路提供一点参考。

发表评论