java 线程池理解
重新理解 java 的线程池和怎么自定义一个合适的线程池
1. 怎么自定义线程池呢?
java的线程池核心就是 ThreadPoolExecutor,后面的四种线程池也是配置不同的ThreadPoolExecutor,如:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
接下来看下,ThreadPoolExecutor 的构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
主要的参数有:
(1) corePoolSize 核心线程数量
(2) maximumPoolSize 最大线程数量
(3) unit 超时时间单位 ( TimeUnit.SECONDS 等)
(4) keepAliveTime 超时时间,当线程空闲超过keepAlive会被回收,保证线程数量不大于核心线程数量,表示核心线程已经够应对当前的任务了
(5) workQueue 任务阻塞队列,用来缓存待执行的任务
(6) ThreadFactory 负责创建Runnable
(7) RejectedExecutionHandler 拒绝策略,当任务缓存队列满了,线程也是最大数量了,此时执行拒绝策略
2. blockingQueue:
阻塞式队列,内部是一个先进先出的队列,不过是线程安全的
(1)获取元素:如果队列为空时,会阻塞等待队列中有元素再返回
(2)添加元素:如果队列已满,会阻塞等到队列有空位时再放入
种类
-
ArrayBlockingQueue 阻塞有界队列,底层采用数组实现,读写操作都要获得锁
-
LinkedBlockingQueue 阻塞无界的队列,底层使用链表,所以队列大小默认值是Integer.MAX_VALUE,也可以指定容量大小。同ArrayBlockingQueue 不一致的是LinkedBlockingQueue 对添加和移除方法使用单独的锁控制,ArrayBlockingQueue 使用同一个锁控制
参考:https://blog.****.net/javazejian/article/details/77410889?locationNum=1&fps=1 -
DelayQueue 阻塞无界的队列,当其指定的延迟时间到了才可以队列获取到元素。往队列中插入数据的操作(生产者)不会被阻塞,而获取数据的(消费者)会被阻塞
-
PriorityBlockingQueue 基于堆的无界并发安全的优先级队列
-
SynchronousQueue 内部没有数据缓存空间,队列头元素是排队要插入数据的线程,配对的生产者和消费者线程之间直接传递数据,并不会将数据缓冲数据到队列中
3. ThreadPoolExecutor 任务执行流程
4. 拒绝策略 RejectedExecutionHandler:
当线程池线程数量和缓存队列都满了,就会执行拒绝策略来处理这种情况:
(1)AbortPolicy 直接中断抛出运行时RejectedExecutionException异常
(2)CallerRunsPolicy 在调用的线程执行运行该任务
(3)DiscardOldestPolicy 在任务队列中删除第一个元素
(4)DiscardPolicy 直接抛弃任务
5. 四种常用线程池
可见本质上都是通过ThreadPoolExecutor设置形成不同特点的线程池
(1)newCachedThreadPool
无限的线程数量 和 利用SynchronousQueue不需要缓存任务直接执行,适时回收
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
(2)newSingleThreadExecutor
单一线程,无限长度的缓存队列
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
(3)newFixedThreadPool
定长的线程数,无限长度的缓存队列
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
(4)ScheduledExecutorService
利用 DelayedWorkQueue 延迟执行定时任务
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}