在现代 C++ 中,多线程编程已成为解决高并发问题的核心手段。本文将以生产者-消费者模型为例,展示如何利用标准库中的线程、互斥锁、条件变量以及线程安全队列来实现一个简单而完整的多线程程序。通过代码示例,你将能够快速掌握:
std::thread的创建与管理std::mutex与std::unique_lock的使用std::condition_variable的同步机制- 如何安全地在多线程之间传递数据
1. 背景与需求
生产者-消费者模型描述了两类线程:生产者产生数据并放入共享缓冲区,消费者从缓冲区取出数据进行处理。核心挑战是:
- 生产者不应在缓冲区已满时继续写入;
- 消费者不应在缓冲区为空时继续读取;
- 两类线程必须同步,避免竞争条件和死锁。
2. 设计思路
我们使用一个固定大小的循环缓冲区(ring buffer)来存放生产的数据。为保证线程安全,采用以下同步机制:
| 组件 | 作用 |
|---|---|
std::mutex |
保护缓冲区的读写操作 |
std::condition_variable |
生产者等待缓冲区非满,消费者等待缓冲区非空 |
std::unique_lock |
与 condition_variable 配合使用,允许条件变量在等待时自动释放互斥锁 |
3. 代码实现
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <random>
class BoundedBuffer {
public:
explicit BoundedBuffer(std::size_t capacity)
: capacity_(capacity) {}
// 生产者调用
void produce(int item) {
std::unique_lock<std::mutex> lock(mtx_);
not_full_.wait(lock, [this] { return buffer_.size() < capacity_; });
buffer_.push(item);
std::cout << "Produced: " << item << "\n";
not_empty_.notify_one(); // 通知消费者
}
// 消费者调用
int consume() {
std::unique_lock<std::mutex> lock(mtx_);
not_empty_.wait(lock, [this] { return !buffer_.empty(); });
int item = buffer_.front();
buffer_.pop();
std::cout << "Consumed: " << item << "\n";
not_full_.notify_one(); // 通知生产者
return item;
}
private:
std::size_t capacity_;
std::queue <int> buffer_;
std::mutex mtx_;
std::condition_variable not_empty_;
std::condition_variable not_full_;
};
void producer(BoundedBuffer& buf, int id, int count) {
std::mt19937 rng(id + 1);
std::uniform_int_distribution <int> dist(1, 100);
for (int i = 0; i < count; ++i) {
int value = dist(rng);
buf.produce(value);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
void consumer(BoundedBuffer& buf, int id, int count) {
for (int i = 0; i < count; ++i) {
int value = buf.consume();
// 模拟处理时间
std::this_thread::sleep_for(std::chrono::milliseconds(80));
}
}
int main() {
const std::size_t buffer_capacity = 5;
const int items_per_thread = 10;
BoundedBuffer buf(buffer_capacity);
std::thread p1(producer, std::ref(buf), 1, items_per_thread);
std::thread p2(producer, std::ref(buf), 2, items_per_thread);
std::thread c1(consumer, std::ref(buf), 1, items_per_thread);
std::thread c2(consumer, std::ref(buf), 2, items_per_thread);
p1.join(); p2.join(); c1.join(); c2.join();
std::cout << "All work finished.\n";
return 0;
}
关键点说明
-
条件变量的使用
not_full_由生产者等待,直到缓冲区非满。not_empty_由消费者等待,直到缓冲区非空。
两者通过wait的谓词形式确保即使出现“假唤醒”也能安全继续。
-
互斥锁与 unique_lock
std::unique_lock在等待时会自动释放互斥锁,等到条件满足后重新加锁。这样避免了手动解锁/加锁的繁琐。 -
公平性与死锁
采用notify_one()可以保证每次唤醒一个等待线程,减少资源浪费。由于生产者与消费者都只持锁一次,且不持锁调用notify_one(),因此不存在死锁。 -
多线程安全的
queue
标准std::queue本身并非线程安全,所有对其的访问都需要在mtx_保护下完成。
4. 扩展思考
- 生产者与消费者数量不等:可以通过不同的
count或者动态线程池来适配不同负载。 - 无界缓冲区:将
capacity_设置为SIZE_MAX,并去掉相关条件变量即可。 - 使用
std::shared_mutex:读多写少的场景可以改用共享锁提升并发性能。 - 异常安全:在
produce或consume里抛出异常时,unique_lock会自动释放锁,保持资源一致性。
5. 结语
上述代码演示了如何在 C++ 标准库中借助线程、互斥锁和条件变量,实现一个高效、健壮的生产者-消费者模型。掌握这些同步原语后,你可以在更复杂的场景中自由构建多线程程序,如工作池、任务调度、异步 I/O 等。祝你编码愉快!