每天一例多线程[day14]----Queue队列
关于什么时候使用MQ消息队列,什么时候使用JAVA自带队列,有一些基本的原则,如下图:
生产者:生产消息的速度很慢,缓慢地放入MQ中存储。
消费者:消费能力强,快速处理掉MQ中的消息。
这种场景下,MQ就不适合使用了,因为消费端的处理能力很强,生产者完全可以通过TCP长连接直连消费者,比如使用Netty或者Mina等框架就比较合适。
介绍集中JDK并发包下的队列:
ConcurrentLinkedQueue
是一个适用于高并发场景下的队列,通过无锁的方式实现高并发场景下的高性能,通常ConcurrentLinkedQueue,性能好于BlockingQueue。它是一个基于链接节点的*线程安全队列,该队列元素遵循FIFO原则,头是最先加入的,尾是最近加入的,该。队列不允许null元素。
重要方法:
add()和offer(),都是加入元素的方法,在ConcurrentLinkedQueue中,这两个方法没有任何区别。
poll()和peek()都是取头部元素,区别是前者会删除元素,后者不会。
-
//高性能无阻塞线程安全*队列:ConcurrentLinkedQueue
-
ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
-
q.offer("a");
-
q.offer("b");
-
q.offer("c");
-
q.offer("d");
-
q.add("e");
-
System.out.println(q.poll()); //a 从头部取出元素,并从队列里删除
-
System.out.println(q.size()); //4
-
System.out.println(q.peek()); //b
-
System.out.println(q.size()); //4
BlockingQueue接口
ArrayBlockingQueue
基于数组的有界阻塞队列实现,它的内部维护了一个定长数据,使用时必须初始化长度,来缓存队列中的数据,内部没有实现读写分离,意味着生产和消费不能完全并行。可以指定先进先出或先进后出策略,
-
//数据阻塞有界队列 必须指定初始化长度
-
ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);//指定初始长度
-
//以下是三种添加元素进入队列的方式
-
array.put("a");
-
array.put("b");
-
array.add("c");
-
array.add("d");
-
array.add("e");
-
//array.add("f");//java.lang.IllegalStateException: Queue full
-
//3秒种内加入队列 成功则返回true,否则false
-
System.out.println(array.offer("a", 3, TimeUnit.SECONDS));
LinkedBlockingQueue
基于链表的*阻塞队列,同ArrayBlockingQueue类似,它的内部维护的是一个数据缓冲队列,该队列由链表组成,它之所以能高效处理并发数据,是因为它的内部采用了读写分离两个锁,从而实现生产者和消费者的完全并行运行。
-
//阻塞*队列 可以添加初始化参数变成有界队列,不传则是*队列
-
LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>();
-
q.offer("a");
-
q.offer("b");
-
q.offer("c");
-
q.offer("d");
-
q.offer("e");
-
q.add("f");
-
System.out.println(q.size());//6
-
//循环打印
-
for (Iterator iterator = q.iterator(); iterator.hasNext();) {
-
String string = (String) iterator.next();
-
System.out.println(string);
-
}
-
List<String> list = new ArrayList<String>();
-
//drainTo方法表示从队列中取出先加入的前n个元素放入集合中 返回加入的长度
-
System.out.println(q.drainTo(list, 3));
-
System.out.println(list.size());
-
for (String string : list) {
-
System.out.println(string);
-
}
SynchronousQueue
一种没有缓冲的队列,生产者生产的数据会直接被消费者获取并消费。
-
final SynchronousQueue<String> q = new SynchronousQueue<String>();
-
// q.add("aaa");//此种队列不允许加元素 报java.lang.IllegalStateException: Queue full
-
Thread t1 = new Thread(new Runnable() {
-
@Override
-
public void run() {
-
try {
-
System.out.println(q.take());
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
});
-
t1.start();
-
Thread t2 = new Thread(new Runnable() {
-
@Override
-
public void run() {
-
q.add("asdasd");
-
}
-
});
-
t2.start();
以上三个阻塞队列可以使用每天地铁站的人员流动限流策略来说明:
1 早上8点~9点30分,上班高峰期,地铁站空间有限(系统内存),采用ArrayBlockingQueue有界阻塞队列,当队列满的时候可以拒绝来访者,给予提示“稍后再来”。
2 早上10点~下午17点,人员流动平稳,地铁站不再限流,可以持续接纳乘客,采用LinkedBlockingQueue*阻塞队列,只要进来就可以保证可以乘车(被消费),在一定许可的时间内宽松的实时性。
3 晚上10点~11点半,人员稀少,采用SynchronousQueue,来了直接可以乘车,队列没有容量。
PriorityBlockingQueue
基于优先级的*阻塞队列,优先级的判断通过构造函数传入的Compare对象来决定,也就是传入队列的对象必须实现Comparable接口,在实现PriorityBlockingQueue队列时,内部控制线程采用的锁是公平锁。
举例:我们创建几个任务,通过比较任务的ID编号大小为任务进行排序,顺序表示优先级,看看取出队列中元素是按照什么优先级取出的?
-
public class Task implements Comparable<Task>{
-
private int id ;
-
private String name;
-
public int getId() {
-
return id;
-
}
-
public void setId(int id) {
-
this.id = id;
-
}
-
public String getName() {
-
return name;
-
}
-
public void setName(String name) {
-
this.name = name;
-
}
-
/**
-
* 按照升序排列
-
*/
-
@Override
-
public int compareTo(Task task) {
-
return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);
-
}
-
public String toString(){
-
return this.id + "," + this.name;
-
}
-
}
-
public class UsePriorityBlockingQueue {
-
public static void main(String[] args) throws Exception{
-
PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>();
-
Task t1 = new Task();
-
t1.setId(3);
-
t1.setName("id为3");
-
Task t2 = new Task();
-
t2.setId(4);
-
t2.setName("id为4");
-
Task t3 = new Task();
-
t3.setId(1);
-
t3.setName("id为1");
-
//return this.id > task.id ? 1 : 0;
-
q.add(t1); //3
-
q.add(t2); //4
-
q.add(t3); //1
-
// 1 3 4
-
System.out.println("容器:" + q);
-
System.out.println(q.take().getId());
-
System.out.println("容器:" + q);
-
System.out.println(q.take().getId());
-
System.out.println(q.take().getId());
-
}
-
}
容器:[1,id为1, 4,id为4, 3,id为3]
打印:
-
容器:[1,id为1, 4,id为4, 3,id为3]
-
1
-
容器:[3,id为3, 4,id为4]
-
3
-
4
说明优先级队列取出的顺序时1、3、4,从小到大的顺序.
DelayQueue
带有延迟时间的*队列,其中的元素只有当延迟的时间到了,才能被从队列中取出。DelayQueue必须实现Delayed接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如缓存超时的数据进行移除,任务超时处理,空闲连接的关闭等等。
举例:拿网吧上网为例,甲乙丙三个人上网分别1s,5s和10s,在上机的时间内,其他人是无法抢占这三台机器进行上网,只有当这三个人的上网时间到了,才能下机被别人获取到
-
import java.util.concurrent.Delayed;
-
import java.util.concurrent.TimeUnit;
-
public class Wangmin implements Delayed {
-
private String name;
-
//身份证
-
private String id;
-
//截止时间
-
private long endTime;
-
//定义时间工具类
-
private TimeUnit timeUnit = TimeUnit.SECONDS;
-
public Wangmin(String name,String id,long endTime){
-
this.name=name;
-
this.id=id;
-
this.endTime = endTime;
-
}
-
public String getName(){
-
return this.name;
-
}
-
public String getId(){
-
return this.id;
-
}
-
/**
-
* 用来判断是否到了截止时间
-
*/
-
@Override
-
public long getDelay(TimeUnit unit) {
-
//return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-
return endTime - System.currentTimeMillis();
-
}
-
/**
-
* 相互批较排序用
-
*/
-
@Override
-
public int compareTo(Delayed delayed) {
-
Wangmin w = (Wangmin)delayed;
-
return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;
-
}
-
}
-
import java.util.concurrent.DelayQueue;
-
public class WangBa implements Runnable {
-
private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>();
-
public boolean yinye =true;
-
public void shangji(String name,String id,int money){
-
Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());
-
System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"交钱"+money+"块,开始上机...");
-
this.queue.add(man);
-
}
-
public void xiaji(Wangmin man){
-
System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"时间到下机...");
-
}
-
@Override
-
public void run() {
-
while(yinye){
-
try {
-
Wangmin man = queue.take();
-
xiaji(man);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
public static void main(String args[]){
-
try{
-
System.out.println("网吧开始营业");
-
WangBa siyu = new WangBa();
-
Thread shangwang = new Thread(siyu);
-
shangwang.start();
-
siyu.shangji("路人甲", "123", 1);
-
siyu.shangji("路人乙", "234", 10);
-
siyu.shangji("路人丙", "345", 5);
-
}
-
catch(Exception e){
-
e.printStackTrace();
-
}
-
}
-
}
打印:
-
网吧开始营业
-
网名路人甲 身份证123交钱1块,开始上机...
-
网名路人乙 身份证234交钱10块,开始上机...
-
网名路人丙 身份证345交钱5块,开始上机...
-
网名路人甲 身份证123时间到下机...
-
网名路人丙 身份证345时间到下机...
-
网名路人乙 身份证234时间到下机...