JAVA的Collection队列BlockingQueue

队列是一种数据结构,它有两个基本操作:在队列尾部加入元素和从队列头部移除元素。在我们日常开发中,经常用来并发操作数据。

非阻塞队列:以ConcurrentLinkedQueue为代表的非阻塞队列;

阻塞队列:以BlockingQueue接口为代表的阻塞队列;

JAVA的Collection队列BlockingQueue

JAVA的Collection队列BlockingQueue

    //***************************************************************//
    //        抛出异常             特殊值        阻塞        超时
    //插入    add(e)            offer(e)        put(e)       offer(e, time, unit)
    //移除    remove()        poll()           take()       poll(time, unit)
    //检查    element()        peek()        不可用     不可用
    //***************************************************************//

常用方法:

//    1、boolean add(E e):插入指定的元素要队列中,并返回true或者false,如果队列数量超过了容量,则抛出 llegalStateException的异常。
//    2、E remove():搜索并删除最顶层的队列元素,如果队列为空,则抛出一个Exception
//    3、boolean offer(E e):插入指定的元素到队列,并返回true或者false,如果队列数量超过了容量,不会抛出异常,只会返回false。
//    4、E poll():搜索并删除最顶层的队列元素,如果队列为空,则返回null
//    5、E element():检索但不删除并返回队列中最顶层的元素,如果该队列为空,则抛出一个Exception
//    6、E peek(): 检索但不删除并返回最顶层的元素,如果该队列为空,则返回null
//    7、put(E e)阻塞:把 e 加到 BlockingQueue 里,如果 BlockQueue 没有空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续
//    8、take():阻塞取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 Blocking 有新的对象被加入为止 

BlockingQueue常用于多线程环境下生产者-消费者模型

生产者:

/**
 * 30通过生产者消费者模型理解等待唤醒机制
 * @author Administrator
 *
 */
public class Demo49PushTarge implements Runnable {

    private Demo49Tmall tmall;
    
    public Demo49PushTarge(Demo49Tmall tmall) {
        this.tmall = tmall;
    }

    @Override
    public void run() {
        while(true){
            tmall.push();
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        
    }

}

 

/**
 * 30通过生产者消费者模型理解等待唤醒机制
 * @author Administrator
 *
 */
public class Demo49TakeTarge implements Runnable {

    private Demo49Tmall tmall;
    
    public Demo49TakeTarge(Demo49Tmall tmall) {
        this.tmall = tmall;
    }

    @Override
    public void run() {
        while(true){
            tmall.take();
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}
 

/**
 * 32使用Condition重写waitnotify案例并实现一个有界队列
 * 通过Condition改造生产者消费者模型理解等待唤醒机制
 * @author Administrator
 *
 */
public class Demo49Tmall {
    
    
    private final static int MAX_COUNT = 10;
    
    private BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(MAX_COUNT);
    
    public void push(){
//        queue.add(1);
        try {
            queue.put(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public void take(){
//        queue.remove();
        try {
            queue.take();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    public void size(){
        while(true){
            System.out.println("当前队列的长度为:"+queue.size());
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

 

package com.jetsen.thread.tc2;
/**
 * 49Java中的阻塞队列原理与使用
 * BlockingQueue 
 * put take
 * 抛出异常add remove
 * 对比非阻塞队列ConcurrentLinkedQueue  offer poll ,它们是有返回值
 * @author Administrator
 *
 */
public class Demo49ProducerAndConsumerMain {
    
    public static void main(String[] args) {
        Demo49Tmall tmall = new Demo49Tmall();
        
        Demo49PushTarge pushT = new Demo49PushTarge(tmall);
        Demo49TakeTarge takeT = new Demo49TakeTarge(tmall);
        
        new Thread(pushT).start();
        new Thread(pushT).start();
        new Thread(pushT).start();
        
        new Thread(takeT).start();
        new Thread(takeT).start();
        
        new Thread(new Runnable() {
            @Override
            public void run() {
                tmall.size();
            }
        }).start();   
    }
}