史上最简单的 ArrayBlockingQueue 源码解析

it2024-12-09  21

ArrayBlockingQueue的实质就是一个数组加上两个起始标志位。重要参数如下

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * Serialization ID. This class relies on default serialization * even for the items array, which is default-serialized, even if * it is empty. Otherwise it could not be declared final, which is * necessary here. */ private static final long serialVersionUID = -817911632652898426L; /** The queued items */ //构造对象时初始化的数组 final Object[] items; /** items index for next take, poll, peek or remove */ //开始标记位的标记,take,poll,peek,remove方法调用时取到的数组位置 int takeIndex; /** items index for next put, offer, or add */ //结尾标记位的标记,put, offer,add方法调用时取到的数组位置 int putIndex; /** Number of elements in the queue */ //队列中元素的个数 int count;

然后让我们来看一下put方法(offer和add方法只是做略微改动)

public void put(E e) throws InterruptedException { //校验元素非空 checkNotNull(e); final ReentrantLock lock = this.lock; //加锁 lock.lockInterruptibly(); try { while (count == items.length) //当数组满了,进行阻塞,等待消费者消费或删除元素 notFull.await(); //重点 enqueue(e); } finally { //释放锁 lock.unlock(); } }

可以看到put(E e)方法的重点是 enqueue(e)   offer和add同理

private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; //将元素放入队列中 items[putIndex] = x; //如果putIndex 已经在队列最后一个位置,则 ++putIndex 从0位置开始继续存放 if (++putIndex == items.length) putIndex = 0; //队列长度++; count++; //通知消费者可以消费,或删除元素 notEmpty.signal(); }

那让我们看看offer方法和add方法的不同之处吧

public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) //队列已满,不进行阻塞,直接返回false return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }

可以看到offer(E e)方法不进行阻塞等待直接返回boolean的操作结果

让我们再来看看add方法

public boolean add(E e) { if (offer(e)) return true; else //插入失败直接抛出IllegalStateException异常 throw new IllegalStateException("Queue full"); }

add(E e)方法则是不进行阻塞,失败直接抛出异常。

 

看完了添加元素的方法,在让我们来看看获取元素 take()方法

public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //加锁 lock.lockInterruptibly(); try { while (count == 0) //当没有元素时阻塞,等待添加元素唤醒 notEmpty.await(); //重点 return dequeue(); } finally { //释放锁 lock.unlock(); } }

理所当然,让我们来看看dequeue()方法

private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") //获取元素 E x = (E) items[takeIndex]; //将当前位置置为null items[takeIndex] = null; 如果takeIndex 已经在队列最后一个位置,则 ++takeIndex 从0位置开始继续获取 if (++takeIndex == items.length) takeIndex = 0; //记录元素个数 count--; if (itrs != null) //对迭代器进行操作,有兴趣的同学可以自行研究 itrs.elementDequeued(); //唤醒元素添加方法,表示数组不满,可以添加元素 notFull.signal(); //返回元素 return x; }

这大概就是ArrayBlockingQueue 的运行原理:

一个数组,加上两个标志位

 

最新回复(0)