我们可以把这个模型想象成工厂里的两条流水线,我们管他们叫生产者流水线和消费者流水线,生产者流水线生产出来的产品给消费者流水线使用,其中生产者流水线先把生产出来的产品放在仓库,然后消费者流水线再去仓库拿。这个仓库就叫做阻塞队列。
下面我们先来设计一下这个仓库(阻塞队列):
③阻塞队列是如何实现阻塞的?wait/notify 或者 await/signal都可以实现阻塞与唤醒操作。
import java.util.LinkedList; import java.util.Queue; public class BlockQueueplus { Queue<Integer> queue = new LinkedList(); int capacity ; //阻塞队列的容量 public BlockQueueplus(int capacity) { this.capacity = capacity; } /** * 堆代码 duidaima.com * 将数据放入阻塞队列中 * @param i 放入的元素 * @throws InterruptedException */ public synchronized void put(Integer i) throws InterruptedException { while(capacity <= queue.size()){ wait(); } queue.offer(i); System.out.println(Thread.currentThread().getName() + "生产了value, value的当前值是" + i ); notify(); } /** * 从阻塞队列中取出数据 * @return * @throws InterruptedException */ public synchronized int take() throws InterruptedException { if(size() == 0){ wait(); } Integer result = queue.poll(); System.out.println(Thread.currentThread().getName() + "消费了value, value的当前值是" + result ); notify(); return result; } public int size(){ return queue.size(); } public Boolean isEmpty(){ return queue.isEmpty(); } public Boolean isFull(){ return this.size()==this.capacity; } }synchronized关键字用在方法上的作用
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class Work{ private static BlockQueueplus blockQueueplus = new BlockQueueplus(100); public void set(int i) { try { blockQueueplus.put(i); }catch (InterruptedException e) { e.printStackTrace(); } } public void get() { try { Integer i = blockQueueplus.take(); } catch (InterruptedException e) { e.printStackTrace(); } } }测试类
class WorkTest { public static void main(String[] args) { Work work = new Work(); Runnable producerRunnable = new Runnable() { @Override public void run() { for (int i = 0; i < Integer.MAX_VALUE; i++) work.set(i); } }; Runnable customerRunnable = new Runnable() { @Override public void run() { for (int i = 0; i < Integer.MAX_VALUE; i++) work.get(); } }; Thread ProducerThread = new Thread(producerRunnable); ProducerThread.setName("Producer"); Thread ConsumerThread = new Thread(customerRunnable); ConsumerThread.setName("Consumer"); ProducerThread.start(); ConsumerThread.start(); } }运行结果: