阻塞队列

Java中的阻塞队列

阻塞队列

1.1 什么是阻塞队列(BlockingQueue)

支持阻塞操作的队列。具体来讲,支持阻塞添加和阻塞移除。

阻塞添加:当队列满的时候,队列会阻塞插入插入的元素的线程,直到队列不满;

阻塞移除:在队列为空时,队里会阻塞插入元素的线程,直到队列不满。

阻塞队列常用于生产者和消费者场景,生产者是向队列添加元素的线程;消费者是从队列取元素的线程。

插入和移除操作的四种处理方式

阻塞队列

抛出异常:

Add: 如果队列的容量已满,在添加元素就会抛出Queue full异常

Remove: 如果队列没有元素,则抛出NoSuchElement异常;如果是删除某一个元素,如果元素不存在,则不会抛出异常,而是返回false.

Element: 如果队列为空,则抛出NoSuchElement异常;如果队不为空,则返回第一个元素。

 

返回特殊值:

offer(anObject): 如果可能的话,将元素添加到队列,返回true,如果队列已满,则不添加,返回false.不阻塞执行线程

poll: 从队列返回第一个元素,如果没有返回null.

peek: 从队列返回第一个元素,如果没有返回null

Block Queue

 

一直阻塞:

put:把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

take: 如果取到则返回,如果没有取到,则一直阻塞,直到队列不为空

 

超时退出:

offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列。加入BlockingQueue,则返回失败。

poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

 

1.2 常见的阻塞队列

1.2.1ArrayBlockingQueue

是一个基于数组实现的有界队列,此队列取元素基于先进先出。

它有区别为公平和非公平访问队列。所谓公平是指,处于阻塞的队列,按照阻塞的先后顺序访问队列;不公平是指被阻塞的线程都可以争夺访问队列的资格。默认是不公平的,如果需要保证公平,则需要通过参数指定。

ArrayBlockingQueue(int capacity, boolean fair);

公平的实现是基于可重入锁ReentrantLock实现。

public ArrayBlockingQueue(int capacity, boolean fair) {

    if (capacity <=0)

        throw newIllegalArgumentException();

    this.items = new Object[capacity];

    lock = newReentrantLock(fair);

    notEmpty = lock.newCondition();

    notFulllock.newCondition();

}

 

1.2.2 LinkedBlockingQueue

LinkedBlockingQueue 是一个基于链表实现的有界阻塞队列。默认长度为Integer.MAX_VALUE.也是按照先进先出的原则取元素。LinkedBlockingQueue内部有一个Node类,表示结点,用于存放元素。

重要的属性:

int capacity;

AtomicInteger count = new AtomicInteger();

Node<E> head;

Node<E> last;

// 读锁

ReentrantLock takeLock = new ReentrantLock();

Condition notEmpty = takeLock.newCondition();

// 写锁

ReentrantLock putLock = new ReentrantLock();

Condition notFull = putLock.newCondition();

根据属性我们可以知道它读和写是分开加锁的。

重要的方法:

public void put(E e) throwsInterruptedException {

    if (e == null) throw newNullPointerException();

    int c = -1;

    Node<E> node = new Node<E>(e);

    final ReentrantLock putLock = this.putLock;

    final AtomicInteger count = this.count;

    //当前线程未被中断

    putLock.lockInterruptibly();

    try {

        //如果元素已满,一直等待,直到队列可添加

        while (count.get() == capacity){

            notFull.await();

        }

        enqueue(node);

        c = count.getAndIncrement();

        if (c + 1 < capacity)

            notFull.signal();

    } finally {

        putLock.unlock();

    }

    if (c == 0)

        signalNotEmpty();

}

 

public E take() throwsInterruptedException {

    E x;

    int c = -1;

    final AtomicInteger count = this.count;

    final ReentrantLock takeLock = this.takeLock;

    takeLock.lockInterruptibly();

    try {

        //如果数量为0,一直阻塞,直到队列非空

        while (count.get() == 0) {

            notEmpty.await();

        }

        x = dequeue();

        c = count.getAndDecrement();

        if (c > 1)

            notEmpty.signal();

    } finally {

        takeLock.unlock();

    }

    if (c == capacity)

        signalNotFull();

    return x;

}

 

1.2.3 PriorityBlockingQueue

是一个支持优先级的*阻塞队列。基于堆这种数据结构。

它可以对元素进行排序。我们可以指定一个实现了Comparator的比较器。

重要方法:

private static <T> void siftUpComparable(int k, T x, Object[] array) {

    Comparable<? super T> key =(Comparable<? super T>) x;

    while (k > 0) {

        int parent = (k - 1) >>>1;

        Object e = array[parent];

        if (key.compareTo((T) e) >= 0)

            break;

        array[k] = e;

        k = parent;

    }

array[k] = key;

}   

private static <T> voidsiftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) {

      while (k > 0) {

           int parent = (k - 1) >>>1;

           Object e = array[parent];

           if (cmp.compare(x, (T) e) >= 0)

                 break;

           array[k] = e;

           k = parent;

      }

      array[k] = x;

}

1.2.4 DelayQueue

是一个支持延时获取元素的*阻塞队列。队列使用PriortiyQueue来实现。队列中的元素必须实现Delayed接口,在创建元素的时候,可以指定多久才能从队列获取当前元素。只有在延迟期满时才能从队列中提取元素。

应用场景:

缓存系统设计,用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦从DelayQueue中获取元素,表示缓存到期了。

定时调度任务,使用DelayQueue保存当前将会执行的任务和执行时间,一旦从DelayQueue获取到任务就开始执行。比如TimerQueue

 

1.2.5 SynchrousQueue

是一个不存储元素的阻塞队列,每一个put操作必须等一个take操作,否则不能继续添加元素。即直接将接到的元素传给其他使用者。

它支持公平的访问队列,默认情况下是不公平的。

 

1.2.6 LinkedTransferQueue

链表实现的*阻塞队列,多了trysTransfer()和transfer()。

transfer:如果当前有消费者正在等待接收元素,使用take()方法或待时间限制的poll方法,transfer可以把生产者传入的元素立刻传给消费者,如果没有消费者在等待接收元素,该方法会将元素存放在队列的tail节点,并且等待这个元素被消费了才返回。

tryTransfer: 是试探传入的元素是否能够直接传给消费者。如果没有消费者在等待接收元素,则返回false.无论消费者是否接受,方法立即返回。但transfer是消费者消费了才返回。

1.2.7 LinkedBlockDeque

链表实现的双向阻塞队列,即可以从两端插入和移除元素。双向队列因为多了一个操作队列的入口,在多线程同时入队列,也就减少了一半的竞争。相比其他的阻塞队列。他多了addFirst , offerFirst,offerLast,

PeekFirst,peekLast方法