深入学习Java线程池
深入学习Java线程池
先来看一下大体的架构:
先从最开始的开始吧,我们平时提交的任务都是 Runnable 类型的,可以看到 Executor 里面的 execute 方法就是接受一个Runnable类的参数
故我们平时可以直接实现Executor接口:
下面这种是线程池同步的执行每一个任务:
public class DirectExecutor implements Executor {
@Override
public void execute(Runnable command) {
// 同步执行,每个任务
command.run();
}
}
也可以才用传进来一个任务的时候单独启动一个线程来运行:
public class NewTaskExecutor implements Executor {
@Override
public void execute(Runnable command) {
// 每个任务都创建一个一个线程来执行
new Thread(command).start();
System.out.println(Thread.currentThread().getName());
}
}
1. ExecutorService
再来看ExecutorService:
public interface ExecutorService extends Executor {
/**
* 关闭线程池,已提交的任务继续执行,不接受新的任务提交
*/
void shutdown();
/**
* 关闭线程池,尝试关闭正在执行的任务,并且不再接受新的任务提交
* 比前面多了一个now,区别在于它可以去停止当前正在执行的任务
*/
List<Runnable> shutdownNow();
/**
* 线程池是否已经关闭了
*/
boolean isShutdown();
/**
* 如果调用了shutdown和shutdownnow方法后,所有任务都结束了,那么返回true
* 该方法必须在调用了shutdown和shutdownNow方法之后调用才会返回true
*/
boolean isTerminated();
/**
* 请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。
* 等待所有任务完成,并设置了超时时间
* 实际中是:先调用shutdown和shutdownNow,然后再调用该方法等待所有线程真正完成,最后的返回值表示是否超时
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交一个Callable任务
* 返回一个表示任务的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
* 该 Future 的 get 方法在成功完成时将会返回给定的结果。
* 因为Runable没有返回值,所有我们制定第二个参数来作为返回值
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
* 该 Future 的 get 方法在成功 完成时将会返回 null。
*/
Future<?> submit(Runnable task);
/**
* 执行所有任务,返回Future类型的一个list
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
* 给invokeAll设置了一个超时时间
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 只要其中的任意一个任务结束了就可以返回,返回是那个任务的结果
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 跟上面方法一样,带超时时间,超过超时时间,则抛出TimeoutException异常
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
可见ExecutorService在实现了Executor接口之后,增加了许多方法用于submit提交任务,关闭线程池等,还可以提交一个任务集合。
1.1 Runnable、Callable、Future、FutureTask
再来说一下submit的中的参数,一个是Runnable,一个是Callable,我们都知道Runnable不能取得方法执行完的结果,所以才有了Callable,Runnable 中提供了一个run方法让我们用来执行
@FunctionalInterface
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
注意上面有个 @FunctionalInterface
表明这个再Java8之后可以用lambda表达式代替的。然后再来看一个常用的接口Future:
可见Future 提供了get方法可以取得方法执行完的结果
接下来是RunnableFuture:
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
这样通过一个RunnableFuture将Runnable的run方法和Future的get方法整合在了一起,我们来看RunnableFuture的子类FutureTask:
可见我们传进去的Runnable和Callable最后都会变成FutureTask,我们往线程池中提交任务都是提交的FutureTask
最后我们来看一下他们之间的整体关系:
2. AbstractExecutorService
再来看ExecutorService下的AbstractExecutorService方法
可见我们传给 ExecutorService的Runnable,因为没有返回值,最后在调用其下面的抽象类的时候回调用newTaskFor方法将Runnable包装成Callable
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
再看FutureTask:
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
3. Executors
ThreadPoolExecutor是表示一个线程池,而Executors则是线程工厂的角色,通过Executors,我们可以得到一个拥有特定功能的线程池。
下面来看一下Executors提供了那些线程池创建方法:
3.1 newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool()
:该方法将返回一个固定线程数量的线程池,该线程池的数量将始终不变。当一个线的任务提交时,线程池中若有空闲的线程,则立即执行,否则将会将任务添加到任务队列中,等到线程空闲时,再处理任务队列中的任务,但是由于它采用的阻塞队列是 LinkedBlockingQueue
,是一个最大值很大(Integer.MAX_VALUE)的队列,也可以认为是无界队列(个人看法),当线程池中的任务处理不及时的时候,而一边又疯狂的提交任务,将会导致OOM发生。
3.2 newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newSingleThreadExecutor
:可见这个方法,只会创建一个线程的线程池。多余的任务还是会被添加到 LinkedBlockingQueue
中,也会有OOM情况的发生。
3.3 newSingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
newSingleThreadScheduledExecutor
: 该方法将返回一个ScheduledExecutorService
对象,而且线程池的大小为1。ScheduledExecutorService
接口在 ExecutorService
接口之上扩展了在给定时间执行某任务的功能,如在某个固定的延时之后执行或者周期性执行某个任务,后面再细说一下。
3.4 newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
newScheduleThreadPool
:该方法也返回一个ScheduledExecutorService
对象,不过可以指定线程的数量。
3.4 newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newCachedThreadPool
:该方法将返回一个可根据实际情况调整的线程数量的线程池,线程池的数量不固定,我们可以看见上面的方法中设置的是corePoolSize为0,maximumPoolSize为整数最大值,保活时间为60秒,阻塞队列为SynchronousQueue
,故线程池中有空闲线程可以复用的话,则会优先复用空闲线程,如果所有的线程都在工作的话,新的任务提交,直接会创建新的线程处理任务,所有线程处理完任务后,将会返回线程池进行复用。如果同时又大量任务提交,那么将会开启等量的线程,这样也会导致OOM。
4. 计划任务
我们在上面已经知道了ScheduleExecutorService可以根据时间需要对线程进行调度,则:
-
schedule
:会在给定时间,对任务进行一次调度 -
scheduleAtFixedRate
:周期性调度,任务的调度频率是一定的。它是以上一个任务开始执行时间为起点,之后的period时间,调度下一次任务,即period包含了线程执行的时间的。 -
scheduleWithFixedDelay
:周期性调度,在上一个任务结束后,再经过delay时间进行任务调度。
一个例子:
public class ScheduledExecutorServiceDemo {
public static void main(String[] args) {
ScheduledExecutorService ses= Executors.newScheduledThreadPool(10);
ses.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(System.currentTimeMillis()/1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},0,2, TimeUnit.SECONDS);
}
}
线程睡眠1秒,间隔两秒执行
1535267595
1535267597
1535267599
1535267601
1535267603
1535267605
1535267607
但是如果我们睡眠是5秒的话,那么
1535267752
1535267757
1535267762
1535267767
可见如果间隔时间period比线程执行时间短的话,还是得等到线程执行完才开始下一个。
此外,调度程序并不会保证任务会无限期执行的,如果任务本身抛出了异常,那么后面的执行任务都将会中断,故需要做好及时处理异常。
5. WorkQueue
5.1 ArrayBlockingQueue
一个由数组组成的有界阻塞队列
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
...
}
构造ArrayBlockingQueue必须要指定容量,也可以指定是否是公平的,公平的情况下会按照FIFO的顺序等待执行任务
5.2 LinkedBlockingQueue
一个由链表组成的有界阻塞队列
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
...
}
LinkedBlockingQueue
:默认的容量为Integer最大值,故其实也是一个有界队列,按照FIFO顺序等待执行
5.3 PriorityBlockingQueue
一个理由堆实现的含有优先级的阻塞队列
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
public PriorityBlockingQueue(Collection<? extends E> c) {
...
}
PriorityBlockingQueue
:的默认初始元素个数为11个,默认情况下元素采取自然顺序升序排列,但不能保证相同优先级之间的顺序
5.4 DelayQueue
private final PriorityQueue<E> q = new PriorityQueue<E>();
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
DelayQueue
:一个支持延时的无界阻塞队列,使用PriorityBlockingQueue实现的,可以用于缓存(设置有效期),定时任务调度
5.5 SynchronousQueue
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
SynchronousQueue
:一个不存储元素的阻塞队列,每一个put操作都需要等待一个take操作,相当于一个传递的效果
5.6 LinkedTransferQueue
一个由链表组成的无界阻塞队列
public LinkedTransferQueue() {
}
public LinkedTransferQueue(Collection<? extends E> c) {
this();
addAll(c);
}
LinkedTransferQueue
:里面包含了tryTransfer和transfer两个方法
- transfer:如果当前有消费者正在等待接收元素,transfer可以把生成传入等待队列中的元素立刻传递给消费者,否则直接放入队列尾部
- tryTransfer:试探生成者是否可以直接将元素传递给消费者
5.7 LinkedBlockingDeque
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
public LinkedBlockingDeque(Collection<? extends E> c) {
...
}
LinkedBlockingDeque
:一个由链表组成的双向阻塞队列
6. ThreadPoolExecutor
上面的Executors创建出来的各种线程池,其核心都是利用的ThreadPoolExecutor,下面来介绍一下它:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
这是构造ThreadPoolExecutor的最全的构造方法:
-
corePoolSize
:指定了线程池中的线程数量 -
maximumPoolSize
:线程池中最大线程数量 -
keepAliveTime
:当线程池线程超过corePoolSize时,多余的空闲线程的存活时间。即,超过了corePoolSize的空间时间,在多长时间被销毁 -
unit
:keepAliveTime的单位 -
workQueue
:任务队列,被提交但尚未被执行的任务 -
threadFactory
:线程工厂,用于创建线程,可以利用guava线程工厂,默认也可 -
handler
:拒绝策略,当任务太多,来不及处理,如何执行拒绝任务
下面来说一下线程的执行逻辑:
7. 拒绝策略
JVM一共提供了四种拒绝策略
7.1 AbortPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
该策略会直接抛出异常,阻止系统正常工作
7.2 CallerRunsPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
只要线程未关闭,该策略直接调用者线程中执行将被丢弃的任务,这样的话不会真正抛弃任务,但会影响提交线程的性能。
7.3 DiscardOldestPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
丢弃最开始的(即将被执行的)任务,并尝试再次提交任务。
7.4 DiscardPolicy
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
该策略将直接丢弃无法处理的任务,不予任何处理
7.5 自定义拒绝策略
在前面提供的四种策略中,如果没有能满足你需求的,可以实现拒绝策略接口,重写方法
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
下面是一个例子:
public class RejectThreadPoolDemo {
public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
MyTask myTask=new MyTask();
ExecutorService executorService= new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString()+" is discard");
}
});
for (int i = 0; i < 20; i++) {
executorService.submit(myTask);
Thread.sleep(10);
}
executorService.shutdown();
}
}
我们设置的corePoolSize和maximumPoolSize都为5,等待队列长度为10,使用的是Executors的默认线程工厂,自定义的拒绝策略就是输出一下,剩下的和直接丢弃一样的,下面是运行结果:
pool-1-thread-1 1535338398869
pool-1-thread-2 1535338398879
pool-1-thread-3 1535338398889
pool-1-thread-4 1535338398899
pool-1-thread-5 1535338398909
[email protected] is discard
[email protected] is discard
[email protected] is discard
[email protected] is discard
[email protected] is discard
pool-1-thread-1 1535338399869
pool-1-thread-2 1535338399879
pool-1-thread-3 1535338399890
pool-1-thread-4 1535338399899
pool-1-thread-5 1535338399909
pool-1-thread-1 1535338400869
pool-1-thread-2 1535338400879
pool-1-thread-3 1535338400890
pool-1-thread-4 1535338400900
pool-1-thread-5 1535338400910
可见,最开始的5个,由corePoolSize执行,然后后面的10个进入等待队列,再剩下的5个就被抛弃了,然后后面等待队列中的10个开始执行。
8. 自定义线程创建
我们可以使用ThreadFactory进行线程的自定义创建
public class ThreadFactoryDemo {
public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
try {
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
MyTask myTask=new MyTask();
ExecutorService executorService=new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t=new Thread(r);
t.setDaemon(true);
System.out.println("create"+t.getName());
return t;
}
});
for (int i = 0; i < 5; i++) {
executorService.submit(myTask);
}
Thread.sleep(1000);
executorService.shutdown();
}
}
我们将所有的线程都设置为守护线程,主线程结束,则他们都得结束,则结果为:
createThread-0
createThread-1
createThread-2
createThread-3
createThread-4
Thread-0 1535338945922
Thread-1 1535338945922
Thread-2 1535338945922
Thread-3 1535338945922
Thread-4 1535338945922
可见,我们将创建线程的过程输出了,还有线程进入Sleep之前的话输出了,但是sleep之后的由于主线程结束而没有输出。
9. 线程池的扩展
当我们想要监控线程池的时候,比如获取每个任务的开始执行和结束执行时间,ThreadPoolExecutor提供beforeExecute
、afterExecute
和terminated
三个接口对线程池进行控制。
在ThreadPoolExecutor中的Worker中的runWorker(JDK1.8)方法中
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
可见它会在之前调用beforeExecute和在finally语句块中调用afterExecute方法,而默认的是空的方法:
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
我们可以对他们进行扩展用来对线程进行进行监控和调试
下面是一个例子:
public class ExtThreadPool {
public static class MyTask implements Runnable{
private String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("正在执行 ID: "+Thread.currentThread().getId()+" NAME: "+name);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService=new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行:"+((MyTask) r).name);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完成:"+((MyTask) r).name);
}
@Override
protected void terminated() {
System.out.println("线程退出");
}
};
for (int i = 0; i < 5; i++) {
MyTask myTask=new MyTask("Task-"+i);
executorService.execute(myTask);
Thread.sleep(100);
}
executorService.shutdown();
}
}
我们在执行前,执行中,执行后,和退出都加了输出:
准备执行:Task-0
正在执行 ID: 12 NAME: Task-0
准备执行:Task-1
正在执行 ID: 13 NAME: Task-1
准备执行:Task-2
正在执行 ID: 14 NAME: Task-2
准备执行:Task-3
正在执行 ID: 15 NAME: Task-3
准备执行:Task-4
正在执行 ID: 16 NAME: Task-4
执行完成:Task-0
执行完成:Task-1
执行完成:Task-2
执行完成:Task-3
执行完成:Task-4
线程退出
可见,最后的terminated()方法要在线程池关闭之后才会退出
10. 线程池线程数量
一个经验公式:
Ncpu:CPU数量
Ucpu:目标CPU的使用率,0<=Ucpu<=1
W/C:等待时间与计算时间的比率
则最优的池的大小为:
Nthreads=Ncpu*Ucpu*(1+W/C)