3、高并发编程-并发容器

1、ThreadLocal(额外部分)

package com.sunyou.p5;

import java.util.concurrent.TimeUnit;

/*
 * 其实就是一个map,
 * ThreadLocal.set(value) ——>map.put(Thread.getCurrentThread(),value);
 * TreadLocal.get() ——>map.get(Thread.getCurrentThread())
 * 注意:在并发量高的时候,可能有内存溢出,所以在使用ThreadLocal的时候
 * 一定要回收资源,每个线程结束之前,将当前线程保存的线程变量删除ThreadLocal.remove()
 */
public class ThreadLocalTest {
	volatile static String name = "张三";
	static ThreadLocal<String> tl = new ThreadLocal<>();
	public static void main(String[] args) {
		//开启两个线程,一个读取数据,一个写数据
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					//睡眠两秒
					TimeUnit.SECONDS.sleep(2);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(name);//输出李四
				System.out.println(tl.get());//输出null
			}
		}).start();
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					//睡眠一秒
					TimeUnit.SECONDS.sleep(1);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				name = "李四";
				tl.set(name);
			}
		}).start();
	}
}

ThreadLocal会为变量在每个线程中创建一个副本,线程之间的副本互不影响。ThreadLocal的应用场景为解决数据库连接、Session管理等等。推荐一篇很好的博文 深入剖析ThreadLocal

2、CuntCurrentHashMap

在并发编程中使用HashMap可能会导致程序死循环,具体原因是在rehash过程中导致链表产生了环路造成的。可以参考博文 HashMap多线程下死循环的原因

同时HashTable使用的效率非常低下,HashTable是使用sychronized来保证线程安全的,在一个线程put时,也不允许其他线程不能进行put和get。所以这个时候可以使用CountCurrentHashMap来替代它们称为多线程下的容器。

ConcurrentHashMap使用锁分段技术来保证线程安全同时提升并发访问效率。

3、高并发编程-并发容器

ContCurrentHashMap是由Segment数组和HashEntry数组构成。Segment是一种重入锁,HashEntry用于存储键值对。在初始化Segment数组和HashEntry数组时,会保证数组的长度都是2的N次方。同时其hash算法采用了Wang/Jenkins hash算法对元素的hashCode进行一次再散列。

测试HashTable、ConcurrentHashMap和ConcurrentSkipListMap的效率

package com.sunyou.p5;

import java.util.Hashtable;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;

/*
 * 测试concurrentMap的效率
 */
public class ConcurrentMapTest {
	public static void main(String[] args) {
		final Map<String, String>  map = new Hashtable<>();
		//final Map<String, String> map = new ConcurrentHashMap<>();
		//final Map<String, String> map = new ConcurrentSkipListMap<>();
		//定义随机数生成对象
		final Random random = new Random();
		//定义100个线程
		Thread[] threadArray = new Thread[100];
		//定义门闩
		final CountDownLatch latch = new CountDownLatch(100);
		long firstTime = System.currentTimeMillis();
		//开启100个线程向map中存入数据
		for(int i = 0; i < 100; i++) {
			threadArray[i] = new Thread(new Runnable() {
				@Override
				public void run() {
					for(int i = 0; i < 10000; i++) {
						map.put("key : "+random.nextInt(10000), "value : "+random.nextInt(10000));
					}
					//线程结束后门闩减一
					latch.countDown();
				}
			});
		}
		//开启所有线程
		for(int i = 0; i < 100; i++){
			threadArray[i].start();
		}
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		long secondTime = System.currentTimeMillis();
		System.out.println("运行时间为:"+(secondTime - firstTime));
	}
	
}

其中ConcurrentSkipListMap底层是跳表实现。

3、高并发编程-并发容器

 

3、CopyOnWriteArrayList 写时复制集合

写入效率低,读取效率高。每次写入数据,都会创建一个新的底层数组。下面代码采用多线程的方式测试了ArrayList、Vector和CopyOnWriteArrayList的效率,注意ArrayList在多线程下不安全。

package com.sunyou.p5;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Vector;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

