8、阻塞队列 2022-02-15 19:33 ### 阻塞队列 ![](http://minio.riun.xyz/riun1/2022-01-07_1RdU1l2roiRevmckrs.jpg) #### 基本介绍 什么是阻塞队列? 1、它是一个队列,可以存数据,可以取数据,有队列的基本功能。先进先出(FIFO)。 2、它是有阻塞功能的队列。什么是阻塞呢?阻塞就是线程执行的时候停在这,达到特定条件又自动回复。这就叫阻塞。 应用在队列里就是:当队列没有数据的时候,若从队列中取数据,就会发生阻塞,当前取数据的线程会停在这,一直等到队列中有数据,然后它恢复执行; 当队列中数据满了的时候,若往队列里放数据,就会阻塞,当前放数据的线程就会停止,一直等到队列中有空位能放数据,当前线程恢复执行。 当然了,并不是说阻塞队列只有上述的功能:一取不出来就阻塞,一放不进去就阻塞。阻塞队列也是队列嘛,它也是有正常的队列方法的。 它只是对“存”和“取”的行为进行了“增强”,所以有不同的方法来完成“存”,“取”操作。 #### 阻塞队列家族体系 ![](http://minio.riun.xyz/riun1/2021-12-20_1K7rKkWZjEVlDSlXO0.jpg) #### ArrayBlockingQueue **由数组结构组成的有界阻塞队列。** ```java ArrayBlockingQueue<Integer> arrayBlockingQueue = new ArrayBlockingQueue<>(16); ``` ArrayBlockingQueue内部维护了一个数组Object[]:final Object[] items; 特点:定长。创建对象时初始化:this.items = new Object[capacity]; 默认维护非公平锁:lock = new ReentrantLock(fair); ```java public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity];//初始化Object数组 lock = new ReentrantLock(fair);//默认非公平锁 notEmpty = lock.newCondition();//创建两个Condition,分别用于:队列空时不能再取数据;队列满时不能再放数据。 notFull = lock.newCondition();//就是字面意思,如果notEmpty是醒着的话(不是await状态),那就代表当前队列“非空”,就是说有数据,那就可以取数据。后面会有详细介绍。 } ``` ##### 放入元素(入队): 目的是放入元素,就是将元素放在数组中。因不同情况,在不同方法中做出不同的行为。 不能放空元素。 ###### 1、offer(e) 特殊值:放入元素,并返回true或false。 队列未满时(count != items.length),添加,返回true;若队列满了,则返回false。 ```java public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { //当前队列中的元素个数和长度相等,则队列满了 if (count == items.length) return false; else { //没满就入队 enqueue(e); return true; } } finally { lock.unlock(); } } ``` 其中,checkNotNull(e): ```java private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); } ``` **所以不能放入空元素。** 初始时,count是0,items.length是队列的长度,肯定>0,所以可以放入元素,会执行enqueue(e)。而enqueue(e)就是所有存数据都会用到的最基本的存数据方法: ```java /** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; //直接放入元素(putIndex永远指向能放入元素的索引位置。) items[putIndex] = x; //此元素放入后队列满了,那么putIndex置为0 if (++putIndex == items.length) putIndex = 0; //count代表当前队列中的元素个数 count++; //唤醒另一个线程,唤醒的是notEmpty,也就是因为空而取不出数据而等待的线程。 notEmpty.signal(); } ``` ###### 2、offer(e, timeout, unit) 超时:在offer的基础上添加等待时间,等待时间内,若队列有空位置可以放入元素,则放入。 队列未满时(count != items.length),直接添加,返回true; 若在等待的时间内notFull.awaitNanos(nanos),队列有空位置可储存元素,则直接添加,返回true;若没有,则等待时间结束后,返回false。等待timeout时间, ```java public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //当队列满时,等待timeout时间。若等待时间到了后,队列仍满着,则返回false while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } //队列没满时,入队列(等待时间过去后,队列有空出的位置了,入队列) enqueue(e); return true; } finally { lock.unlock(); } } ``` ###### 3、add(e) 异常:放入返回true,放不进去抛出异常。 add内部直接使用的是offer(e)。当offer(e)返回true时,返回true;当offer(e)返回false时,直接抛出异常 ```java //ArrayBlockingQueue public boolean add(E e) { return super.add(e); } //AbstractQueue public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } ``` ###### 4、put(e) 阻塞:放不进去就阻塞(await),直到可以放进去。 ```java public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } ``` 以上四种入队操作在真实入队时,都是使用的enqueue(e)。 ##### 取出元素(出队): ###### 1、poll() 弹出队列最前面的元素,并返回该元素,或返回null。 从队列首部取出元素,队列不为空时,返回该元素;队列为空时,返回null。 ```java public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; } ``` takeIndex随时都指向要取出元素的索引位置。取出的元素的索引位置置为null。 notFull.signal(); 唤醒那些由于队列是满的但想要向队列中放入元素而阻塞的线程。告诉他们,notFull,不再满了,你们可以工作了。 对应上述中的4、put(e) ```java while (count == items.length) //想要放入元素,但由于队列满了,不能放入元素而阻塞等待。(释放锁) notFull.await(); ``` ###### 2、poll(timeout, unit) 弹出队列最前面的元素,并返回该元素,或返回null。 从队列首部取出元素,队列不为空时,返回该元素;队列为空时,等待timeout时间,再看看有没有元素。有元素,取出返回;没有,返回null。 ```java public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //没有元素时,就等待timeout时间,醒来还是没有元素,就返回null while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } //直接有元素,或者等待了一会才有元素,都取出元素,并返回 return dequeue(); } finally { lock.unlock(); } } ``` ###### 2、remove()、remove(e) 移除首个元素:调用poll() ```java public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); } ``` 移除特定元素,并返回true或者false。 移除队列首个符合的元素,然后停下。若有此元素,返回true;没有此元素,返回false ```java public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items[i])) { //移除后,立马返回 removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } } ``` removeAt(i): ```java /** * Deletes item at array index removeIndex. * Utility for remove(Object) and iterator.remove. * Call only when holding lock. */ void removeAt(final int removeIndex) { // assert lock.getHoldCount() == 1; // assert items[removeIndex] != null; // assert removeIndex >= 0 && removeIndex < items.length; final Object[] items = this.items; if (removeIndex == takeIndex) { // removing front item; just advance items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); } else { // an "interior" remove // slide over all others up through putIndex. final int putIndex = this.putIndex; for (int i = removeIndex;;) { int next = i + 1; if (next == items.length) next = 0; if (next != putIndex) { items[i] = items[next]; i = next; } else { items[i] = null; this.putIndex = i; break; } } count--; if (itrs != null) itrs.removedAt(removeIndex); } notFull.signal(); } ``` ###### 3、take() 弹出队列最前面的元素,没有就阻塞等待。 队列不为空时,返回第一个元素;否则,阻塞等待:notEmpty.await(); ```java public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //队列中没有元素了 while (count == 0) //等待 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } ``` 对应offer(e)的enqueue(e)中notEmpty.signal(); 他们放入元素就将notEmpty唤醒,告诉他们,队列中不再是空了,有元素了。 ##### 检查元素 ###### 1、peek() 返回队列的首个元素,不会移除元素。返回该元素或null。 返回当前队列中的首个元素,仅返回,不会移除元素。如果队列空了,则返回的是null(dequeue()时,将取出的元素所在位置置为了null) ```java public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } } final E itemAt(int i) { return (E) items[i]; } ``` ###### 2、element() 抛出异常:内部调用peek(),若返回的是null,则抛出异常;否则,正常返回。 ```java public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); } ``` ##### 总结 放入元素的四种方法:offer(e)、offer(e, timeout, unit)、add(e)、put(e)在真正向队列中放元素,也就是向Object[]中放元素时,都是调用了enqueue(e)。而enqueue(e)在放入元素后,就会执行notEmpty.signal(); 即唤醒等待(想取元素却阻塞)的线程。 取出元素的四种方法:poll()、poll(e, timeout, unit)、remove(e)、take(e)中,有三种都是使用dequeue()取出元素,dequeue()取出元素后,会执行notFull.signal(); 而remove(e)是使用removeAt(i)移除元素,removeAt(i)移除元素后,也会执行notFull.signal();。所以这四种方法在取出元素后都会执行notFull.signal(); 即唤醒等待的线程。 ArrayBlockingQueue中所有(可能产生线程不安全操作的)方法使用同一把锁lock,在方法开始时加锁,结束时解锁,因此不会产生线程安全的问题。我们可以看到它在判断是否还有空余容量时`if (count == items.length)`使用的count并不是原子类,而是普通的int,就是因为已经加锁了,不会有其他线程对其(count)同步操作。 综上: 在存入元素时,想要阻塞,可以使用put(e)(notFull.await(););想要阻塞一会,则使用offer(e, timeout, unit)(notFull.awaitNanos(nanos);)。而这些阻塞会被任意的取出元素的方法所唤醒:notFull.signal(); 在取出元素时,想要阻塞,可以使用take(e)(notEmpty.await(););想要阻塞一会,则使用poll(e, timeout, unit)(notEmpty.awaitNanos(nanos);)。而这些阻塞会被任意的存入元素的方法所唤醒:notEmpty.signal(); takeIndex维护着队列中首个将要取出的元素位置的索引,putIndex维护着队列中将要放入的位置的索引。 ![](http://minio.riun.xyz/riun1/2021-12-20_1Kdzc1hA05DJnaoUz3.jpg) 取出元素后,takeIndex后移;存入元素后,putIndex后移。 队列元素全部取出时,takeIndex移动到最后,将该位置元素取出,随即就会重置为0。 ```java //dequeue() if (++takeIndex == items.length) takeIndex = 0; ``` 队列已经存满时,putIndex也会置为0。 ```java //enqueue(e) if (++putIndex == items.length) putIndex = 0; ``` ![](http://minio.riun.xyz/riun1/2021-12-16_1IBKjAbXoNLWNmYA91.jpg) #### LinkedBlockingQueue ```java LinkedBlockingQueue<Integer> linkedBlockingQueue = new LinkedBlockingQueue<>(5); ``` LinkedBlockingQueue内部维护了一个链表Node<E>,理论上链表是可以动态增加节点、物理上没有容量限制的,但是可以给其指定一个逻辑容量,表明这个链表我最多允许它放置的节点数量。逻辑容量靠代码维护。 当指定容量时,设置其容量为指定值。不指定容量时,默认容量是Integer.MAX_VALUE,即21亿多,可以理解为无限大。 ```java /** 链表容量 */ private final int capacity; /** 当前节点数量 */ private final AtomicInteger count = new AtomicInteger(); public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //初始化链表,只有一个节点,既是头,又是尾。头结点数据域null last = head = new Node<E>(null); } ``` Node节点: ```java /** * Head of linked list. * Invariant: head.item == null */ transient Node<E> head; /** * Tail of linked list. * Invariant: last.next == null */ private transient Node<E> last; /** * Linked list node class */ static class Node<E> { //数据域 E item; //指针域 Node<E> next; Node(E x) { item = x; } } ``` ##### 放入元素(入队): 目的是放入元素,就是将元素放在链表中。因不同情况,在不同方法中做出不同的行为。 不能放空元素。 ###### 1、offer(e) 放入元素:成功则返回true;若队列已经满了则返回false。 与ArrayBlockingQueue相比,这里没有在整个方法上使用lock锁,而是局部lock+AtomicInteger原子类的方式,保证了多线程下同步操作的线程安全。 ```java public boolean offer(E e) { if (e == null) throw new NullPointerException(); //队列满了,直接返回false final AtomicInteger count = this.count; if (count.get() == capacity) return false; //没满,则准备放入元素 int c = -1; Node<E> node = new Node<E>(e); //获取公共put锁,并尝试lock()加锁 final ReentrantLock putLock = this.putLock; putLock.lock(); try { //所内再次判断,防止同步操作时的情况。有点单例模式中的加锁双if写法的感觉。 if (count.get() < capacity) { //队列没满,入对 enqueue(node); //count自增,注意这里是先get再自增,也就是说c是自增之前的值。 c = count.getAndIncrement(); //c + 1是当前元素放入之后,当前队列中的元素个数。如果个数没超过容量的话,就没满,唤醒一下没满的Condition。 if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } //c是放入该元素之前队列中元素的个数。如果之前元素个数是0的话,说明之前队列是空的,那么很可能notEmpty在await状态,这里唤醒一下。 if (c == 0) signalNotEmpty(); //初始值是-1,如果进入if (count.get() < capacity)里的话,肯定会将元素放进去,所以c肯定不会是-1。所以只要>=0,就成功了。 return c >= 0; } private void enqueue(Node<E> node) { //尾指针的next指向node(连接);并将尾指针后移,指向最后一个元素node(维护尾指针)。 last = last.next = node; } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; //告诉的前提的先拿到take锁,因为如果没有那take锁,这个线程刚把元素放进去,另一个线程立马拿着take锁就把元素取了出来,那么 takeLock.lock(); try { //告诉其他线程,队列非空。 notEmpty.signal(); } finally { takeLock.unlock(); } } ``` ###### 2、offer(e, timeout, unit) 放入元素:在offer(e)的基础上添加等待时间,等待时间内,若队列有空位置可以放入元素,则放入,并返回true。若等待后仍没有空余位置,则返回false。 ```java public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; } ``` ###### 3、add(e) 放入元素:成功则返回true;队列满了,放入失败则抛出异常。 和ArrayBlockingQueue一样,因为他们的add()都是用的同一个父类----AbstractQueue的。在add()内,调用各自实现的offer(e)。 ```java public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } ``` ###### 4、put(e) 阻塞式的放入元素:若队列有空余位置,则放入元素;没有则一直阻塞等待,直到有空余位置,再放入元素。 ```java public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); //获取put锁,并上锁 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { //如果当前队列元素个数和容量相等,那队列已满。notFull进入await状态。 while (count.get() == capacity) { notFull.await(); } //将元素入队 enqueue(node); c = count.getAndIncrement(); //c+1的值是count,当前元素个数 < 容量 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } //唤醒notEmpty,这是每次放入元素时,ArrayBlockingQueue和LinkedBlockingQueue都会做的事情(取元素时没有元素notEmpty会await,这里放入元素了,队列里有元素了,要唤醒notEmpty,让取元素的线程取) if (c == 0) signalNotEmpty(); } ``` ##### 取出元素(出队) ###### 1、poll() 取出元素:取出队列首部元素,并返回。没有元素则返回null。 ```java public E poll() { //队列是否已空,空就返回null final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; //队列不空,获取take锁,并尝试上锁 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //锁内再判断,经典的加锁双if写法。 if (count.get() > 0) { //队列不空,就出队一个 x = dequeue(); //先get,再自减。这里的c是出队之前队列中元素的个数 c = count.getAndDecrement(); //如果之前队列个数大于1,那么出队之后队列也不会空。唤醒notEmpty if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } //如果之前队列元素个数和容量相等,出队之后队列就不再满了,唤醒notFull if (c == capacity) signalNotFull(); return x; } //这里出队的方式是:头指针不要,然后将head指向第一个元素,取出第一个元素的数据域item, private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } ``` ###### 2、poll(timeout, unit) 取出元素:在poll()的基础上添加等待时间,等待时间内,如果队列有元素则取出。等待时间后,如果队列内仍没有元素则返回null。 ```java public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } ``` ###### 3、remove()、remove(e) 移除元素,调用poll(),若能取出元素,则返回;若取出null,则抛出异常。 ```java public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); } ``` 移除特定元素 ```java public boolean remove(Object o) { if (o == null) return false; fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } } ``` ###### 4、take() 阻塞式的取出元素:若队列有元素,则取出并返回;若没有,则一直阻塞,等待有元素后便取出,并返回。 ```java public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; //获取take锁,并上锁 final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //当前队列元素个数为0的话,等待,notEmpty进入await状态 while (count.get() == 0) { notEmpty.await(); } //元素个数不是0,就出队 x = dequeue(); //这里是先get再自减,get到的是自减前的 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } //如果自减前的元素个数c,和容量相等,则唤醒notFull if (c == capacity) signalNotFull(); return x; } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } ``` ###### dequeue()详解: > 由于dequeue()内部没有涉及尾指针的移动,为了避免干扰,我就没有将其画出来。 > > = 赋值号是将后者的内容赋给前者,对于引用类型变量来说,内部储存的都是地址值,所以就是将地址值赋给别的变量。 > > int double 这种是基本类型变量(8大基本类型) > > 除了基本类型都是引用类型。 Node节点: ![](http://minio.riun.xyz/riun1/2021-12-24_1LJ7NG11eQXuSZFM8U.jpg) ![](http://minio.riun.xyz/riun1/2021-12-24_1LJ83zqCquf9c8Vj01.jpg) 比如现有一个链表,储存了1,2两个元素: ![](http://minio.riun.xyz/riun1/2021-12-24_1LIINp3cTNxhnMDy0S.jpg) 执行Node<E> h = head; 把head中的值0x00赋给h: ![](http://minio.riun.xyz/riun1/2021-12-24_1LIJZW5Q4fOeO8ubDW.jpg) 然后执行Node<E> first = h.next; 把h.next的值赋给first,h.next就是0x00地址空间的next域,就是0x11:(一个变量储存了一个地址,我们就说它指向了那里。上幅图忘记连上h那条线了。) ![](http://minio.riun.xyz/riun1/2021-12-24_1LIMlqMJCusGljRspF.jpg) 然后执行h.next = h; h是0x00,h.next就是0x00地址空间的next域。这里把0x00地址空间的next域置为0x00: ![](http://minio.riun.xyz/riun1/2021-12-24_1LIPRWauS2OvBctOby.jpg) 这句代码后面也标注了注释:help GC。就是帮助GC垃圾回收。0x00自己的next域储存自己,然后后面再把head指向别的地方,h又是函数内的局部变量,函数结束后就会消失,所以0x00这片空间就没有被其他地方引用,方便垃圾回收器找到并回收它(0x00这块空间)。 然后执行head = first; 将first的东西赋给head: ![](http://minio.riun.xyz/riun1/2021-12-24_1LIUCyBUP9B0ThNaLl.jpg) 然后执行E x = first.item; 将first的item域的数据拿出来,给x,这里的x后面会return出去,就是**拿到我们要出队的元素,返回了**。 然后执行first.item = null; 就是将0x11这个地址空间的数据域置为null,反正它的数据我们也拿到了(x),就将它作为头指针: ![](http://minio.riun.xyz/riun1/2021-12-24_1LIZ50lWmGn0rVPfZD.jpg) first也是函数内的局部变量,函数执行完成后也会消失。所以执行完dequeue()之后: ![](http://minio.riun.xyz/riun1/2021-12-24_1LJ0EgPTl9v5UeEyeB.jpg) 变成了开始的样子。经过dequeue(),成功将第一个元素出队。 #### 总结与对比 总结: 大体执行流程上,LinkedBlockingQueue与ArrayBlockingQueue相同,但是LinkedBlockingQueue在方法执行时,不是全方法加锁,而是阶段加锁,然后通过使用原子类AtomicInteger等,保证执行时的线程安全。并且多了许多优化(放入元素时不止会唤醒notEmpty,在满足特定条件也会唤醒notFull;取出元素时不止会唤醒notFull,满足特点条件也会唤醒notEmpty),所以LinkedBlockingQueue较ArrayBlockingQueue更经常使用。 对比: | 方法特性\方法类型 | 添加元素 | 取出元素 | | ------------------ | ----------------------------------- | -------------------------------------- | | 返回特定值(即时) | offer(e):true/false | poll():E/null remove(e):true/false | | 抛出异常(即时) | add(e):true/IllegalStateException | remove():E/NoSuchElementException | | 超时(等待) | offer(e, timeout, unit):true/false | poll(timeout, unit):E/null | | 阻塞(等待) | put(e):void,放不进去就一直等着 | take():E,没有元素可取就一直等着 | #### 阻塞队列小练习 ```java package com.example.demo.core; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @author: HanXu * on 2021/12/20 * Class description: 阻塞队列小练习 * 主线程向容量为5的阻塞队列ArrayBlockingQueue中放入5个元素:1,2,3,4,5 * 另一个线程(线程池中的,假设他叫线程B)在这之后想要往队列中放入一个元素:6 * 尝试了一下,没放进去,然后就阻塞的放。(等待队列中有空闲空间) * * 主线程放入5个元素之后接着就睡了1秒,然后弹出一个元素 * * 线程B向队列中放入元素成功,接着shutdown掉线程池,程序结束。 */ public class ArrayBlockingQueueDemo { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 20, 120, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy()); ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<Integer>(5); arrayBlockingQueue.offer(1); arrayBlockingQueue.offer(2); arrayBlockingQueue.offer(3); arrayBlockingQueue.offer(4); arrayBlockingQueue.offer(5); threadPoolExecutor.execute( () -> { boolean offer = arrayBlockingQueue.offer(6); System.out.println(Thread.currentThread().getName() + ":" + "我想要把6放进去,成功了吗?:" + offer); if (offer) { System.out.println(arrayBlockingQueue.toString()); return; } System.out.println(Thread.currentThread().getName() + ":" + "没有成功,那我使用阻塞的方式放入元素"); try { arrayBlockingQueue.put(6);//notFull.await() } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ":" + "6放进去了吗?:" + arrayBlockingQueue.toString()); if (arrayBlockingQueue.contains(6)) { threadPoolExecutor.shutdown(); } } ); System.out.println(Thread.currentThread().getName() + ":" + "我放了5个元素进去我睡一会"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ":" + "我弹出一个元素吧"); Object poll = arrayBlockingQueue.poll();//notFull.signal() System.out.println(Thread.currentThread().getName() + ":" + "弹出的元素是:" + poll.toString()); } } /* main:我放了5个元素进去我睡一会 pool-1-thread-1:我想要把6放进去,成功了吗?:false pool-1-thread-1:没有成功,那我使用阻塞的方式放入元素 main:我弹出一个元素吧 main:弹出的元素是:1(这个输出和上面一句不一定挨着,因为线程抢占资源的结果不一定) pool-1-thread-1:6放进去了吗?:[2, 3, 4, 5, 6] Process finished with exit code 0 */ ``` --END--
发表评论