题目:C++20协程实战:从协程到异步流的实现

在 C++20 标准正式发布后,协程(coroutine)这一强大功能被正式纳入语言核心。协程允许程序员用更直观的方式书写异步代码,隐藏了事件循环的细节,使得代码可读性与可维护性大幅提升。本文将从基本概念出发,介绍协程的核心组成,演示如何实现一个简单的异步流,并结合文件 I/O 示例演示实际应用场景。


1. 协程基础概念

1.1 协程与线程的区别

  • 协程:协作式多任务,执行上下文在同一线程内切换。协程暂停点是显式的(如 co_await),需要调度器手动切换。
  • 线程:抢占式多任务,操作系统调度多核并行执行。线程上下文切换成本较高,且同步复杂度高。

协程在 I/O 密集型或需要大量状态机的场景下表现优异。

1.2 协程的三大核心组件

  1. promise_type:协程的承诺类型,定义协程返回值、异常处理以及生命周期管理。
  2. handle:协程句柄,用于管理协程的生命周期,启动、暂停、恢复、销毁。
  3. awaitable:可等待对象,提供 await_ready()await_suspend()await_resume() 三个成员函数,决定协程何时挂起、恢复以及返回结果。

2. 协程的基本语法

// 1. 定义 awaitable 对象
struct TimerAwaitable {
    std::chrono::milliseconds duration;
    std::promise <void> prom;

    bool await_ready() { return duration.count() == 0; }
    void await_suspend(std::coroutine_handle<> h) {
        std::thread([=, h]() mutable {
            std::this_thread::sleep_for(duration);
            prom.set_value();
            h.resume();
        }).detach();
    }
    void await_resume() {}
};

// 2. 定义协程函数
struct Task {
    struct promise_type {
        Task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };
};

Task async_operation() {
    std::cout << "Start\n";
    co_await TimerAwaitable{std::chrono::milliseconds(1000)};
    std::cout << "After 1s\n";
}

上述示例展示了一个简单的协程函数 async_operation,通过 co_await 暂停 1 秒后继续执行。


3. 实现一个异步流(Async Stream)

3.1 需求分析

我们想要实现一个能够异步读取文件内容的流式接口,类似于 JavaScript 的 ReadableStream。读者可以使用 co_await 或循环 while 来逐块读取数据。

3.2 设计思路

  • ChunkSize:每次读取的字节数,默认 4096。
  • AsyncFileReader:协程类,内部维护文件指针与缓冲区。
  • awaitableReadAwaitable 用于实现 co_await,在后台线程读取文件内容后返回给协程。

3.3 关键实现

#include <coroutine>
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <thread>
#include <chrono>

class AsyncFileReader {
public:
    struct promise_type;
    using handle_type = std::coroutine_handle <promise_type>;

    struct promise_type {
        AsyncFileReader get_return_object() {
            return AsyncFileReader{handle_type::from_promise(*this)};
        }
        std::suspend_never initial_suspend() noexcept { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };

    AsyncFileReader(handle_type h) : coro(h) {}
    ~AsyncFileReader() { if (coro) coro.destroy(); }

    // awaitable for reading next chunk
    struct ReadAwaitable {
        std::string& buffer;
        std::ifstream& file;
        std::size_t chunkSize;

        bool await_ready() const noexcept { return false; }

        void await_suspend(std::coroutine_handle<> h) {
            std::thread([=, h]() mutable {
                buffer.resize(chunkSize);
                file.read(buffer.data(), static_cast<std::streamsize>(chunkSize));
                std::size_t readSize = static_cast<std::size_t>(file.gcount());
                buffer.resize(readSize); // shrink to actual size
                h.resume();
            }).detach();
        }

        std::string await_resume() { return buffer; }
    };

    // Interface to fetch next chunk
    ReadAwaitable read_next(std::size_t chunkSize = 4096) {
        return ReadAwaitable{buf, file, chunkSize};
    }

private:
    handle_type coro;
    std::ifstream file{ "sample.txt", std::ios::binary };
    std::string buf;
};

async Task read_file_stream() {
    AsyncFileReader reader{ };
    while (!reader.file.eof()) {
        std::string chunk = co_await reader.read_next();
        if (chunk.empty()) break;
        std::cout << "Chunk (" << chunk.size() << " bytes): " << chunk << "\n";
    }
}

说明:

  • AsyncFileReader 在构造时打开文件,并提供 read_next 方法返回一个 awaitable。
  • ReadAwaitable 在后台线程中执行 file.read,完成后恢复协程。
  • 读者可以在自己的协程中使用 while 循环 co_await read_next,实现流式读取。

3.4 使用示例

int main() {
    std::cout << "Async file read started\n";
    auto coro = read_file_stream();
    // 直接运行协程
    // 若有事件循环,可将其挂载至事件循环
    return 0;
}

此程序会在后台线程中读取文件,主线程不会被阻塞。通过 co_await 的方式,代码保持了同步式的可读性。


4. 进阶:协程与网络 I/O

在网络编程中,协程常用于实现非阻塞套接字。大多数现代网络库(如 Boost.Asio、cppcoro、libuv 的 C++ 封装)都已经提供了协程支持。使用协程可以把异步回调链转化为顺序式代码。

asio::ip::tcp::socket socket{io_context};
co_await socket.async_connect({ip, port}, asio::use_awaitable);
co_await socket.async_send(asio::buffer(msg), asio::use_awaitable);
std::array<char, 1024> buf;
std::size_t n = co_await socket.async_receive(asio::buffer(buf), asio::use_awaitable);

上述代码不需要显式回调函数,协程自动在网络 I/O 完成后恢复执行。


5. 结语

C++20 的协程为编写高性能、可维护的异步代码提供了语言级支持。通过 promise_type、awaitable 以及 coroutine_handle 等机制,协程可以轻松实现文件 I/O、网络通信以及任何需要状态机的场景。虽然协程的实现细节仍有一定的学习成本,但一旦掌握后,其代码结构会变得更加直观,错误率显著降低。

在实际项目中,建议先熟悉协程的基础用法,然后逐步尝试结合第三方库(如 Boost.Asio)实现更复杂的异步业务。随着标准的完善与生态的丰富,C++协程将成为高性能服务器、游戏引擎以及嵌入式系统中的重要工具。祝你在协程之旅中收获丰硕成果!

发表评论