**如何在C++中实现一个高效的事件分发系统?**

事件分发系统是许多实时游戏、GUI 框架以及网络应用的核心组件。它的主要任务是:

  1. 接收事件(键盘、鼠标、网络数据、计时器等)
  2. 将事件路由给合适的处理器(回调函数、观察者对象等)
  3. 保证高并发、低延迟

下面给出一个基于 C++20 的完整实现思路,涵盖事件类型定义观察者注册线程安全的事件队列以及异步处理。实现中会用到 std::variantstd::functionstd::shared_mutexstd::atomicstd::condition_variable 等现代 C++ 特性。


1. 事件类型设计

#include <variant>
#include <string>
#include <chrono>

namespace ev = std::chrono;

// 基础事件结构
struct EventBase
{
    using TimePoint = std::chrono::steady_clock::time_point;
    TimePoint   timestamp{ std::chrono::steady_clock::now() };
    virtual ~EventBase() = default;
};

// 具体事件
struct KeyboardEvent : EventBase
{
    int keyCode;
    bool pressed;
};

struct MouseEvent : EventBase
{
    int  x, y;
    int  button;
    bool pressed;
};

struct NetworkEvent : EventBase
{
    std::string data;
};

struct TimerEvent : EventBase
{
    ev::duration <double> interval;
};

// 事件可变体
using Event = std::variant<KeyboardEvent, MouseEvent, NetworkEvent, TimerEvent>;

说明

  • std::variant 让事件统一管理,且保持类型安全。
  • 事件基类提供时间戳,可用于日志或延迟计算。

2. 观察者接口与注册机制

#include <functional>
#include <unordered_map>
#include <vector>
#include <mutex>
#include <shared_mutex>
#include <atomic>

class EventDispatcher
{
public:
    using HandlerId = std::size_t;
    using Handler   = std::function<void(const Event&)>;

    // 注册观察者
    HandlerId subscribe(Handler h)
    {
        std::unique_lock lock{m_handlers_mutex};
        HandlerId id = m_next_id++;
        m_handlers.emplace(id, std::move(h));
        return id;
    }

    // 取消订阅
    void unsubscribe(HandlerId id)
    {
        std::unique_lock lock{m_handlers_mutex};
        m_handlers.erase(id);
    }

    // 产生事件
    void dispatch(const Event& ev)
    {
        // 线程安全复制一次 handler 列表
        std::vector <Handler> snapshot;
        {
            std::shared_lock lock{m_handlers_mutex};
            snapshot.reserve(m_handlers.size());
            for (auto& [id, h] : m_handlers)
                snapshot.emplace_back(h);
        }

        for (auto& h : snapshot)
            h(ev);     // 异步处理可自行改为 std::async 或者线程池
    }

private:
    std::unordered_map<HandlerId, Handler> m_handlers;
    std::shared_mutex m_handlers_mutex;
    std::atomic <HandlerId> m_next_id{1};
};

说明

  • subscribe 返回唯一 HandlerId,可以在需要时取消。
  • 通过 shared_mutex 允许多线程并发读取注册表,同时写操作时加排他锁。
  • dispatch 先复制一次观察者列表,避免在处理过程中有人注册/注销导致迭代异常。

3. 事件队列与多线程分发

#include <queue>
#include <condition_variable>
#include <thread>
#include <memory>

class EventBus
{
public:
    EventBus() : m_stop(false)
    {
        m_worker = std::thread(&EventBus::workerThread, this);
    }

    ~EventBus()
    {
        stop();
    }

    void post(const Event& ev)
    {
        {
            std::unique_lock lock{m_queue_mutex};
            m_queue.push(ev);
        }
        m_cond.notify_one();
    }

    void subscribe(EventDispatcher::Handler h)
    {
        m_dispatcher.subscribe(std::move(h));
    }

    void unsubscribe(EventDispatcher::HandlerId id)
    {
        m_dispatcher.unsubscribe(id);
    }

    void stop()
    {
        {
            std::unique_lock lock{m_queue_mutex};
            m_stop = true;
        }
        m_cond.notify_all();
        if (m_worker.joinable())
            m_worker.join();
    }

private:
    void workerThread()
    {
        while (true)
        {
            Event ev;
            {
                std::unique_lock lock{m_queue_mutex};
                m_cond.wait(lock, [&]{ return m_stop || !m_queue.empty(); });

                if (m_stop && m_queue.empty())
                    break;

                ev = std::move(m_queue.front());
                m_queue.pop();
            }
            m_dispatcher.dispatch(ev);
        }
    }

    std::queue <Event>                     m_queue;
    std::mutex                            m_queue_mutex;
    std::condition_variable               m_cond;
    bool                                  m_stop{false};

    EventDispatcher                       m_dispatcher;
    std::thread                           m_worker;
};

说明

  • post 将事件放入线程安全队列;
  • workerThread 在后台线程持续取事件并交给 EventDispatcher 处理;
  • 采用 condition_variable 省去忙等,提高 CPU 利用率。

4. 使用示例

#include <iostream>
#include <thread>
#include <chrono>

int main()
{
    EventBus bus;

    // 订阅键盘事件
    bus.subscribe([](const Event& ev) {
        if (auto p = std::get_if <KeyboardEvent>(&ev)) {
            std::cout << "Keyboard: key=" << p->keyCode << " pressed=" << p->pressed << '\n';
        }
    });

    // 订阅鼠标事件
    bus.subscribe([](const Event& ev) {
        if (auto p = std::get_if <MouseEvent>(&ev)) {
            std::cout << "Mouse: (" << p->x << "," << p->y << ") " << "button=" << p->button << " pressed=" << p->pressed << '\n';
        }
    });

    // 产生事件
    bus.post(KeyboardEvent{ .keyCode = 65, .pressed = true });
    bus.post(MouseEvent{ .x = 100, .y = 200, .button = 1, .pressed = true });

    std::this_thread::sleep_for(std::chrono::seconds(1));
    bus.stop();
}

运行结果(示例):

Keyboard: key=65 pressed=1
Mouse: (100,200) button=1 pressed=1

5. 性能优化建议

场景 优化手段
事件数量极大 std::queue 换成 Ring Buffer(如 boost::lockfree::queue)
多线程并发产生 std::atomic双缓冲 的方式,减少锁竞争
回调执行耗时 Handler 拆分为 异步任务,交给线程池处理
内存占用 对事件使用 对象池std::pmr::memory_resource
CPU 难以占用 采用 事件复用:同类型事件合并后一次处理,降低调度开销

6. 小结

  • 使用 std::variant 统一事件类型,保持类型安全。
  • EventDispatcher 负责观察者管理,使用共享锁实现并发读写。
  • EventBus 通过线程安全队列和后台线程实现事件生产者-消费者模型。
  • 通过 C++20 的协程或线程池可进一步提升异步处理效率。

此实现可直接嵌入到游戏引擎、GUI 框架或网络服务器中,具有高性能易维护的优势。

发表评论