[JAVA探索之路]带你手写多线程实现生产者-消费者模型
目录引言一、什么是生产者-消费者模型二、为什么多线程下会有问题1. 数据不安全2. 仓库满了还继续放3. 仓库空了还继续取三、实现思路1. 定义一个仓库类 Buffer2. 提供两个方法3. 加锁保证线程安全4. 条件不满足时让线程等待5. 条件变化后唤醒其他线程四、代码实现五、代码详细解析1. 为什么要有 Buffer2. 为什么 put() 要加 synchronized3. 为什么仓库满了要 wait()4. 为什么仓库空了也要 wait()5. 为什么要用 while不能用 if6. 为什么要用 notifyAll()而不是 notify()六、总结引言在 Java 并发编程里生产者-消费者模型是一个非常经典的问题。很多人第一次接触它时会觉得概念有点绕什么是生产者什么是消费者为什么还要加一个缓冲区为什么要 wait()、notifyAll()其实把它想简单一点它就是一个“放东西”和“拿东西”的过程。比如生产者负责往仓库里放货消费者负责从仓库里拿货仓库有容量限制不能无限放如果仓库空了消费者就得等如果仓库满了生产者就得等这就是生产者-消费者模型的核心思想。这篇文章我们不使用 BlockingQueue 这样的现成工具而是用最基础的synchronized wait() notifyAll()手写一个多线程版的生产者-消费者模型。这样做的目的是为了真正理解 Java 线程通信的底层思路。一、什么是生产者-消费者模型生产者-消费者模型简单来说就是把“生产数据”和“消费数据”这两件事拆开中间通过一个共享缓冲区来连接。它通常包含 3 个角色生产者负责生成数据放到缓冲区里消费者负责从缓冲区中取出数据并处理缓冲区用于存放生产者产生的数据这样设计的好处是“解耦”。也就是说生产者不需要关心消费者处理得快不快消费者也不需要关心生产者什么时候生成数据。双方只需要和缓冲区打交道就行。现实开发中这种模型非常常见比如消息队列日志异步写入任务调度系统订单处理系统请求削峰填谷二、为什么多线程下会有问题如果只有一个线程这件事非常简单。但是一旦变成多个线程问题就来了。假设有多个生产者线程和多个消费者线程同时操作同一个仓库如果不加任何控制就可能出现下面这些问题1. 数据不安全多个线程同时修改同一个集合可能会导致数据错乱。2. 仓库满了还继续放如果仓库容量只有 5但生产者不检查容量就可能一直往里塞数据。3. 仓库空了还继续取如果仓库里已经没有数据了消费者还去取就会报错或者取到错误结果。所以多线程环境下我们必须解决两个关键问题线程同步保证同一时刻只有一个线程能操作共享资源线程通信当条件不满足时线程要等待条件满足后线程要被唤醒三、实现思路我们可以把这个问题拆成下面几步1. 定义一个仓库类 Buffer仓库内部维护一个有界队列用来保存数据。2. 提供两个方法put()生产者往仓库放数据take()消费者从仓库取数据3. 加锁保证线程安全使用 synchronized 保证同一时刻只能有一个线程进入 put() 或 take()。4. 条件不满足时让线程等待仓库满了生产者调用 wait()仓库空了消费者调用 wait()5. 条件变化后唤醒其他线程每次生产或消费完成后调用 notifyAll() 唤醒其他正在等待的线程。四、代码实现下面我们先来看完整代码然后再一段一段解释。import java.util.LinkedList; import java.util.Queue; class Buffer { private final QueueInteger queue new LinkedList(); private final int capacity; public Buffer(int capacity) { this.capacity capacity; } public synchronized void put(int value) throws InterruptedException { while (queue.size() capacity) { System.out.println(Thread.currentThread().getName() 发现仓库已满进入等待); wait(); } queue.offer(value); System.out.println(Thread.currentThread().getName() 生产了 value 当前库存 queue.size()); notifyAll(); } public synchronized int take() throws InterruptedException { while (queue.isEmpty()) { System.out.println(Thread.currentThread().getName() 发现仓库为空进入等待); wait(); } int value queue.poll(); System.out.println(Thread.currentThread().getName() 消费了 value 当前库存 queue.size()); notifyAll(); return value; } } class Producer implements Runnable { private final Buffer buffer; public Producer(Buffer buffer) { this.buffer buffer; } Override public void run() { int value 1; while (true) { try { buffer.put(value); Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } } } class Consumer implements Runnable { private final Buffer buffer; public Consumer(Buffer buffer) { this.buffer buffer; } Override public void run() { while (true) { try { buffer.take(); Thread.sleep(800); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } } } public class ProducerConsumerDemo { public static void main(String[] args) { Buffer buffer new Buffer(5); Thread producer1 new Thread(new Producer(buffer), 生产者1); Thread producer2 new Thread(new Producer(buffer), 生产者2); Thread consumer1 new Thread(new Consumer(buffer), 消费者1); Thread consumer2 new Thread(new Consumer(buffer), 消费者2); producer1.start(); producer2.start(); consumer1.start(); consumer2.start(); } }五、代码详细解析1. 为什么要有 BufferBuffer 就是共享仓库所有生产者和消费者都操作它。它里面最关键的两个成员是private final QueueInteger queue new LinkedList(); private final int capacity;queue 用来存放数据capacity 表示仓库最大容量这里我们用了 LinkedList 来模拟队列因为它支持先进先出比较符合“先生产先消费”的场景。2. 为什么 put() 要加 synchronizedpublic synchronized void put(int value) throws InterruptedException加上 synchronized 后表示同一时刻只能有一个线程进入这个方法。如果不加锁多个生产者线程可能同时往队列里加数据这样很容易出现线程安全问题。同理take() 也必须加锁。3. 为什么仓库满了要 wait()来看这段代码while (queue.size() capacity) { System.out.println(Thread.currentThread().getName() 发现仓库已满进入等待); wait(); }意思是如果仓库已经满了当前生产者就不要再生产了直接进入等待状态等到消费者消费掉一些数据后仓库有空间了再继续生产。这里的 wait() 不是“傻等”而是把当前线程挂起同时释放锁让其他线程有机会进入同步方法。这一点非常重要。如果线程等待时不释放锁那消费者就永远进不来仓库也永远腾不出空间程序就卡死了。4. 为什么仓库空了也要 wait()消费者的逻辑是一样的while (queue.isEmpty()) { System.out.println(Thread.currentThread().getName() 发现仓库为空进入等待); wait(); }如果仓库里没有数据消费者就不能硬取只能等生产者先放进去。5. 为什么要用 while不能用 if这是面试里特别爱问的一点。很多人第一次写会这样写if (queue.size() capacity) { wait(); }看起来好像没问题但实际上不够安全。原因是线程被唤醒之后不代表条件一定满足。比如某个生产者被唤醒了但它重新拿到锁时仓库可能又已经满了所以线程醒来后必须再次检查条件。这就是为什么要用 while一句话总结if 只检查一次while 会反复检查更安全。6. 为什么要用 notifyAll()而不是 notify()每次生产或消费完成后我们都调用了notifyAll();这表示唤醒所有在这个对象上等待的线程。为什么不用 notify()因为 notify() 只随机唤醒一个线程可能会唤醒“错误的人”。举个例子仓库已经满了现在有多个生产者在等待也有多个消费者在等待如果这时你用 notify()结果唤醒了另一个生产者但生产者醒来后发现仓库还是满的又继续等待真正应该被唤醒的消费者却没醒这样程序效率会很差甚至可能出现“假死”现象。所以在这种场景下notifyAll() 更稳妥。虽然它的唤醒范围更大一些但逻辑更安全也更容易写对。六、总结生产者-消费者模型本质上并不复杂它就是一个“仓库调度问题”生产者往仓库里放数据消费者从仓库里拿数据仓库满了生产者等待仓库空了消费者等待在 Java 中我们可以通过 synchronized wait() notifyAll() 手写实现这一套机制。你只要真正理解下面这几句话这个模型就算掌握了synchronized 负责加锁保证线程安全wait() 负责等待并释放锁notifyAll() 负责唤醒等待线程条件判断要用 while不能只用 if制作不易如果对你有帮助请点赞评论收藏感谢大家的支持