并发编程学习---java中的线程池

目录

  • 简介
  • ThreadPoolExecutor的execute()运行流程
  • 线程池的创建
  • 向线程池提交任务
  • 合理配置线程池
  • 线程池监控

1. 简介

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

  1. 降低资源消耗
    通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  2. 提高响应速度
    当任务到达时,任务可以不需要等到线程创建就能立即执行
  3. 提高线程的可管理性
    线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

线程池的主要处理流程

并发编程学习---java中的线程池
说明:

  1. 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程
  2. 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程
  3. 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务

2. ThreadPoolExecutor的execute()运行流程

并发编程学习---java中的线程池

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
  2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue
  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务
  4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。(注意,执行这一步骤需要获取全局锁)。

3. 线程池的创建

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
参数 说明
corePoolSize 当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程
maximumPoolSize 最大线程数
keepAliveTime 超时时间,超出核心线程数量以外的线程空余存活时间
unit 存活时间单位
workQueue 保存执行任务的队列
threadFactory 创建新线程使用的工厂
handler 当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。

饱和策略

策略名称 说明
AbortPolicy 直接抛出异常
CallerRunsPolicy 只用调用者所在线程来运行任务
DiscardOldestPolicy 丢弃队列里最近的一个任务,并执行当前任务
DiscardPolicy 不处理,丢弃掉

4. 向线程池提交任务

  • Executor.execute(Runnable command)
  • ExecutorService.submit(Callable task)

4.1 execute

用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功

ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(()->{
    System.out.println(1);
});
executorService.shutdown();

4.2 submit

用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值

public class CallableMethod implements Callable {
    @Override
    public Object call() throws Exception {
        System.out.println("实现callable的方式");
        return "callable";
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
        Future future = es.submit(new CallableMethod());
        //会阻塞,等到线程执行完返回值
        System.out.println(future.get());
    }
}

5. 合理配置线程池

分类:

维度
任务性质 CPU密集型任务 IO密集型任务 混合型任务
优先级
任务执行时间

任务性质分类解决方案

性质 解决方案
CPU密集型任务 配置尽可能小的线程,如配置Ncpu+1个线程的线程池
IO密集型任务 应配置尽可能多的线程
混合型 拆分为一个CPU密集型和一个IO密集型

优先级分类解决方案
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。

6. 线程池监控

总线程数 = 排队线程数 + 活动线程数 + 执行完成的线程数。

ThreadPoolExecutor方法

方法 说明
getQueue() 当前排队线程队列
getActiveCount() 当前活动线程数
getCompletedTaskCount 执行完成线程数
getTaskCount() 总线程数

demo

 ExecutorService es = new ThreadPoolExecutor(50, 100, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(100000));
for (int i = 0; i < 100000; i++) {
    es.execute(() -> {
        System.out.print(1);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}
ThreadPoolExecutor tpe = ((ThreadPoolExecutor) es);

while (true) {
    System.out.println();
    int queueSize = tpe.getQueue().size();
    System.out.println("当前排队线程数:" + queueSize);

    int activeCount = tpe.getActiveCount();
    System.out.println("当前活动线程数:" + activeCount);

    long completedTaskCount = tpe.getCompletedTaskCount();
    System.out.println("执行完成线程数:" + completedTaskCount);

    long taskCount = tpe.getTaskCount();
    System.out.println("总线程数:" + taskCount);
    Thread.sleep(3000);
}

运行结果

---------
当前排队线程数:99950
当前活动线程数:50
执行完成线程数:0
总线程数:100000
---------
当前排队线程数:99800
当前活动线程数:50
执行完成线程数:150
总线程数:100000
---------
当前排队线程数:99650
当前活动线程数:50
执行完成线程数:300
总线程数:100000