JAVA高并发知识点
1.NIO BIO AIO
要了解NIO,BIO和AIO的区别吗,首先就要搞懂什么是同步异步,什么是阻塞非阻塞。
阻塞是从线程的角度去讨论的,简单来讲就是线程在执行到某一段代码的时候遇到某些操作被挂起,不再参与后续对cpu的资源争夺,直到被唤醒为止;
非阻塞是指线程在执行到某一段代码时遇到某些操作不会被挂起,而是就行参与cpu资源的争夺,继续执行。
同步异步则是从业务角度去考虑的,某些操作在执行该操作的时候多方是否会等待业务执行完才执行后续操作,如果需要等待,则为同步,不需要等待,则为异步
Java中传统的IO为BIO(同步阻塞式IO),在java1.0的时候就已经产生,本质上是面向流的操作,在执行connect,accept,read,write等方法的时候都有可能产生阻塞,造成后续代码无法继续执行
Jdk1.4之后,引入了NIO(同步非阻塞式IO),它和BIO的区别在于面向通道,操作缓冲区,这种方式使代码变得更加灵活,但是在执行上述操作的时候,虽然不会产生阻塞,但是在实际操作中还是需要编写代码来循环处理以上操作,所以仍然是同步的。
AIO是在jdk1.7之后引入的,底层使用了大量的回调函数,这里不进行详细的介绍(如果想进一步了解关于这三个IO的详细信息,可以查看http://blog.****.net/anxpp/article/details/51512200,)
2.BlockQueue
传统意义上的队列,一边往里放,另一边从中取,当队列中的对象被放满的时候,外界对象放不进来,此时就会产生阻塞,直到队列中有对象被拿走消费掉。相对的,如果队列中没有对象,而此时消费端从队列中去取对象,此时也会产生阻塞,直到有新的对象被放到队列中。示意图如下:
BlockingQueue有四种不同的方法用于添加,删除,以及对队列中的元素进行检查,如果请求的操作不能得到立即执行的话,相应的方法表现也会不同
这些方法如下:
抛出异常 |
返回一个特定值 |
阻塞 |
超时 |
|
增加 |
add |
offer |
put |
timeout(time,timeunits) |
移除 |
Remove |
Poll |
Take |
Poll(timeout,timeunits) |
检查 |
Element |
Peek |
抛出异常:如果试图进行的操作不能立即执行,那么会返回一个异常
返回一个特定值:如果试图进行的操作不能立即执行,那么会返回一个特定的值(一般为true或者false)
阻塞:如果试图进行的操作不能立即执行,线程会阻塞住,直到代码可以继续执行为止
超时:如果试图进行的操作不能立即执行,在一定时间内会阻塞住,直到代码可以继续执行为止,而且会返回一个特定值告知代码是否执行成功
注意:向BlockingQueue中不能插入null,如果强行插入null则会返回NullPointerExceprion
BlockQueue的实现类有以下几种:
数组阻塞队列,底层基于数组,队列的长度在生成的时候就已经确定,一旦初始化就不能修改其长度,内部是基于FIFO(First in first out)顺序存储元素。
延迟队列,放入该队列的元素必须实现java.util.concurrent.Delay接口,并重写里面的两个方法compareto (用来给delay元素做比较)和 getDelay方法,其中getDelay方法的参数为TimeUnits类,为时间的单位,方法的返回值为延迟的时间,队列会在每个元素的返回值时间之后就会释放掉这个元素,如果返回值为0或者负值的话,那么就视为延迟超时,不在释放元素,直到有下一次take方法被调用时释放。
链式阻塞队列,和数组阻塞队列差不多,只不过这个队列可以不定义上上限,如果不定义上限的话最大值为Integer.MAX_VALUE
优先级阻塞队列,队列中的元素必须实现java.lang.Comparable接口,重写里面的compare方法,元素的排序就是按照compare方法进行排序
同步队列,只能容纳单个元素,再向里面放入元素的话会被阻塞住
双端阻塞队列,简单来讲就是两边都能存和取的队列,本身也是一个几口,继承了BlockingQueue,自身有一个实现类为LinkedBlockingDeque,内部实现的方法和Queue的差不多,只不过是分两类,一类是方法First,从前方增取和查找,另一类是方法Last,从后方增取和查找。
3.ConcurrentMap
ConcurrentMap本身是一个接口,自身有一个实现类为ConcurrentHashMap
和HashTable一样是线程安全的,只不过HashTable只要有操作就会锁住整个Map,造成效率底下,而ConcurrentHashMap对其进行了优化,引入了桶的机制(key的hash值取模并对桶的个数做余数,余数相同的放到同一个桶里),默认情况下分16个桶,进行操作的时候只会锁住一个桶而不会对整个表进行锁操作。
并发导航映射
支持并发访问的NavigableMap,也支持其子类Map的方法并发访问
4.闭锁CountDownLatch
闭锁是一个并发构造,它允许一个或者多个线程等待指定操作完成,其中最重要的两个方法分别是countdown()和await();
例如:CountDownlatch latch=new CountDownlatch(2);
latch.await设置线程的等待位置,线程每调用一次latch.countdown()递减锁存器都会减一,直到减到0,await之后的代码才会继续执行。(如果有多个线程的话,那么多个线程都会在await处卡住,并且递减锁存器减到0时所有线程都会并发执行)。
另外,await方法有一个重载返回值为boole型的await(tian类timeout,TimeUnit)方法,可以设置等待超时时间,在以下三种情况下,线程进入await(timeout,TimeUnit)方法之后会一直等待:
计数器没有减为0
当前运行的线程没有被中断
等待没有超时
若满足以上三种条件,当计数器键位0的时候,会返回true;
若当前线程被中断或者等待时被中断,会抛出InterruptedException 异常,若等待超时,则会返回false;
附一段代码:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BlockingQ {
public static void main(String[] args) throws InterruptedException {
ExecutorService es=Executors.newFixedThreadPool(3);
Person p1=new Person("张三",18);
Person p2=new Person("李四",19);
Person p3=new Person("王五",20);
final Map map=new HashMap<Integer,Person>();
map.put(1, p1);
map.put(2, p2);
map.put(3, p3);
final CountDownLatch blatch=new CountDownLatch(1);
final CountDownLatch elatch=new CountDownLatch(3);
for(int i=0;i<=2;i++){
final int index =i+1;
Runnable run1=new Runnable() {
@Override
public void run() {
try {
blatch.await();
Thread.sleep((long) (Math.random()*10));
if(map.keySet().contains(index)){
System.out.println(map.get(index).toString()+"选手到达了");
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
finally{
elatch.countDown();
}
}
};
es.submit(run1);
}
System.out.println("比赛开始");
blatch.countDown();
elatch.await();
System.out.println("比赛结束");
es.shutdown();
}
}
执行结果为:
比赛开始
Person [name=王五, age=20]选手到达了
Person [name=张三, age=18]选手到达了
Person [name=李四, age=19]选手到达了
比赛结束
注意:CountDownLatch是不可重复的过程,也就是说计数无法被重置
CyclicBarrier(以下统称为cb)栅栏,和countdownlatch的(以下简称cdl)作用基本一样,但是两者也有些许的差异
cb与cdl计数相反,它是加计数,所以没有countdown方法,调用await方法会让计数器加1,若加1后没有到达指定的值,那么线程就会阻塞,当线程再调用await方法后达到了计数值,所有线程都会并发执行await方法之后的代码,并且计数器将会被重置,这也是栅栏可以重复利用的原因。
5.ExecutorService
执行其服务,并发包中很重要的一个概念,本身是一个异步执行机制,相当于Executor线程池。ExecutorService是一个接口,实现类有两种:ThreadPoolExecutor、ScheduledThreadPoolExecutor
创建ExecutorService的方式有三种:
ExecutorService executorService1 =Executors.newSingleThreadExecutor();
ExecutorService executorService2 =Executors.newFixedThreadPool(10);
ExecutorService executorService3 =Executors.newScheduledThreadPool(10);
从图中可以很容易的看出线程与执行器服务之间的关系,在线程之外重新开启了一个线程,异步执行需要操作的代码。
ExecutorService的使用主要有以下五种方法:
ExecutorService service=Executors.SingleThreadExecutor();
1.service.execute(new Runnable{
执行代码。。。
})
这个方法只是单方面告知执行器服务去执行execute里的线程,无论执行的效果如何都不会给你返回结果。所以用这个方法并不能检测到线程的执行结果,但是executorservice还有其他的方法供你使用。
2.Future future=service.submit(newRunnable(){
执行代码。。。
})
注意到了没,这个方法会返回一个future的返回值,future将会接收从内部代码块里return的信息,但是这里我们用的仍然是runnable,所以得到的返回值为null。
3.Future future=service.submit(newCallable(){
执行代码。。。
})
所以这个方法将submit的参数替换为了Callable(),众所周知Callable()开启线程与Runnable()的区别在于可以带有返回值,所以现在future可以光明正大的去接收submit方法的返回值了。
注意:future.get()是会阻塞的。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class executors {
public static void main(String[] args) {
ExecutorService service=Executors.newSingleThreadExecutor();
service.submit(new Runnable(){
@Override
public void run() {
System.out.println(1/0);
}
});
System.out.println("hahaja");
service.shutdown();
}
}
大家认为这段代码的最终执行结果会是什么?
一般都会认为程序会抛出一个 java.lang.ArithmeticException
的异常吧,但是实际上并不会,代码的最终结果为
Hahaja
实际上代码在里面是已经执行过的,但是executorservice自动忽略了这个异常,这与execute不太一样,当你使用executor来提交线程时,最终的运行结果还是会抛出这个异常,所以说submit和execute除了返回值以外这两个方法还是有些许的区别。
4.Stringstr=service.invokeAny(Set<Callable<String>>)
此方法需要传入一个Callable的集合类,收到的返回值是此集合类中的某一个Callable的返回结果,但是线程的返回结果并不是唯一的,多个线程在多次运行的时候返回结果并不一样。并且如果集合中的线程有一个结束或者抛出了异常,其他的Callable将会被取消。
5.List<Future<String>>future=service.invokeAll(Set<Callable<String>>)
此方法和invokeAny一样需要传入一个Callable的集合类,但是返回值不在是其中一个Callable的返回值,而是整个Callable的返回值的集合。
如果这个集合中的某个callable有异常,也会抛出,但是在invokeAny中,如果某个线程出现了异常,异常并不会抛出,而且该线程永远不可能作为返回值返回。
目前最常用的关闭方法是.shutdown(),要注意的是这个方法在执行完之后并不会立即关闭线程,而是等待所有的线程都执行完毕后才会关闭,这种效果同样适用于main方法,比如当我们调用了shutdown()之后,如果有线程没有执行完,而main方法走完了,那么此时JVM并不会关闭,而是等待所有线程执行完毕后才会关闭。
如果你想立即关闭执行器服务,调用.shutdownnow(),它会立即中断并关闭所有的线程,包括已提交还未执行的线程,这样的话就无法保证线程是否是中断关闭还是执行结束。
线程池执行者,其包含的线程池可以包含不同数量的线程,线程的数量由以下两个参数决定:
CorePoolSize 核心线程池大小
maximumPoolSize 最大线程池大小
除了这两个参数以外,线程池执行者还有其他的参数可以配置:
keepAliveTime |
线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间 |
TimeUnit |
keepAliveTime时间单位 |
workQueue |
阻塞任务队列 |
threadFactory |
新建线程工厂 |
RejectedExecutionHandler |
当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理 |
1.当线程池的数量小于corepoolsize时,即使有空闲线程,也会创建新的线程
2.当线程池的corepoolsize满时,线程会放到workqueue中等待调度
3.如果workqueue也满了,并且线程的数量小于maximumpoolsize的时候,新提交的任务会创建新的线程来执行
4.当maximumpoolsize满了的时候,新增的任务会交给 RejectedExecutionHandler来处理
5.当线程数量超过corepoolsize的时候,并且已达到keepAliveTime,那么会 关闭多余的空闲线程
6.allowCoreThreadTimeOut(true)这里面为true的时候,到达keepAliveTime, 核心线程池中的空闲线程也会被关闭,闲话少说,上图:
它是一个能够将线程延后执行,或者间隔固定时间多次执行的ExecutorService,创建它可以用Executors的工厂创建。具体方法的实现可以参看并发包文档,这里不做进一步说明。(以后有时间会在详细补上)
1. Lock
java在并发引入了锁的概念,与之前的同步代码块synchronized不同,lock可以在并发情况下提高读写的效率,当调用同步代码块synchronized的时候,无论我们进行读还是写操作,都只会让一个线程去执行同步代码块里的代码,其他线程只能在锁外等待锁资源被释放,而且如果是由于代码等原因导致当前线程被阻塞住了,导致锁资源不会被释放,很容易造成死锁的现象。这在高并发情况下也是很常见的现象,而Lock这个接口就能很好的帮助我们解决这种问题。
Lock中常用的上锁的方法有四种,分别是.lock(),tryLock(),.tryLock(Longtime,TimeUnits unit),.lockinterablity(),下面就详细的解释一下这四种方法的用法。
.lock()是最常用的锁的方法,简单来讲就是线程调用这个方法来获取锁,如果已经有线程获取了锁,那么其他线程在调用这个方法的时候就会阻塞住,但是我们在使用lock()方法之前,需要了解Lock和synchronized不同,不是java内置的特性,而是一个单独的类,所以任何代码在获取了锁之后,都要在最后调用unlock()方法来释放锁的资源,所以我们经常会看到如下的组合:
Lock lock=new ReentrantLock();
lock.lock();
try {
//执行代码
} catch (Exception e) {
//处理异常
}
finally{
lock.unlock();}
tryock()这个方法是有返回值的,当有线程去调用这个方法的时候,会立即去获取锁的资源,如果获取成功,那么返回true,获取失败返回false,而且这个方法不会被阻塞住,会立即返回,配合if()使用最佳:
Lock lock=new ReentrantLock();
if(lock.tryLock()){
//如果获取到了锁
try {
//执行代码
} catch (Exception e) {
//处理异常
}
finally{lock.unlock();}
}else{
//如果没有获取到锁。。。
}
tryLock(Long time,TimeUnit unit)这个方法与tryLock基本类似,唯一的区别在于多了两个参数,时间和时间单位,也就是说当线程通过此方法去获取锁的时候,如果没有获取到锁,会先阻塞一段时间,直到超时,会返回false。
.lockinterablity()这个方法比较特殊,当线程使用这个方法获取不到锁的时候,线程会中断阻塞状态。但是需要注意的是,此方法仅限于没有获取到锁资源的线程,对于已经获取到了锁资源的线程,是不会中断线程的执行。
6.1 ReadWriteLock
顾名思义读写锁,其中有两个比较重要的方法readlock()和writelock(),这两个方法会将线程的读写过程分开来上锁。举个例子,当有两个线程同时对一个资源进行读操作,而其中有一个线程调用了readlock(),不会影响另一个线程。但是需要注意的是,如果一个线程先进性此资源的读操作,而另一个线程要对这个资源进行写操作的话,需要等待前一个线程的读锁释放后才能继续执行,相对的,如果前一个线程先进行了写的操作,后一个线程无论要进行读还是写操作,都会等前一个线程释放的锁之后才能继续执行。
Java.util.concurrent包里除了以上的知识外,还引入了其他东西,如ForkJoinPool(关于线程的框架),AtomicBoolean原子性布尔/AtomicInteger原子性整型/Atomiclong原子性长整型等其他知识点,有兴趣的小伙伴可以研究一下。
以上是JAVA高并发包里的基本知识,具体细节之后会在做补充,欢迎各位小伙伴在评论区里讨论^^