public class CopyOnWriteListTest {
	public static void main(String[] args) {
		//final List<String>  list = new ArrayList<>();//多线程下不安全
		//final List<String> list = new Vector<>();
		final List<String> list = new CopyOnWriteArrayList<>();//效率低下,采用复制整个数组的方式添加元素
		//定义随机数生成对象
		final Random random = new Random();
		//定义100个线程
		Thread[] threadArray = new Thread[100];
		//定义门闩
		final CountDownLatch latch = new CountDownLatch(100);
		long firstTime = System.currentTimeMillis();
		//开启100个线程向map中存入数据
		for(int i = 0; i < 100; i++) {
			threadArray[i] = new Thread(new Runnable() {
				@Override
				public void run() {
					for(int j = 0; j < 1000; j++) {
						list.add("value"+j);
					}
					//线程结束后门闩减一
					latch.countDown();
				}
			});
		}
		//开启所有线程
		for(int i = 0; i < 100; i++){
			threadArray[i].start();
		}
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		long secondTime = System.currentTimeMillis();
		System.out.println("运行时间为:"+(secondTime - firstTime) +"ms");
		System.out.println("容器中元素个数为 "+list.size());
	}
	
}

4、LinkedBlockingQueue 阻塞队列

put和take方法会自动阻塞

take  自动阻塞的方法,当队列的容量为0时会自动阻塞。
put   自动阻塞方法,当队列的容量满了之后会自动阻塞。
package com.sunyou.p5;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/*
 * 并发容器 LinkedBlockingQueue 阻塞容器
 * put & take 自动阻塞
 * take 自动阻塞的方法,当队列的容量为0时会自动阻塞。
 * put  自动阻塞方法,当队列的容量满了之后会自动阻塞
 */
