jdk线程池实现原理分析
目录
为什么需要线程池
先来看看数据库连接池吧。我们在实际生产中使用数据库,基本上应该没人是直接使用jdbc的吧,而是都心照不宣的使用了数据库连接池。那么为什么?
最简单的回答就是为了连接复用。那么我们为什么需要复用数据库连接呢?简单回答就是数据库连接资源有限,并且新建数据库连接是个比较重的操作,比如建立tcp连接,mysql服务器需要鉴权等,如果每个请求都新建一个连接,那么这个请求的响应就会变慢,连接的建立和销毁占用了很多时间。所以解决的方式就是通过池化技术,将封装连接的Connection对象进行缓存,处理请求的时候是在连接池中获得已有连接,从而省去了连接建立的过程。
其实对于很多资源的池化的目的都是为了复用资源,最大化资源的利用率。比如数据库连接池,线程池等等。而实现资源池化的最基本的原理就是信号量,jdk中的Semaphore就是信号量的一个实现。在java中,资源一定是疯转在对象中的,所以资源的池化,体现在java中,资源池就是对象池。《thinking in java》中给出了一个使用Semaphore来实现对象池的很好的例子。
java的线程池也是为了资源复用,复用线程池中的线程来驱动任务的执行,如果采用per-request-per-thread的方式来响应请求,那么线程的创建和销毁要占用一部分时间,影响性能。因为java中线程(Thread对象)的创建是一个相对比较重的操作。所以为了解决这个问题,才引入了线程池。
下面就简单说明一下,java创建Thread对象,为什么是个重操作。
一台计算机上的计算资源(cpu)只能由操作系统来管理和分配,当应用程序需要计算资源的时候,需要向操作系统申请。而一台机器上的cpu资源是很有限的,而且为了完成一个计算任务,是需要cpu,内存,甚至外设之间相互配合才能完成的。但是cpu的运行速度比内存的速度要快上很多,任务执行期间,cpu就需要等内存响应,这段时间cpu就空转浪费了。为了解决这些硬件之间速度的差异,操作系统引入进程的概念,通过时分复用计算资源。所以我们可以认为进程可以代表计算机的计算资源。
所以,所有的计算任务,最终都是操作系统的进程(线程)来驱动完成的,而编程语言中的线程/进程可以认为只是编程语言给程序员的一个使用计算资源的接口。拿java的Thread来说,在java语言层面Thread代表的就是一个线程,它可以驱动任务的完成,但是Thread本身不具备计算能力的,可以简单认为这是一个委托模式:编程语言中的线程,将驱动任务执行委托给操作系统的线程/进程。我们每次调用Thread.start()的时候,会创建一个操作系统的集成,Thread#run()的时候,操作系统的进程来驱动完成这个任务。
编程语言要支持多线程编程,那么它就必须解决一个问题:编程语言中的线程和操作系统内核进程之间的映射关系。常见的关系有1:1,n:1,n:m。
java中的线程,采用的就是1:1的模型,即一个Thread对象,都对应了一个操作系统的线程。
linux中的fork()方法可以创建一个进程,方式就是完全copy父进程,包括内存,特别是父进程占用的内存资源较多的时候,这个copy动作是比较耗时的。那么另外一个函数就是clone(),它也是通过拷贝父进程,只是是延迟拷贝,真的使用的时候才会发生拷贝动作。
一个java的线程,对应了一个操作系统内核LWP(light weight processor),而LWP的创建最终就是调用了clone()方法。而clone()函数的的调用,需要切换到内核态,即会发生进程的上线文切换,进程的上下文切换也是一个比较耗时的操作。
综上,简要说明了一下java中创建一个Thread对象是比较耗时的原因,所以引入了线程池。
番外篇:
说道编程语言中的线程和操作系统的进程之间的映射关系的时候,除了java采用的1:1模型,还有两个,一个是n;1和n:m。现在比较火的协程,就是采用的n:m的模型。编程语言中的"线程"的调度分配资源全是在应用层完成(即编程语言会提供"线程"的调度),而编程语言中的"线程"和操作系统的进程是n:m的,即创建一个编程语言中的"线程"不再需要对应创建一个操作系统的进程,具体要创建多少操作系统进程,如何创建,编程语言都屏蔽了,程序员关心的只是编程语言中的"线程"。创建编程语言中的"线程"中的线程时,没有了操作系统的系统调用,那么创建编程语言中的"线程"跟创建一个POJO对象的代价就差不多了,那么这个时候per-request-per-thread就有用武之地了,而线程池也就淡化了。
很多语言都已经实现了协程,如golang。而java也有实现,只是还没有官宣,它就是Fabric,相信这一天也不久了。
生产者-消费者模式
生产者-消费者模式在分布式环境中,应用的特别的多,常常会用它来实现应用之间的解耦,削峰,延迟响应等。而现在也有很多成熟的实现框架,如kafka等,可以说是分布式环境中不可或缺的一个组件。
生产者-消费者模式在的三要素:生产者,队列,消费者。
- 生产者:这很好理解,就是产生事件/消息/任务的一方,可以是一个线程,一个进程,也可以是一组线程,一组进程等。
- 队列:用户缓存事件/消息/任务。这个其实严格来说并不是生产者-消费者模型中必须的,没有它也可以实现生产者-消费者模型,比如生产者生产一个消息,必须等消费者消费完了才能生产下一个,而生产者也是一次只能消费一个消息,消费完了就只能等待生产者继续生产,这种场景是不需要队列就可以很简单的实现的。但是在实际中,这种场景比较少,更多的是需要这个缓冲队列,有了这个队列也可以实现更多的功能,如延迟响应,削峰等。所以实际中,这个缓冲队列都是有的。而且一般都是个阻塞队列。
- 消费者:这个也很好理解,它负责从队列中获取任务,然后消费。
promise模式
promise本身的资料非常多了,这里就不作过多的介绍,用给一个快餐店点餐的例子,大概说明一下(如果不喜欢点餐,也可以理解成点钟,可能更容易接受)。
promise的基本结构(图片来自https://www.infoq.cn/article/design-patterns-promise/?utm_source=infoq&utm_campaign=user_page&utm_medium=link)
想想子快餐店吃饭点餐的过程:走进饭店后,在前台服务员处开始点餐,点餐结束,服务员会给你一个小票,然后你就可以自己找位置坐下了,厨子做好餐食后,会通过小票找到你,通知你取餐。这个小票就是你点过餐的一个凭证。
如果没有这个小票系统,会是怎么样呢,点完餐后,就只能傻傻看着厨子,等厨子做好了,你直接拿走,这期间你什么也不能做,只能傻傻厨子做饭,你一旦离开了,谁知道你点过餐了,万一你是来骗吃骗喝的呢。有了小票系统,这个小票就可以证明你点过餐,点的是啥,那么做好了,就可以通过小票通知你取餐,你也就不用看着厨子了。
在多线程环境下,这些角色对应的是什么:
- 饭店:相当于计算资源池,比如线程池ThreadPoolExecutor。
- 点餐服务员:相当于线程池对外提供的提交任务的接口,如ExecutorService#submit()
- 你点的餐食:就是任务。比如Runnable任务,Callable任务。
- 厨子:驱动任务执行的线程。
- 小票:就是凭证,如Future接口,通过凭证可以获得任务执行的结果。
- 点餐的你:就是提交任务的那个线程,即调用ExecutorService#submit()接口的线程。
ThreadPoolExecutor
生产者-消费者实现
生产者-消费者模型中的三元素在ThreadPoolExecutor中的体现:
- 生产者:调用ExecutorService#submit()的那个线程。
- 队列:
这是一个阻塞队列,通过这个阻塞队列很容易实现线程之间的通信。
- 消费者:
,Worker是ThreadPoolExecutor的一个内部类,其中封装的 就是Thread。封装的目的就是为了让线程执行完任务后不退出run(),而是去workQueue中继续获取任务,workQueue中没有任务就阻塞等待。
ThreadPoolExecutor中和生产者-消费者相关的参数:
- corePoolSize,maxPoolSize,keepAliveTime,allowCoreThread,TimeOut 这几个参数都是用来控制线程池中线程的数量的
- ThreadFactory:用来控制线程池中线程产生的过程。一般我们都指定ThreadFactory,这样可以给线程池中的线程起一个有意义的名称,排查问题的时候就很方便。
- handler:在生产者-消费者模型中,一般当生产速率过快,更多的都是通过某种机制来告诉生产者降低生产速率,或者直接暂停生产,比如TCP的拥塞控制,通过对网络状态的收集来改变发送滑动窗口的大小,本质就是降低生产者的速率。但是对于线程池,生产者是用户线程,超出了线程池的控制范围。比如一个web网站,服务器处理不过来了,总不能让用户先别点网站了,不要产生请求了。所以线程池提供了一个拒绝策略,让用户来决定,当线程池处理不过来了,再有任务提交过来,该怎么处理(Executor的策略模式)。
- ctl:这个属性存在感不是很强。字面翻译过来就是控制(controll),其实就是线程池的状态。定义状态的目的就是管理线程池,比如什么时候接受用户提交任务,取消任务,响应中断的管理等,这些都会借助于线程池的状态。比如当调用executor.shutdown()后,不能再提交任务到线程池,但是已经提交的任务依然会执行完成,这就是通过状态控制的。对于中断任务执行,其实就是抛弃任务队列中的任务,并给正在执行任务的线程发送中断信号(Thread.interrupte()),如果这个线程正在Running状态,且任务中也没有主动去检测中断标志的代码,其实正在执行的任务是会执行完成的,忽略中断信号。但是如果线程正在休眠,如sleeping,或者等待Lock锁,因为这些的实现中会响应中断信号,所以可能会中断任务的执行(之所以是可能,原因就是这些响应中断信号方式都是抛出InterruptedException,并清零中断标志,如果任务中吞掉了这个异常,那跟没发生一样)。因为对于一个系统开发来说,线程池并不会关闭,而是复用的,即线程池的生命期跟jvm进程的生命期一样,所以对这些不太关注,更多的关注点都放在了前面的参数上了。
创建线程的逻辑,就是根据这些参数来控制的,具体的逻辑如下,当用户线程提交任务过来的时候,线程池驱动任务执行的过程;
- 如果线程池中线程数<corePoolSize,那么就创建新的线程来驱动任务执行。
- 如果当前线程数==corePoolSize,看workQueue是否满了,如果没有,则将任务放到WorkQueue中。
- 如果workQueue满了,如果当前线程数<maxPoolSize,则创建新线程
- 如果当前线程数=maxPoolSize,则执行handler指定的拒绝策略。
ps:上面的描述中,当任务提交过来,并没有判断是否有空闲线程的逻辑,有空闲线程,就使用空闲线程来执行任务。原因就是Executor的调度是借助阻塞队列的,除了提交时,创建了新线程,这个线程是直接先执行提交过来的任务(firstTask),这个任务执行完成后,线程就会调用阻塞队列的take方法去阻塞队列中拿任务,如果没有任务,该方法就会阻塞线程。所以并存在主动选择哪个线程驱动任务执行的过程,通过阻塞队列来调度的,这也是使用阻塞队列的一个好处吧。
promose模式实现
话不多说,直接来一个promose模式的结构图和Future之间的对应关系。
如下是Future的结果图。
其核心就在于FutureTask。重要的逻辑都在这个类中。想要知道通过Future怎么获得任务执行结果,看先FutureTask就知道了。这里也就不累赘的解读源代码了,明白了promis模式,看这个源代码是很容易的。
ps:
- Thread的run()方法的入参是Runnable类型,且run()方法是没有返回值的,但是ExecutorService#submit()入参可以是Callable,而且还能有返回值,这个返回值的获取的逻辑也在FutureTask中,但是Callable转Runnable的关键就在于RunnableAdapter。
- ExecutorService就支持Runnable和Callable任务,其他场景咋搞?比如有多个返回值,多个入参,这种的话使用ExecutorService,可以自己封装一下。另外就是可以使用CompletableFuture,它支持更多的任务类型,已经更明确的多线程任务之间依赖的表达方式。大部分的并行开发,CompletableFuture都能满足。
ScheduledThreadPoolExecutor
这个类是用于定时任务调度的。它继承了ThreadExecutorPool,最大的区别就在于阻塞队列上,它是指定使用了优先级队列,将最先执行执行的任务放在对头。
ps:优先级队列的实现常常都是堆这种数据结构,堆结构就是一个天然的优先级队列,对求topK的问题很方便。
线程池的使用
- 线程数的设置。线程数并不是越多越好,同一时刻能够并行执行的线程=cpu内核数,超过内核数的线程,都是通过时分共享的方式执行的,那就会设计线程切换,线程切换是会消耗cpu时间的,如果大量的线程切换,就会导致系统响应慢,但是cpu利用率却非常高。所以一般的规则就是:如果是cpu密集型的应用,那么线程池的线程数约等于cpu核数;如果是IO密集型,那么线程池的线程数约等于2*cpu核心数。这只是一个参考值,还需要结合压测以及线上实际情况来调整,比如IO就是比较慢,那么大量的线程处于休眠状态,可以适当增多线程数。
- 阻塞队列的设置。阻塞队列一定要显示设置为有界队列,不能使用无界队列。否则当请求量过大,一致堆积到阻塞队列会导致OOM,导致整个进程不可用。
- 拒绝策略的设置,线程池默认的拒绝策略就是丢弃,需要评估是否允许有提交的任务被丢弃,如果是比较重要的任务,那么就需要自己指定拒绝策略。实际工作中遇到一个小的问题,就是一个异步操作,将所有的任务都放到了一个队列,评估任务是可以丢弃的,所以也自定义了拒绝策略,但是 只是打印日志然后丢弃任务,最后上线后发现,有一个任务是不能丢弃的,大量任务提交过来后,这好有那个重要的任务提交过来,就被丢弃了,丢弃后后续处理就会有问题。解决方式想到了两种,一个是不用线程池队列,采用外部队列,即kafka来实现异步化。另一种方式就是和我们实际场景相关,因为那个重要的任务的流量很小,所以不是使用一个线程池,而是多个,将重要的任务提交到单独的线程池,这样不会应为其他大流量任务导致重要任务丢弃,但这只是这种场景能够极大的减少丢弃重要任务的几率。
- ThreadFactory的指定,虽然有默认值,最好还是自己指定,至少个线程起一个业务相关的名字,这样当出现问题的时候,从stacktrace中,通过线程名就可以很好的定位问题。
- ThreadLocal。
ThreadLocal本身的实现已经考虑了内存泄露的问题,比如ThreadLocal本身不持有数据,ThreadLocalMap是Thread的成员,只要Thread销毁,所有的ThreadLcoal数据就销毁了,当然对于线程池,这个方式没有太大的帮助,因为Thread不会销毁。所以Thread中的ThreadLocalMap变量就一直是存在的,如果在线程池线使用了较多的threadLocal变量,则每个线程中都持有了一个副本,可能可能导致内存泄露。这也是为什么有些地方不建议在线程池中使用ThreadLocal。如果业务场景需要使用,在任务结束后需要ThreadLocal#remove()删除变量,一个是为了防止内存泄露,另外一个就是为了防止使用线程池的时候因为线程复用ThreadLocal变量相互串绕的问题。
但是由于gc问题,对应的key已经被回收了,所以Entry.get()返回就是null,所以通过ThreadLocal#get()拿到的就是空,就有可能npe,所以在使用的时候要提供setInitialValue()方法,get()方法发现为null的时候,会调用该方法,只是这个时候拿到的都是初始值。所以最好别对ThreadLocal本身作什么会更改的逻辑计算,如果需要,通过ThreadLocal获得变量后,定义局部变量来做。
ps:ThreadLocal是线程本地的,如果跨线程了,就拿不到了。这是我在实际工作中遇到的一个场景,我们项目中有个全链路参数就是放到ThreadLocal,各组出现过不止一次多线程中,出现这个全链路参数丢失的情况,所以使用线程池的时候,如果有ThreadLocal需要稍微注意一下。
最后一个关于ThreadLocal实验的疑问:
在ThreadLocal的get(),set()都会调用如下方法,法相key为null(因为key为弱引用被回收了),就会将对应的value进行清除,保证value也会被回收。
但是在如下代码:
保证发生了full gc,但是还是能够拿到"bbbb"的值,debug发现key没有变成null,就有点郁闷了,是哪儿出现了问题?
结果发现:还是aaaa。这里暂时没想通,希望哪个大神看到可以指教一下。
总结:
本文并没有贴任何关于ThreadPoolExecutor的源代码解析,网上的资料非常多,各种解读都有。本文的重点是结合自己的理解和源代码的学习,说明一下线程池的设计思路,我觉得了解了这个,再去看源代码以及使用起来,就更加容易了。