3、高并发编程-并发容器
1、ThreadLocal(额外部分)
package com.sunyou.p5;
import java.util.concurrent.TimeUnit;
/*
* 其实就是一个map,
* ThreadLocal.set(value) ——>map.put(Thread.getCurrentThread(),value);
* TreadLocal.get() ——>map.get(Thread.getCurrentThread())
* 注意:在并发量高的时候,可能有内存溢出,所以在使用ThreadLocal的时候
* 一定要回收资源,每个线程结束之前,将当前线程保存的线程变量删除ThreadLocal.remove()
*/
public class ThreadLocalTest {
volatile static String name = "张三";
static ThreadLocal<String> tl = new ThreadLocal<>();
public static void main(String[] args) {
//开启两个线程,一个读取数据,一个写数据
new Thread(new Runnable() {
@Override
public void run() {
try {
//睡眠两秒
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name);//输出李四
System.out.println(tl.get());//输出null
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
//睡眠一秒
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
name = "李四";
tl.set(name);
}
}).start();
}
}
ThreadLocal会为变量在每个线程中创建一个副本,线程之间的副本互不影响。ThreadLocal的应用场景为解决数据库连接、Session管理等等。推荐一篇很好的博文 深入剖析ThreadLocal
2、CuntCurrentHashMap
在并发编程中使用HashMap可能会导致程序死循环,具体原因是在rehash过程中导致链表产生了环路造成的。可以参考博文 HashMap多线程下死循环的原因
同时HashTable使用的效率非常低下,HashTable是使用sychronized来保证线程安全的,在一个线程put时,也不允许其他线程不能进行put和get。所以这个时候可以使用CountCurrentHashMap来替代它们称为多线程下的容器。
ConcurrentHashMap使用锁分段技术来保证线程安全同时提升并发访问效率。
ContCurrentHashMap是由Segment数组和HashEntry数组构成。Segment是一种重入锁,HashEntry用于存储键值对。在初始化Segment数组和HashEntry数组时,会保证数组的长度都是2的N次方。同时其hash算法采用了Wang/Jenkins hash算法对元素的hashCode进行一次再散列。
测试HashTable、ConcurrentHashMap和ConcurrentSkipListMap的效率
package com.sunyou.p5;
import java.util.Hashtable;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
/*
* 测试concurrentMap的效率
*/
public class ConcurrentMapTest {
public static void main(String[] args) {
final Map<String, String> map = new Hashtable<>();
//final Map<String, String> map = new ConcurrentHashMap<>();
//final Map<String, String> map = new ConcurrentSkipListMap<>();
//定义随机数生成对象
final Random random = new Random();
//定义100个线程
Thread[] threadArray = new Thread[100];
//定义门闩
final CountDownLatch latch = new CountDownLatch(100);
long firstTime = System.currentTimeMillis();
//开启100个线程向map中存入数据
for(int i = 0; i < 100; i++) {
threadArray[i] = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 10000; i++) {
map.put("key : "+random.nextInt(10000), "value : "+random.nextInt(10000));
}
//线程结束后门闩减一
latch.countDown();
}
});
}
//开启所有线程
for(int i = 0; i < 100; i++){
threadArray[i].start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long secondTime = System.currentTimeMillis();
System.out.println("运行时间为:"+(secondTime - firstTime));
}
}
其中ConcurrentSkipListMap底层是跳表实现。
3、CopyOnWriteArrayList 写时复制集合
写入效率低,读取效率高。每次写入数据,都会创建一个新的底层数组。下面代码采用多线程的方式测试了ArrayList、Vector和CopyOnWriteArrayList的效率,注意ArrayList在多线程下不安全。
package com.sunyou.p5;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Vector;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
public class CopyOnWriteListTest {
public static void main(String[] args) {
//final List<String> list = new ArrayList<>();//多线程下不安全
//final List<String> list = new Vector<>();
final List<String> list = new CopyOnWriteArrayList<>();//效率低下,采用复制整个数组的方式添加元素
//定义随机数生成对象
final Random random = new Random();
//定义100个线程
Thread[] threadArray = new Thread[100];
//定义门闩
final CountDownLatch latch = new CountDownLatch(100);
long firstTime = System.currentTimeMillis();
//开启100个线程向map中存入数据
for(int i = 0; i < 100; i++) {
threadArray[i] = new Thread(new Runnable() {
@Override
public void run() {
for(int j = 0; j < 1000; j++) {
list.add("value"+j);
}
//线程结束后门闩减一
latch.countDown();
}
});
}
//开启所有线程
for(int i = 0; i < 100; i++){
threadArray[i].start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long secondTime = System.currentTimeMillis();
System.out.println("运行时间为:"+(secondTime - firstTime) +"ms");
System.out.println("容器中元素个数为 "+list.size());
}
}
4、LinkedBlockingQueue 阻塞队列
put和take方法会自动阻塞
take 自动阻塞的方法,当队列的容量为0时会自动阻塞。
put 自动阻塞方法,当队列的容量满了之后会自动阻塞。
package com.sunyou.p5;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/*
* 并发容器 LinkedBlockingQueue 阻塞容器
* put & take 自动阻塞
* take 自动阻塞的方法,当队列的容量为0时会自动阻塞。
* put 自动阻塞方法,当队列的容量满了之后会自动阻塞
*/
public class LinkedBlockingQueueTest {
private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
public static void main(String[] args) {
//开启一个生产者线程和三个消费者线程,生产者生产一个就会被某个消费者拿走
new Thread(new Runnable() {
@Override
public void run() {
while(true) {
try {
queue.put("烤鸭");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"生产者").start();
for(int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
while(true) {
try {
System.out.println(Thread.currentThread().getName()+"消费"+queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"消费者"+(i+1)).start();
}
}
}
5、ArrayBlockingQueue 有界阻塞队列
add 使用add添加元素时,当元素超过容器大小会抛出异常 queue full
put 使用put添加元素时,如果超过容量会引起阻塞
无参数的offer 不阻塞,容量不足时候返回false,当前新增的元素放弃
有参数的offer 如果超过容量,等待一段时间。如果这段时间有空闲空间,返回true,否则返回false。
package com.sunyou.p5;
/*
* 底层是有界队列,自动阻
*/
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ArrayBlockingQueueTest {
private static ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
public static void main(String[] args) {
for(int i = 0; i < 5; i++) {
//queue.add("value : " + i); //Exception in thread "main" java.lang.IllegalStateException: Queue full
/*try {
queue.put("value : " + i);//如果出现queue容量不足,会阻塞等待
} catch (InterruptedException e) {
e.printStackTrace();
}*/
//queue.offer("value : " + i);//不阻塞,容量不足的时候返回false,当前新增数据操作放弃
try {
queue.offer("value : " + i, 1, TimeUnit.SECONDS);//当容量不足的时候,阻塞中间参数的时间,单位为第三个参数
} catch (InterruptedException e) {//如果在阻塞时间内有了空闲空间返回true,否则返回false
e.printStackTrace();
}
}
System.out.println(queue);
}
}
6、DelayQueue 延时队列
实现自定义处理顺序的队列。常用语定时任务,下列代码中的task的类必须实现Delayed接口,才能将其对象放入到延时队列中。下列代码通过当前时间和每个task给出的时间对比,进行排序
package com.sunyou.p5;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueTest {
private static DelayQueue<MyTask> queue = new DelayQueue<>();
public static void main(String[] args) {
long currentTime = System.currentTimeMillis();
MyTask[] task = new MyTask[6];
//创建6个task
for(int i = 0; i < 6; i++) {
task[i] = new MyTask(currentTime+(i+1)*1000);
}
System.out.println("currentTime " + currentTime);
//将task放入queue中
for(int i = 0; i < 6; i++)
queue.put(task[i]);
//从queue中取出task
for(int i = 0; i < 6; i++) {
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class MyTask implements Delayed{
private long compareValue;
MyTask(long compareValue){
this.compareValue = compareValue;
}
/*
* 当前对象的值-传入对象的值 > 0 返回正数
* 表示当前对象更大,放在后面。即从小到大排序
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(compareValue - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public String toString() {
return "MyTask [compareValue=" + compareValue + "]";
}
}
7、LinkedTransferQueue 转移队列
使用transfer方法,实现数据的即时处理。没有take,就一直阻塞
add 队列会保存数据,不做阻塞等待。
transfer 是TransferQueue特有的方法。必须有消费者take,如果没有,transfer方法阻塞。一般用于处理即使消息
package com.sunyou.p5;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
/*
* 并发容器 LinkedTransferQueue
* 转移队列
* add 队列会保存数据,不做阻塞等待。
* transfer 是TransferQueue特有的方法。必须有消费者take,
* 如果没有,transfer方法阻塞。一般用于处理即使消息
*/
public class LinkedTransferQueueTest {
private static LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
public static void main(String[] args) {
//打开两个线程,一个线程一直等待取数据,一个线程使用transfer发数据
new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+" 开始工作");
System.out.println("取到的数据为 " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"取数据").start();
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("开始发送数据,发送的数据为 ‘信息块’");
queue.transfer("信息块");
//queue.add("信息块");
System.out.println("发数据完成,发送的数据为 ‘信息块’");
} catch (Exception e) {
e.printStackTrace();
}
}
},"发数据").start();
}
}
上述代码输出,接收方把数据收完后,发送发中断阻塞
开始发送数据,发送的数据为 ‘信息块’
取数据 开始工作
取到的数据为 信息块
发数据完成,发送的数据为 ‘信息块’
如果使用add,容器会暂时保存数据,发送方不会阻塞
开始发送数据,发送的数据为 ‘信息块’
发数据完成,发送的数据为 ‘信息块’
取数据 开始工作
取到的数据为 信息块
8、SyschronousQueue 同步队列
是一个容量为0的队列。是一个特殊的TransferQueue,必须有消费线程等待,才能使用。
add 不会阻塞,如果存在线程在take,就直接交付数据。如果没有线程在take,抛出异常。
put 会阻塞,等待线程来take。
package com.sunyou.p5;
/*
* SyschronousQueue是一个容量为0的队列。是一个特殊的TransferQueue
* add方法,没有阻塞。如果没有take线程等待数据,抛出异常
* put方法,有阻塞。如果没有take线程等待数据,会阻塞
*/
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueTest {
private static SynchronousQueue<String> queue = new SynchronousQueue<>();
public static void main(String[] args) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"开始工作");
//等待取数据
System.out.println("收到的数据为:"+queue.take());
System.out.println(Thread.currentThread().getName()+"结束工作");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"取数据").start();
TimeUnit.SECONDS.sleep(1);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"开始工作");
try {
//queue.add("信息块");
queue.put("信息块");
System.out.println(Thread.currentThread().getName()+"结束工作");
} catch (Exception e) {
e.printStackTrace();
}
}
},"发数据").start();
}
}