JDK7 ArrayBlockingQueue源码分析

阻塞队列提供两个附加的操作,允许阻塞的插入操作和阻塞的移除操作。

  • 阻塞的插入操作:当队列满时,插入操作将一直阻塞,直到队列不满,插入操作才能进行并返回。
  • 阻塞的移除操作:当队列空时,移除操作将一直阻塞,直到队列非空,移除操作才能进行并返回。

阻塞队列常用于生产者和消费者的场景。生产者线程负责往队列添加数据,消费者线程负责从队列中取出数据进行逻辑处理。

JDK的线程池ThreadPoolExecutor内部的工作队列就是用阻塞队列来维护的。

阻塞队列是线程安全的类,适用于在多线程环境下。

1. ArrayBlockingQueue实现原理

ArrayBlockingQueue是阻塞队列中最常用的一种,它是通过内部持有的可重入锁(ReentrantLock)来维护线程安全性的。其中每一个操作都必须先持有该锁,才能进行操作,并且在操作结束后释放锁。

ArrayBlockingQueue是一个有界阻塞队列,内部维护一个Object类型的数组items,通过下标索引标识putIndextakeIndex标识插入的位置和移除的位置,用count来标识当前队列里面的元素个数。

1.1 等待/通知 模型

ArrayBlockingQueue是基于等待/通知 模型来实现阻塞功能的。在其内部维护了一个ReentrantLock和两个关联的Condition:notEmptynotFull。阻塞的插入操作在判断到队列满的时候会调用notFull.await()进行等待,该等待将由成功的移除操作唤醒——成功移除会调用notFull.signal()。而阻塞的移除操作在判断到队列为空的时候会调用notEmpty.await()进行等待,该等待将由成功的插入操作来唤醒——成功的插入会调用notEmpty.signal()

下面是ArrayBlockingQueue内部的封装的插入方法和移除方法,对外公开的插入和移除操作都会调用这两个内部方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 插入操作将调用notEmpty.signal()唤醒等待中的移除操作线程
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
// 移除操作将调用notFull.signal()唤醒等待中的插入操作线程
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}

接下来我们分析ArrayBlockingQueue的一些常用操作:

2. 插入元素方法

2.1 会抛异常的add方法

add方法通过调用父类AbstractQueue的add方法来实现元素插入,查看父类add方法源码发现,方法内部会调用子类即ArrayBlockingQueue实现的offer方法执行添加。当添加失败(offer方法返回false),抛出IllegalStateException(“Queue full”)的异常。

1
2
3
4
5
6
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

2.2 offer方法

offer方法需要判断当前队列是否已满,如果满的话返回false,否则插入元素并返回true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}

2.3 超时等待的offer方法

ArrayBlockingQueue还提供了允许超时等待的offer操作。方法首先会判断当前队列是否已满,如果队列已满,则调用内部持有的ReentrantLock关联的Condiction notFull.awaitNanos方法来实现超时等待。在等待期间,如果队列有元素被移除(其他线程会调用notFull.signal()来唤醒在notFull上等待的线程),那么等待的线程将有机会被唤醒并重新获取锁,执行插入操作并返回true;如果超时等待结束,那么元素将不被插入,直接返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
insert(e);
return true;
} finally {
lock.unlock();
}
}

2.4 阻塞的put方法

阻塞的put操作其实跟超时等待的offer操作类似,只是在notFull上调用的是非超时的await方法,表示只有等到其他线程调用notFull.signal()才有机会被唤醒并重新获取锁,执行插入操作。

1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}

从lock的方式看阻塞的put操作是支持中断的。

3. 移除元素方法

3.1 poll方法

当队列为空返回null,否则调用extract方法获取队列头元素并将该元素移除出队列。在extract方法里面会调用notFull.signal()用于唤醒阻塞的插入线程。

3.2 超时等待的poll方法

超时等待原理与超时offer操作类似,这里不再赘述。

3.3 阻塞的take方法

阻塞的take操作其实跟超时等待的poll操作类似,只是在notEmpty上调用的是非超时的await方法,表示只有等到其他线程调用notEmpty.signal()才有机会被唤醒并重新获取锁,执行移除操作。

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}

4. 取队头元素

取队列首元素的方法包括peek方法和element方法,这些方法只是返回了队列首元素,并不会在队列中将该元素移除。这里就不详细介绍这些方法,因为在实际应用中很少用到这些方法。有兴趣的同学可以自己翻翻源码看看。

坚持原创技术分享,您的支持将鼓励我继续创作!