如何使用 C++20 协程实现异步文件读取

C++20 引入了协程(Coroutines)这一强大的语言特性,极大地简化了异步编程的实现。本文将通过一个完整的示例,演示如何利用标准库中的协程相关工具来实现一个异步文件读取器,并说明协程内部的工作机制。


1. 背景:协程的基本概念

  • 协程(Coroutine):是一种能够挂起和恢复的函数。与传统函数不同,协程可以在执行过程中“暂停”,并在稍后恢复执行。
  • 关键字co_awaitco_yieldco_return。它们分别用于等待异步结果、生成值以及返回最终结果。
  • 协程对象:每个协程函数生成一个状态机对象,负责维护协程的状态、栈帧以及悬挂/恢复逻辑。

在 C++20 标准库中,std::futurestd::async 并不直接支持协程;相反,我们需要使用 std::experimental::generator 或自定义 Awaitable 类型来让协程与异步操作协同工作。


2. 方案概述

我们实现以下功能:

  1. 异步读取文件:将文件内容分块读取,模拟磁盘 I/O。
  2. 协程包装:把异步读取操作包装为 Awaitable 对象,供协程 co_await
  3. 协程入口:编写一个 async_read_file 协程,按需读取文件块,累积结果。

2.1 Awaitable 类型

为了让 co_await 能够等待文件读取完成,我们需要实现一个符合 Awaitable 协议的类:

struct AsyncFileRead {
    std::string path;
    std::size_t chunkSize;

    AsyncFileRead(const std::string& p, std::size_t sz)
        : path(p), chunkSize(sz) {}

    bool await_ready() const noexcept { return false; }

    void await_suspend(std::coroutine_handle<> h) const {
        // 异步读取逻辑:在后台线程中读取文件块
        std::thread([h, this] {
            std::ifstream in(path, std::ios::binary);
            if (!in) { h.resume(); return; }
            std::vector <char> buffer(chunkSize);
            while (in.read(buffer.data(), buffer.size()) ||
                   in.gcount() > 0) {
                // 这里可以通过回调或状态机把块返回给协程
                // 简化示例:直接放入全局队列(示例代码不安全,仅演示)
                {
                    std::lock_guard<std::mutex> lk(g_mutex);
                    g_chunks.push_back(std::string(buffer.data(), in.gcount()));
                }
                // 通知协程继续
                h.resume();
            }
        }).detach();
    }

    std::string await_resume() const noexcept {
        // 这里不需要返回值,因为我们通过共享容器收集块
        return {};
    }
};

注意:真实项目中请使用更安全、可扩展的异步 I/O 库(如 ASIO 或 libuv)。此处代码仅为演示。

2.2 全局共享容器

std::vector<std::string> g_chunks;
std::mutex g_mutex;

协程与后台线程通过这个共享容器和锁来交换文件块。


3. 协程实现

#include <coroutine>
#include <string>
#include <vector>
#include <fstream>
#include <thread>
#include <mutex>
#include <iostream>

struct AsyncFileRead {
    std::string path;
    std::size_t chunkSize;
    // ...
    // await_ready / await_suspend / await_resume 实现如上
};

class AsyncFileReader {
public:
    AsyncFileReader(const std::string& p, std::size_t sz)
        : path(p), chunkSize(sz) {}

    std::future<std::string> read() {
        // 协程主体
        return [=]() -> std::future<std::string> {
            co_await AsyncFileRead(path, chunkSize);
            // 等待后台线程全部完成后再合并结果
            while (true) {
                std::unique_lock<std::mutex> lk(g_mutex);
                if (g_chunks.empty()) break;
                lk.unlock();
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
            // 合并所有块
            std::string data;
            for (const auto& block : g_chunks) data += block;
            co_return data;
        }();
    }

private:
    std::string path;
    std::size_t chunkSize;
};

说明

  • read() 返回一个 std::future<std::string>,代表最终的文件内容。
  • 在协程内部,先 co_await 异步读取对象,等待后台线程完成读取。
  • 读取完成后,协程合并所有块并返回。

4. 使用示例

int main() {
    AsyncFileReader reader("sample.txt", 4096);
    std::future<std::string> fut = reader.read();

    // 在主线程做其他事情...
    std::cout << "正在读取文件...\n";

    // 等待协程完成
    std::string content = fut.get();
    std::cout << "文件内容(" << content.size() << " 字节)已读完。\n";
}

运行时,主线程先打印 “正在读取文件…”,随后等待协程完成,最后输出完整文件内容。


5. 性能与可扩展性

  • 非阻塞 I/O:协程内部并没有阻塞主线程,文件读取在后台线程中完成。
  • 可扩展:可以将 AsyncFileRead 改为真正的非阻塞 I/O,利用操作系统的异步接口(如 Linux 的 aio_read 或 Windows 的 ReadFileEx)来提升性能。
  • 错误处理:当前示例未处理读取错误,实际使用时应在 await_suspendawait_resume 中抛出异常或返回错误码。

6. 小结

  • C++20 的协程为异步 I/O 提供了更直观的语法。
  • 通过实现 Awaitable 对象,可以将后台线程或系统异步 I/O 与协程无缝结合。
  • 示例演示了异步文件读取的完整流程,虽然实现简化,但已涵盖协程关键概念。

在后续的实践中,你可以尝试将协程与网络 I/O(如 HTTP 请求)、数据库访问或多线程任务调度结合,进一步探索 C++20 协程在高性能程序中的广泛应用。

发表评论