/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.concurrent.locks.*;
import java.util.*;

/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * {@code Delayed} elements, in which an element can only be taken
 * when its delay has expired.  The <em>head</em> of the queue is that
 * {@code Delayed} element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and {@code poll}
 * will return {@code null}. Expiration occurs when an element's
 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
 * than or equal to zero.  Even though unexpired elements cannot be
 * removed using {@code take} or {@code poll}, they are otherwise
 * treated as normal elements. For example, the {@code size} method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the Java Collections Framework.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Single lock guarding every access to {@link #q} and {@link #leader}. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /** Backing heap; its head is the element that sorts first by {@code compareTo}. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader = null;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                // The new element became the head, so any leader is timing
                // its wait against the previous head. Invalidate the leader
                // and wake one waiter so it re-reads the head.
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    @Override
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    // Drop the local reference before blocking so this
                    // thread does not keep the element reachable after
                    // another thread has removed it from the queue.
                    first = null;
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // offer() may have reset or replaced the leader
                            // while we waited; only clear our own claim.
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // Hand over: if nobody is leading and elements remain, wake
            // one waiter so it can become the next leader.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit the time unit of the {@code timeout} argument
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    // Drop the local reference before blocking so this
                    // thread does not keep the element reachable after
                    // another thread has removed it from the queue.
                    first = null;
                    if (nanos < delay || leader != null)
                        // Either our own timeout ends before the head
                        // expires, or another thread already leads: just
                        // wait out (at most) our remaining timeout.
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // Charge only the time actually spent waiting.
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty.  Unlike
     * {@code poll}, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue is empty.
     */
    @Override
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (;;) {
                E first = q.peek();
                // Only expired elements are drained; stop at the first
                // unexpired head.
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                c.add(q.poll());
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                c.add(q.poll());
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    @Override
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code DelayQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE}
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>The following code can be used to dump a delay queue into a newly
     * allocated array of {@code Delayed}:
     *
     * <pre>
     *     Delayed[] a = q.toArray(new Delayed[0]);</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param <T> the component type of the array to contain the elements
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to remove
     * @return {@code true} if an element was removed
     */
    @Override
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the
     * elements in any particular order.
     *
     * <p>The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        @Override
        public boolean hasNext() {
            return cursor < array.length;
        }

        @Override
        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            // Safe: the snapshot was produced by toArray() of a queue of E.
            return (E)array[cursor++];
        }

        @Override
        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            Object x = array[lastRet];
            lastRet = -1;
            // Traverse underlying queue to find == element,
            // not just a .equals element.
            lock.lock();
            try {
                for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                    if (it.next() == x) {
                        it.remove();
                        return;
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    }
}

0. DelayQueue简介



1. 接口分析




2. DelayQueue原理概述





3. Delayed接口

/**
 * An object that reports how much delay remains before it should be
 * acted upon. Instances are comparable with other {@code Delayed}
 * objects.
 */
public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     *         that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}


long getDelay(TimeUnit unit) -> 按传入的时间单位,返回当前对象距离到期还剩多长时间;返回值小于等于0表示已经到期。(必须按传入的单位正确换算,不能随意返回:DelayQueue会用 getDelay(TimeUnit.NANOSECONDS) 的返回值作为线程的等待时间,返回值偏小会导致线程被提前唤醒、反复空转占用CPU,偏大则会导致元素到期后迟迟取不出来)

public int compareTo(Delayed o); -> 与传入的另一个实现了Delayed接口的对象比较谁先到期;先到期的应排在前面,排序结果应与 getDelay 的返回值保持一致


4. leader线程


Thread designated to wait for the element at the head of the queue.  This variant of the Leader-Follower pattern (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to minimize unnecessary timed waiting.  When a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.  The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim.  Whenever the head of the queue is replaced with an element with an earlier expiration time, the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled.  So waiting threads must be prepared to acquire and lose leadership while waiting.


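关键点在于:同一时刻只有leader这一个线程会按队首元素的剩余延迟做限时等待(awaitNanos),其他线程都是无限期等待(await),直到被signal唤醒。否则所有等待线程都会在队首元素到期的那一刻被定时唤醒,却只有一个线程能取到元素,其余的唤醒都是无谓的开销。(带超时的poll是个例外:当自身剩余超时时间比队首延迟短,或者已经存在leader时,它仍然按自己的超时时间做限时等待。)再一次体现了Doug Lea大神对性能的追求,膜之。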


5. DelayQueue.offer方法解析

/**     * Inserts the specified element into this delay queue.     *     * @param e the element to add     * @return true     * @throws NullPointerException if the specified element is null     */    public boolean offer(E e) {        final ReentrantLock lock = this.lock;        lock.lock();//加锁保证线程安全        try {            q.offer(e);//向内部维护的PriorityQueue插入元素,由于插入的对象实现了Comparable接口,距离超时时间最近的对象会排在队首            if (q.peek() == e) {
//如果新插入的对象就被排在队首了,那么leader线程的等待时间就不正确了,需要将leader线程唤醒 leader = null; available.signal();//唤醒leader线程 } return true; } finally { lock.unlock();//解锁 } }


6. DelayQueue.take方法解析

/**     * Retrieves and removes the head of this queue, waiting if necessary     * until an element with an expired delay is available on this queue.     *     * @return the head of this queue     * @throws InterruptedException {
@inheritDoc} */ public E take() throws InterruptedException {
//阻塞可中断 final ReentrantLock lock = this.lock; lock.lockInterruptibly();//加全局可中断锁 try { for (;;) { E first = q.peek();//获取队首元素,这个元素的等待时间是最短的 if (first == null) available.await();//如果队列为空,那也没法根据队首元素设置线程等待时间了,就直接无限等待 else { long delay = first.getDelay(TimeUnit.NANOSECONDS);//用ns为单位,获取队首元素的等待时间 if (delay <= 0)//如果这个元素已经过期了,直接出队 return q.poll(); else if (leader != null)//如果队首元素还没有过期,而且leader线程存在,当前线程选择永久等待,leader线程会将其唤醒 available.await(); else {
//将自己设置为leader线程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay);//根据队首元素设置自己的等待超时时间 } finally { if (leader == thisThread)//如果leader线程还是自己(可能在offer方法中被修改) leader = null;//让出leader位置 } } } } } finally { if (leader == null && q.peek() != null)//如果没有leader线程,并且优先队列不为空,那么唤醒一个等待线程 available.signal(); lock.unlock();//解锁 } }



7. DelayQueue.poll方法解析

/**     * Retrieves and removes the head of this queue, or returns null     * if this queue has no elements with an expired delay.     *     * @return the head of this queue, or null if this     *         queue has no elements with an expired delay     */    public E poll() {
//非阻塞方法 final ReentrantLock lock = this.lock; lock.lock();//加锁 try { E first = q.peek();//获取优先队列的队首元素,如果队首元素已经超时,则poll队首元素并返回,否则返回null if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } } /** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue, * or the specified wait time expires. * * @return the head of this queue, or null if the * specified waiting time elapses before an element with * an expired delay becomes available * @throws InterruptedException {
@inheritDoc} */ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
//超时可中断方法 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly();//全局可中断锁 try { for (;;) { E first = q.peek();//查找队首元素 if (first == null) {
//如果队列为空,则根据传入的超时时间设置线程等待时间 if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS);//获取队首元素的等待时间 if (delay <= 0) return q.poll(); if (nanos <= 0) return null; if (nanos < delay || leader != null)//如果leader的等待时间比当前线程的等待时间长,当前线程设置等待时间并等待 nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }


8. DelayQueue的迭代器

/**     * Returns an iterator over all the elements (both expired and     * unexpired) in this queue. The iterator does not return the     * elements in any particular order.     *     * 

The returned iterator is a "weakly consistent" iterator that * will never throw {

@link java.util.ConcurrentModificationException * ConcurrentModificationException}, and guarantees to traverse * elements as they existed upon construction of the iterator, and * may (but is not guaranteed to) reflect any modifications * subsequent to construction. * * @return an iterator over the elements in this queue */ public Iterator
iterator() { return new Itr(toArray());//将PriorityQueue中的所有元素复制一份 } /** * Snapshot iterator that works off copy of underlying q array. */ private class Itr implements Iterator
{ final Object[] array; // Array of all elements int cursor; // index of next element to return; int lastRet; // index of last element, or -1 if no such Itr(Object[] array) { lastRet = -1; this.array = array; } public boolean hasNext() { return cursor < array.length; } @SuppressWarnings("unchecked") public E next() {
//遍历数组 if (cursor >= array.length) throw new NoSuchElementException(); lastRet = cursor; return (E)array[cursor++]; } public void remove() { if (lastRet < 0) throw new IllegalStateException(); Object x = array[lastRet];//获取上一次返回的元素,也就是将要被删除的元素 lastRet = -1; // Traverse underlying queue to find == element, // not just a .equals element. lock.lock();//加全局锁 try { for (Iterator it = q.iterator(); it.hasNext(); ) {
//遍历底层的PriorityQueue,删除对应的元素(不一定真的能找到这个元素) if (it.next() == x) { it.remove(); return; } } } finally { lock.unlock(); } } }






