如何在C++中使用多线程实现生产者-消费者模型

在现代 C++ 中,多线程编程已成为解决高并发问题的核心手段。本文将以生产者-消费者模型为例,展示如何利用标准库中的线程、互斥锁、条件变量以及线程安全队列来实现一个简单而完整的多线程程序。通过代码示例,你将能够快速掌握:

  1. std::thread 的创建与管理
  2. std::mutexstd::unique_lock 的使用
  3. std::condition_variable 的同步机制
  4. 如何安全地在多线程之间传递数据

1. 背景与需求

生产者-消费者模型描述了两类线程:生产者产生数据并放入共享缓冲区,消费者从缓冲区取出数据进行处理。核心挑战是:

  • 生产者不应在缓冲区已满时继续写入;
  • 消费者不应在缓冲区为空时继续读取;
  • 两类线程必须同步,避免竞争条件和死锁。

2. 设计思路

我们使用一个固定大小的循环缓冲区(ring buffer)来存放生产的数据。为保证线程安全,采用以下同步机制:

组件 作用
std::mutex 保护缓冲区的读写操作
std::condition_variable 生产者等待缓冲区非满,消费者等待缓冲区非空
std::unique_lock condition_variable 配合使用,允许条件变量在等待时自动释放互斥锁

3. 代码实现

#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <random>

class BoundedBuffer {
public:
    explicit BoundedBuffer(std::size_t capacity)
        : capacity_(capacity) {}

    // 生产者调用
    void produce(int item) {
        std::unique_lock<std::mutex> lock(mtx_);
        not_full_.wait(lock, [this] { return buffer_.size() < capacity_; });

        buffer_.push(item);
        std::cout << "Produced: " << item << "\n";
        not_empty_.notify_one(); // 通知消费者
    }

    // 消费者调用
    int consume() {
        std::unique_lock<std::mutex> lock(mtx_);
        not_empty_.wait(lock, [this] { return !buffer_.empty(); });

        int item = buffer_.front();
        buffer_.pop();
        std::cout << "Consumed: " << item << "\n";
        not_full_.notify_one(); // 通知生产者
        return item;
    }

private:
    std::size_t capacity_;
    std::queue <int> buffer_;
    std::mutex mtx_;
    std::condition_variable not_empty_;
    std::condition_variable not_full_;
};

void producer(BoundedBuffer& buf, int id, int count) {
    std::mt19937 rng(id + 1);
    std::uniform_int_distribution <int> dist(1, 100);

    for (int i = 0; i < count; ++i) {
        int value = dist(rng);
        buf.produce(value);
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

void consumer(BoundedBuffer& buf, int id, int count) {
    for (int i = 0; i < count; ++i) {
        int value = buf.consume();
        // 模拟处理时间
        std::this_thread::sleep_for(std::chrono::milliseconds(80));
    }
}

int main() {
    const std::size_t buffer_capacity = 5;
    const int items_per_thread = 10;

    BoundedBuffer buf(buffer_capacity);

    std::thread p1(producer, std::ref(buf), 1, items_per_thread);
    std::thread p2(producer, std::ref(buf), 2, items_per_thread);
    std::thread c1(consumer, std::ref(buf), 1, items_per_thread);
    std::thread c2(consumer, std::ref(buf), 2, items_per_thread);

    p1.join(); p2.join(); c1.join(); c2.join();

    std::cout << "All work finished.\n";
    return 0;
}

关键点说明

  1. 条件变量的使用

    • not_full_ 由生产者等待,直到缓冲区非满。
    • not_empty_ 由消费者等待,直到缓冲区非空。
      两者通过 wait 的谓词形式确保即使出现“假唤醒”也能安全继续。
  2. 互斥锁与 unique_lock
    std::unique_lock 在等待时会自动释放互斥锁,等到条件满足后重新加锁。这样避免了手动解锁/加锁的繁琐。

  3. 公平性与死锁
    采用 notify_one() 可以保证每次唤醒一个等待线程,减少资源浪费。由于生产者与消费者都只持锁一次,且不持锁调用 notify_one(),因此不存在死锁。

  4. 多线程安全的 queue
    标准 std::queue 本身并非线程安全,所有对其的访问都需要在 mtx_ 保护下完成。

4. 扩展思考

  • 生产者与消费者数量不等:可以通过不同的 count 或者动态线程池来适配不同负载。
  • 无界缓冲区:将 capacity_ 设置为 SIZE_MAX,并去掉相关条件变量即可。
  • 使用 std::shared_mutex:读多写少的场景可以改用共享锁提升并发性能。
  • 异常安全:在 produceconsume 里抛出异常时,unique_lock 会自动释放锁,保持资源一致性。

5. 结语

上述代码演示了如何在 C++ 标准库中借助线程、互斥锁和条件变量,实现一个高效、健壮的生产者-消费者模型。掌握这些同步原语后,你可以在更复杂的场景中自由构建多线程程序,如工作池、任务调度、异步 I/O 等。祝你编码愉快!

发表评论