java 线程池理解

重新理解 java 的线程池和怎么自定义一个合适的线程池

1. 怎么自定义线程池呢?

java的线程池核心就是 ThreadPoolExecutor,后面的四种线程池也是配置不同的ThreadPoolExecutor,如:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,  0L, TimeUnit.MILLISECONDS,
    	   new LinkedBlockingQueue<Runnable>());
}

接下来看下,ThreadPoolExecutor 的构造函数:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
		TimeUnit unit,
		BlockingQueue<Runnable> workQueue,
		ThreadFactory threadFactory, 
		RejectedExecutionHandler handler);

主要的参数有:
(1) corePoolSize 核心线程数量

(2) maximumPoolSize 最大线程数量

(3) unit 超时时间单位 ( TimeUnit.SECONDS 等)

(4) keepAliveTime 超时时间,当线程空闲超过keepAlive会被回收,保证线程数量不大于核心线程数量,表示核心线程已经够应对当前的任务了

(5) workQueue 任务阻塞队列,用来缓存待执行的任务

(6) ThreadFactory 负责创建Runnable

(7) RejectedExecutionHandler 拒绝策略,当任务缓存队列满了,线程也是最大数量了,此时执行拒绝策略

2. blockingQueue:

阻塞式队列,内部是一个先进先出的队列,不过是线程安全的
(1)获取元素:如果队列为空时,会阻塞等待队列中有元素再返回
(2)添加元素:如果队列已满,会阻塞等到队列有空位时再放入

种类

  1. ArrayBlockingQueue 阻塞有界队列,底层采用数组实现,读写操作都要获得锁

  2. LinkedBlockingQueue 阻塞*的队列,底层使用链表,所以队列大小默认值是Integer.MAX_VALUE,也可以指定容量大小。同ArrayBlockingQueue 不一致的是LinkedBlockingQueue 对添加和移除方法使用单独的锁控制,ArrayBlockingQueue 使用同一个锁控制
    参考:https://blog.****.net/javazejian/article/details/77410889?locationNum=1&fps=1

  3. DelayQueue 阻塞*的队列,当其指定的延迟时间到了才可以队列获取到元素。往队列中插入数据的操作(生产者)不会被阻塞,而获取数据的(消费者)会被阻塞

  4. PriorityBlockingQueue 基于堆的*并发安全的优先级队列

  5. SynchronousQueue 内部没有数据缓存空间,队列头元素是排队要插入数据的线程,配对的生产者和消费者线程之间直接传递数据,并不会将数据缓冲数据到队列中

3. ThreadPoolExecutor 任务执行流程

java 线程池理解

4. 拒绝策略 RejectedExecutionHandler:

当线程池线程数量和缓存队列都满了,就会执行拒绝策略来处理这种情况:

(1)AbortPolicy 直接中断抛出运行时RejectedExecutionException异常

(2)CallerRunsPolicy 在调用的线程执行运行该任务

(3)DiscardOldestPolicy 在任务队列中删除第一个元素

(4)DiscardPolicy 直接抛弃任务

5. 四种常用线程池

可见本质上都是通过ThreadPoolExecutor设置形成不同特点的线程池

(1)newCachedThreadPool
无限的线程数量 和 利用SynchronousQueue不需要缓存任务直接执行,适时回收

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

(2)newSingleThreadExecutor
单一线程,无限长度的缓存队列

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

(3)newFixedThreadPool
定长的线程数,无限长度的缓存队列

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

(4)ScheduledExecutorService
利用 DelayedWorkQueue 延迟执行定时任务

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}