public class LinkedBlockingQueueTest {
	private static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
	public static void main(String[] args) {
		//开启一个生产者线程和三个消费者线程,生产者生产一个就会被某个消费者拿走
		new Thread(new Runnable() {
			@Override
			public void run() {
				while(true) {
					try {
						queue.put("烤鸭");
						TimeUnit.SECONDS.sleep(1);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		},"生产者").start();
		for(int i = 0; i < 3; i++) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					while(true) {
						try {
							System.out.println(Thread.currentThread().getName()+"消费"+queue.take());
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
			},"消费者"+(i+1)).start();
		}
	}
}

5、ArrayBlockingQueue 有界阻塞队列

add              使用add添加元素时,当元素超过容器大小会抛出异常 queue full
put              使用put添加元素时,如果超过容量会引起阻塞
无参数的offer     不阻塞,容量不足时候返回false,当前新增的元素放弃
有参数的offer     如果超过容量,等待一段时间。如果这段时间有空闲空间,返回true,否则返回false。
package com.sunyou.p5;
/*
 * 底层是有界队列,自动阻
 */
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ArrayBlockingQueueTest {
	private static ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
	public static void main(String[] args) {
		for(int i = 0; i < 5; i++) {
			//queue.add("value : " + i); //Exception in thread "main" java.lang.IllegalStateException: Queue full
			
			/*try {
				queue.put("value : " + i);//如果出现queue容量不足,会阻塞等待
			} catch (InterruptedException e) {
				e.printStackTrace();
			}*/
			
			//queue.offer("value : " + i);//不阻塞,容量不足的时候返回false,当前新增数据操作放弃
			
			try {
				queue.offer("value : " + i, 1, TimeUnit.SECONDS);//当容量不足的时候,阻塞中间参数的时间,单位为第三个参数
			} catch (InterruptedException e) {//如果在阻塞时间内有了空闲空间返回true,否则返回false
				e.printStackTrace();
			}
		}
		System.out.println(queue);
	}
}

6、DelayQueue 延时队列

实现自定义处理顺序的队列。常用语定时任务,下列代码中的task的类必须实现Delayed接口,才能将其对象放入到延时队列中。下列代码通过当前时间和每个task给出的时间对比,进行排序

package com.sunyou.p5;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueTest {
	private static DelayQueue<MyTask> queue = new DelayQueue<>();
	
	public static void main(String[] args) {
		long currentTime = System.currentTimeMillis();
		MyTask[] task = new MyTask[6];
		//创建6个task
		for(int i = 0; i < 6; i++) {
			task[i] = new MyTask(currentTime+(i+1)*1000);
		}
		System.out.println("currentTime " + currentTime);
		//将task放入queue中
		for(int i = 0; i < 6; i++)
			queue.put(task[i]);
		//从queue中取出task
		for(int i = 0; i < 6; i++) {
			try {
				System.out.println(queue.take());
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
			
	}
}
class MyTask implements Delayed{
	private long compareValue;
	MyTask(long compareValue){
		this.compareValue = compareValue;
	}
	
	/*
	 * 当前对象的值-传入对象的值 > 0 返回正数
	 * 表示当前对象更大,放在后面。即从小到大排序
	 */
	@Override
	public int compareTo(Delayed o) {
		return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
	}

	@Override
	public long getDelay(TimeUnit unit) {
		return unit.convert(compareValue - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
	}

	@Override
	public String toString() {
		return "MyTask [compareValue=" + compareValue + "]";
	}
}

7、LinkedTransferQueue 转移队列

使用transfer方法,实现数据的即时处理。没有take,就一直阻塞

add 队列会保存数据,不做阻塞等待。
transfer 是TransferQueue特有的方法。必须有消费者take,如果没有,transfer方法阻塞。一般用于处理即使消息
package com.sunyou.p5;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

/*
 * 并发容器 LinkedTransferQueue
 * 转移队列
 * add 队列会保存数据,不做阻塞等待。
 * transfer 是TransferQueue特有的方法。必须有消费者take,
 * 如果没有,transfer方法阻塞。一般用于处理即使消息
 */
public class LinkedTransferQueueTest {
	private static LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
	public static void main(String[] args) {
		//打开两个线程,一个线程一直等待取数据,一个线程使用transfer发数据
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					TimeUnit.SECONDS.sleep(2);
					System.out.println(Thread.currentThread().getName()+" 开始工作");
					System.out.println("取到的数据为 " + queue.take());
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		},"取数据").start();
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println("开始发送数据,发送的数据为 ‘信息块’");
					queue.transfer("信息块");
					//queue.add("信息块");
					System.out.println("发数据完成,发送的数据为 ‘信息块’");
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			
		},"发数据").start();
	}
}

上述代码输出,接收方把数据收完后,发送发中断阻塞

开始发送数据,发送的数据为 ‘信息块’
取数据 开始工作
取到的数据为 信息块
发数据完成,发送的数据为 ‘信息块’

如果使用add,容器会暂时保存数据,发送方不会阻塞

开始发送数据,发送的数据为 ‘信息块’
发数据完成,发送的数据为 ‘信息块’
取数据 开始工作
取到的数据为 信息块

8、SyschronousQueue 同步队列

是一个容量为0的队列。是一个特殊的TransferQueue,必须有消费线程等待,才能使用。

add 不会阻塞,如果存在线程在take,就直接交付数据。如果没有线程在take,抛出异常。
put 会阻塞,等待线程来take。
package com.sunyou.p5;
/*
 * SyschronousQueue是一个容量为0的队列。是一个特殊的TransferQueue
 * add方法,没有阻塞。如果没有take线程等待数据,抛出异常
 * put方法,有阻塞。如果没有take线程等待数据,会阻塞
 */
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueTest {
	private static SynchronousQueue<String> queue = new SynchronousQueue<>();
	public static void main(String[] args) throws Exception {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					TimeUnit.SECONDS.sleep(3);
					System.out.println(Thread.currentThread().getName()+"开始工作");
					//等待取数据
					System.out.println("收到的数据为:"+queue.take());
					System.out.println(Thread.currentThread().getName()+"结束工作");
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			
		},"取数据").start();
		TimeUnit.SECONDS.sleep(1);
		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println(Thread.currentThread().getName()+"开始工作");
				try {
					//queue.add("信息块");
					queue.put("信息块");
					System.out.println(Thread.currentThread().getName()+"结束工作");
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		},"发数据").start();
	}
}