C++20 协程实现无阻塞 IO 的简易框架

C++20 引入了协程(coroutine)概念,极大地方便了异步编程。借助协程,我们可以像同步代码一样书写异步逻辑,同时实现无阻塞 IO 的效果。本文将从理论与实践两方面介绍如何用 C++20 协程构建一个简易的无阻塞 IO 框架,并指出常见坑点和最佳实践。

1. 协程基础

在 C++20 里,协程的核心是四个关键字:co_awaitco_yieldco_returnco_suspend。协程函数返回一个可挂起的类型(通常是 std::futurestd::generator 或自定义的 Awaitable)。编译器会把协程函数展开为一个状态机,挂起点会保存局部状态,恢复点会继续执行。

1.1 Awaitable 的实现

一个类型需要实现 await_readyawait_suspendawait_resume 三个成员函数,才能被 co_await 调用。最常见的例子是 std::future,它的 await_resume 会返回值。

struct AsyncSocket {
    int fd;  // 文件描述符
    std::function<void()> ready_cb;  // 就绪回调

    bool await_ready() const noexcept { return false; }
    void await_suspend(std::coroutine_handle<> h) {
        // 这里把回调注册到事件循环,完成后恢复协程
        register_read_event(fd, [h]() { h.resume(); });
    }
    void await_resume() const noexcept { /* 返回读取结果 */ }
};

2. 事件循环的最小实现

协程本身不提供事件循环,需要我们自己实现。最简单的方式是基于 epoll(Linux)或 kqueue(BSD/macOS)构建事件循环。下面给出一个极简的 EventLoop 类:

class EventLoop {
    int epfd = epoll_create1(0);
    std::unordered_map<int, std::function<void()>> callbacks;

public:
    ~EventLoop() { close(epfd); }

    void add_read(int fd, std::function<void()> cb) {
        epoll_event ev{ .events = EPOLLIN, .data.fd = fd };
        epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
        callbacks[fd] = std::move(cb);
    }

    void run() {
        epoll_event events[64];
        while (true) {
            int n = epoll_wait(epfd, events, 64, -1);
            for (int i = 0; i < n; ++i) {
                int fd = events[i].data.fd;
                auto it = callbacks.find(fd);
                if (it != callbacks.end()) it->second();
            }
        }
    }
};

3. 简易无阻塞 HTTP 客户端

下面演示如何用协程和事件循环写一个不阻塞的 HTTP GET 请求。

#include <coroutine>
#include <future>
#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>

class EventLoop {
    /* 见上文实现 */
};

EventLoop loop;

int make_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    return 0;
}

struct AwaitableRead {
    int fd;
    std::vector <char> buffer;
    size_t pos = 0;

    bool await_ready() const noexcept { return false; }

    void await_suspend(std::coroutine_handle<> h) {
        loop.add_read(fd, [h]() mutable { h.resume(); });
    }

    std::vector <char> await_resume() {
        // 简化:假设一次读取完成
        return buffer;
    }
};

awaitable_read read_fd(int fd) {
    AwaitableRead ar{fd};
    // 读取一次
    char tmp[4096];
    ssize_t n = read(fd, tmp, sizeof(tmp));
    if (n > 0) ar.buffer.insert(ar.buffer.end(), tmp, tmp + n);
    co_return ar.buffer;
}

awaitable_co void http_get(const std::string& host, const std::string& path) {
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    make_nonblocking(sock);
    /* 连接到 host:80 ... */
    // 省略连接代码,假设已连接

    std::string req = "GET " + path + " HTTP/1.1\r\n"
                      "Host: " + host + "\r\n"
                      "Connection: close\r\n\r\n";
    send(sock, req.c_str(), req.size(), 0);

    // 等待可读
    std::vector <char> resp = co_await read_fd(sock);

    std::cout << "Received " << resp.size() << " bytes\n";
    close(sock);
}

int main() {
    http_get("example.com", "/").resume();  // 启动协程
    loop.run();                             // 开始事件循环
}

注意:上述代码极度简化,仅演示协程挂起与恢复的机制。实际生产代码需要处理错误、完整性读取、时间戳、心跳等。

4. 常见坑点

  1. 忘记把文件描述符设置为非阻塞:如果不设置,read/write 仍会阻塞,导致协程挂起失效。
  2. 回调持有错误的协程句柄await_suspend 必须在回调中保存协程句柄,并在 IO 完成后恢复。
  3. 事件循环与协程的交互:在事件循环中恢复协程后,若协程再次挂起,需要重新注册事件。
  4. 资源泄漏:事件循环退出前要确保关闭所有文件描述符、取消注册。

5. 小结

C++20 协程为无阻塞 IO 提供了极佳的语言层支持。通过自定义 Awaitable、实现简易事件循环,我们可以在保持同步风格的同时实现高并发的异步网络程序。实际使用中,建议借助成熟的库(如 asiolibuv)或使用 std::experimental::awaitable 之类的高级抽象,避免从零开始实现复杂细节。

祝你编码愉快,玩转协程世界!

发表评论