DelayQueue 
DelayQueue对元素进行持有直到一个特定的延迟到期,不允许null元素且注入其中的元素必须实现java.util.concurrent.Delayed接口。 PriorityQueue是一个根据队列里元素某些属性排列先后的顺序队列,DelayQueue其实就是在每次往优先级队列中添加元素, 然后以元素的delay过期值作为排序的因素,以此来达到先过期的元素会排在队首,每次从队列里取出来都是最先过期的元素。
源码分析 
DelayQueue是容量无界的最大为Integer.MAX_VALUE,默认容量为11,若容量不够以当前容量50%递增, 可以使用一个现有集合对象初始化。
- DelayQueue()
- DelayQueue(Collection<? extends E> c)
 元素需要实现的接口Delayed
java
public interface Delayed extends Comparable<Delayed> {
    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}public interface Delayed extends Comparable<Delayed> {
    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
属性 
java
// 重入锁
private final transient ReentrantLock lock = new ReentrantLock();
// 锁条件对象
private final Condition available = lock.newCondition();
// 根据delay时间排序的优先队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
 * Thread designated to wait for the element at the head of
 * the queue.  This variant of the Leader-Follower pattern
 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 * minimize unnecessary timed waiting.  When a thread becomes
 * the leader, it waits only for the next delay to elapse, but
 * other threads await indefinitely.  The leader thread must
 * signal some other thread before returning from take() or
 * poll(...), unless some other thread becomes leader in the
 * interim.  Whenever the head of the queue is replaced with
 * an element with an earlier expiration time, the leader
 * field is invalidated by being reset to null, and some
 * waiting thread, but not necessarily the current leader, is
 * signalled.  So waiting threads must be prepared to acquire
 * and lose leadership while waiting.
 */
// 用于优化阻塞通知的线程元素leader 用leader来减少不必要的等待时间
// 
private Thread leader = null;// 重入锁
private final transient ReentrantLock lock = new ReentrantLock();
// 锁条件对象
private final Condition available = lock.newCondition();
// 根据delay时间排序的优先队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
/**
 * Thread designated to wait for the element at the head of
 * the queue.  This variant of the Leader-Follower pattern
 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 * minimize unnecessary timed waiting.  When a thread becomes
 * the leader, it waits only for the next delay to elapse, but
 * other threads await indefinitely.  The leader thread must
 * signal some other thread before returning from take() or
 * poll(...), unless some other thread becomes leader in the
 * interim.  Whenever the head of the queue is replaced with
 * an element with an earlier expiration time, the leader
 * field is invalidated by being reset to null, and some
 * waiting thread, but not necessarily the current leader, is
 * signalled.  So waiting threads must be prepared to acquire
 * and lose leadership while waiting.
 */
// 用于优化阻塞通知的线程元素leader 用leader来减少不必要的等待时间
// 
private Thread leader = null;1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
leader的作用?
leader的主要作用用于减少不必要的阻塞时间,例如有多个消费者线程用take方法去取, 内部先加锁,然后每个线程都去peek第一个节点。如果leader不为空说明已经有线程在取了,设置当前线程阻塞。 如果为空说明没有其他线程去取这个节点,设置leader并等待delay延时到期,直到poll后结束循环。
方法 
添加元素 
- boolean offer(E e)
java
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e); // 优先队列入队
        if (q.peek() == e) { // 查看元素是否是优先队列队首
            leader = null; // 设置leader为空 唤醒take线程
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e); // 优先队列入队
        if (q.peek() == e) { // 查看元素是否是优先队列队首
            leader = null; // 设置leader为空 唤醒take线程
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
- boolean offer(E e, long timeout, TimeUnit unit)
 同offer(E e)方法 因为队列没有容量限制故没有超时的offer方法
- boolean add(E e)
 同offer(E e)方法 因为队列没有容量限制故没有抛出异常的add方法
- void put(E e)
 同offer(E e)方法 因为队列没有容量限制故没有阻塞的put方法
移除元素 
- E poll()
java
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek(); // 查看优先队队首
        // 如果优先队列为空或队首delay时间为达到返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll(); // 优先队列出队
    } finally {
        lock.unlock();
    }
}public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek(); // 查看优先队队首
        // 如果优先队列为空或队首delay时间为达到返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll(); // 优先队列出队
    } finally {
        lock.unlock();
    }
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
- E poll(long timeout, TimeUnit unit)
java
 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) { // 当优先队列为空时
                if (nanos <= 0) // 超过dequeue操作的timeout时间 返回null
                    return null;
                else // dequeue操作阻塞
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                if (nanos <= 0)
                    return null;
                // 如果优先队列队首未达到可用时间 释放队首节点避免内存溢出
                first = null; // don't retain ref while waiting
                // 如果队首delay时间大于poll等待时间
                // 或者leader不为null(已有其他线程在阻塞队首delay时间)
                // 继续进行poll等待
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    // 如果队首delay时间小于poll阻塞时间
                    // 且leader为null(没有线程在阻塞队首delay时间)
                    // 将当前线程设置为leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 阻塞delay时间
                        long timeLeft = available.awaitNanos(delay);
                        // 设置poll需要阻塞的时间
                        nanos -= delay - timeLeft;
                    } finally {
                        // 释放leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果没有线程阻塞delay且优先队列队首不为null
        if (leader == null && q.peek() != null)
            available.signal(); // 唤醒等待消费的线程
        lock.unlock();
    }
} public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) { // 当优先队列为空时
                if (nanos <= 0) // 超过dequeue操作的timeout时间 返回null
                    return null;
                else // dequeue操作阻塞
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                if (nanos <= 0)
                    return null;
                // 如果优先队列队首未达到可用时间 释放队首节点避免内存溢出
                first = null; // don't retain ref while waiting
                // 如果队首delay时间大于poll等待时间
                // 或者leader不为null(已有其他线程在阻塞队首delay时间)
                // 继续进行poll等待
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    // 如果队首delay时间小于poll阻塞时间
                    // 且leader为null(没有线程在阻塞队首delay时间)
                    // 将当前线程设置为leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 阻塞delay时间
                        long timeLeft = available.awaitNanos(delay);
                        // 设置poll需要阻塞的时间
                        nanos -= delay - timeLeft;
                    } finally {
                        // 释放leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果没有线程阻塞delay且优先队列队首不为null
        if (leader == null && q.peek() != null)
            available.signal(); // 唤醒等待消费的线程
        lock.unlock();
    }
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
出队方法中为什么要释放first元素?
假如有线程A和B都来获取队首,如果线程A阻塞完毕,获取对象成功,出队完成。 这个对象理应被GC回收,但是他还被线程B持有着,GC链可达,所以不能回收这个first。
- E remove()
java
public E remove() {
    E x = poll(); // 见上poll()方法
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}public E remove() {
    E x = poll(); // 见上poll()方法
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}1
2
3
4
5
6
7
2
3
4
5
6
7
- boolean remove(Object o)
java
public boolean remove(Object o) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 调用优先队列remove(Object o)方法
        return q.remove(o);
    } finally {
        lock.unlock();
    }
}public boolean remove(Object o) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 调用优先队列remove(Object o)方法
        return q.remove(o);
    } finally {
        lock.unlock();
    }
}1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
- E take()
java
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) // 如果队首为null 阻塞当前线程
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0) // 队首可用 队列弹出队首元素
                    return q.poll();
                // 防止内存溢出
                first = null; // don't retain ref while waiting
                if (leader != null) // 如果有其他线程在阻塞队首delay时间 阻塞当前线程
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 阻塞队首delay时间
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) // 如果队首为null 阻塞当前线程
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0) // 队首可用 队列弹出队首元素
                    return q.poll();
                // 防止内存溢出
                first = null; // don't retain ref while waiting
                if (leader != null) // 如果有其他线程在阻塞队首delay时间 阻塞当前线程
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 阻塞队首delay时间
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
总结 
DelayQueue底层是一个优先队列(java.util.PriorityQueue),使用一个重入锁和锁生成的条件对象进行并发控制。 toArray、drainTo、clear都加锁但contains、toString未加锁。