JAVA线程池的几种使用方式以及线程同步详解
JAVA创建线程的方法
- 继承Thread类
public class MyThread extends Thread {
@Override
public void run(){
// do something
}
}
- 实现Runnable接口
public class MyRunnable implements Runnable{
@Override
public void run() {
}
}
或者直接在使用的时候
new Thread(() -> {
log.info("run....");
}).start();
- 使用Callable和Future创建线程
创建Callable接口的实现类,并实现call()方法,该call()方法将作为线程执行体,并且有返回值
创建Callable实现类的实例,使用FutureTask类来包装Callable对象,该FutureTask对象封装了该Callable对象的call()方法的返回值。(FutureTask是一个包装器,它通过接受Callable来创建,它同时实现了Future和Runnable接口。
使用FutureTask对象作为Thread对象的target创建并启动新线程
调用FutureTask对象的get()方法来获得子线程执行结束后的返回值
public class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
// TODO Auto-generated method stub
return 1111;
}
}
线程池的使用
JAVA中使用ThreadPoolExecutor来创建线程池,参数如下:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* 核心线程数,
* 核心线程数时最小的存活线程数量
* 设置allowCoreThreadTimeout=true(默认false)时,核心线程数为0,所限线程都会超时关闭
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* 最大线程数,超过最大线程数将不会再创建线程,新进来的任务会等待或被拒绝
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* 线程空闲时间
* 线程空闲会等待keepAliveTime之后,超时退出,直到线程数为核心线程数
* 如果allowCoreThreadTimeOut=true,会直到线程数为0
* @param unit the time unit for the {@code keepAliveTime} argument
* keepAliveTime的单位
* TimeUnit是一个枚举类型,其包括:
* NANOSECONDS : 1微毫秒 = 1微秒 / 1000
* MICROSECONDS : 1微秒 = 1毫秒 / 1000
* MILLISECONDS : 1毫秒 = 1秒 /1000
* SECONDS : 秒
* MINUTES : 分
* HOURS : 小时
* DAYS : 天
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* 线程池中的任务队列:维护着等待执行的Runnable对象
* SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,无缓冲的等待队列
* 如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,
* 使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大
* LinkedBlockingQueue:无界缓存的等待队列,这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;
* 如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,
* 这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize
* ArrayBlockingQueue:有界缓存的等待队列,可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,
* 如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误
* DelayQueue:无界缓存的等待队列,队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。
* 这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* 任务拒绝处理器,默认是AbortPolicy
* ThreadPoolExecutor类有几个内部实现类来处理这类情况:
* AbortPolicy 丢弃任务,抛运行时异常
* CallerRunsPolicy 执行任务
* DiscardPolicy 忽视,什么都不会发生
* DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
同时在Executors中提供了几种内定的线程池,如下:
public class Executors {
/**
* 创建一个最大线程数目固定的线程池,该线程池用一个共享的无界队列来存储提交的任务。
* 参数nThreads指定线程池的最大线程数
* 创建线程池时,如果线程池没有接收到任何任务,则线程池中不会创建新线程
* 在线程池中线程数目少于最大线程数时,每来一个新任务就创建一个新线程
* 当线程数达到最大线程数时,不再创建新线程,新来的任务存储在队列中
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new java.util.concurrent.ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* 参数threadFactory是线程工厂类,主要用于自定义线程池中创建新线程时的行为
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new java.util.concurrent.ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
/**
* 这两个方法用于创建ForkJoin框架中用到的ForkJoinPool线程池
* 参数用于指定并行数
*/
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
/**
* 默认使用当前机器可用的CPU个数作为并行数
*/
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
/**
* 用于创建只有一个线程的线程池
* 线程池中的任务使用无界队列存储
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new java.util.concurrent.ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new java.util.concurrent.ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
/**
* 用于创建线程数数目可以随着实际情况自动调节的线程池
*/
public static ExecutorService newCachedThreadPool() {
return new java.util.concurrent.ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new java.util.concurrent.ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
/**
* 用于创建只有一个线程的线程池,并且该线程定时周期性地执行给定的任务
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
/**
* 用于创建一个线程池,线程池中得线程能够周期性地执行给定的任务
* corePoolSize是核心线程数,由于使用了无界队列,所有执行线程最大只有核心线程数
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
/**
* 用于包装现有的线程池,包装之后的线程池不能修改,相当于final的
*/
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedExecutorService(executor);
}
/**
* 用于包装可以周期性执行任务的线程池,包装之后的线程池不能修改,相当于final
*/
public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedScheduledExecutorService(executor);
}
/**
* 返回默认的工厂方法类,默认的工厂方法为线程池中新创建的线程命名为
* pool-[虚拟机中线程池编号]-thread-[线程编号]
*/
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
/** Cannot instantiate. */
private Executors() {}
}
线程同步的方法
- 使用synchronized关键字
synchronized有几种用法,用一张表说明用法和作用域
用法 | 被锁的对象 | 代码实例 |
---|---|---|
实例方法 | 类的实例对象 | public synchronized void funOne() {} |
静态方法 | 类对象 | public static synchronized void funThree(){} |
实例对象 | 类的实例对象 | public void funFive(){ synchronized (this){} } |
class对象 | 类对象 | public void funSix(){ synchronized (SynchronizedSample.class){ } } |
object对象 | object对象 | private String lock = “”; public void funSeven(){ synchronized (lock){} } |
- wait与notify
wait():使一个线程处于等待状态,并且释放所持有的对象的lock。
sleep():使一个正在运行的线程处于睡眠状态,是一个静态方法,调用此方法要捕捉InterruptedException异常。
notify():唤醒一个处于等待状态的线程,注意的是在调用此方法的时候,并不能确切的唤醒某一个等待状态的线程,而是由JVM确定唤醒哪个线程,而且不是按优先级。
Allnotity():唤醒所有处入等待状态的线程,注意并不是给所有唤醒线程一个对象的锁,而是让它们竞争。 - 使用特殊域变量(volatile)实现线程同步
关键字volatile的使用目前存在很大的混淆,volatile保证可见性,但不能保证原子性,所以并不能保证线程同步的,只是在一些特殊情况下的一种弱同步机制
比如下面的代码的运行结果
begin size: 5000, count:0
over size: 5000, count:4981
private volatile int count = 0;
private void add() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.count++;
}
public void test(int size) {
count = 0;
log.info("begin size: {}, count:{}", size, count);
for (int i = 0; i < size; i++) {
new Thread(() -> add()).start();
}
log.info("over size: {}, count:{}", size, count);
}
这里就涉及到了内存模型的问题 具体说来, JVM中存在一个主存区(Main Memory或Java Heap Memory),对于所有线程进行共享,而每个线程又有自己的工作内存(Working Memory),工作内存中保存的是主存中某些变量的拷贝,线程对所有变量的操作并非发生在主存区,而是发生在工作内存中,而线程之间是不能直接相互访问,变量在程序中的传递,是依赖主存来完成的。
对于volatile修饰的变量,jvm虚拟机只是保证从主内存加载到线程工作内存的值是最新的
举例说一下
假如线程1,线程2都去获取主内存中的值,发现主内存中count的值都是5,就会将这个5的值放入工作内存中,而在线程1中修改之后再刷新到主存中,这时主存的count值就变为6了,而线程2早就把5取到了工作内存中,因此修改之后也是6,刷新到主存中,这样就发生了上面程序的情况了
volatile还有一个重要的作用是禁止指令重排序优化,举个例子
下面代码中如果initFlag不用volatile修饰的话,就不能保证程序在运行的时候numOne,numTwo,numThree,str的赋值一定在initFlag之前,多线程并发访问的时候就会有可能出现initFlag为true,但是前面的赋值并没有完成
private int numOne;
private int numTwo;
private int numThree;
private String str;
private volatile boolean initFlag = false;
public void init(){
numOne = 1;
numTwo = 2;
numThree = 3;
str = "abc";
initFlag = true;
}
这里有个名词叫做内存屏障Memory Barrier或Memory Fence
内存屏障也称为内存栅栏或栅栏指令,是一种屏障指令,它使CPU或编译器对屏障指令之前和之后发出的内存操作执行一个排序约束。
这通常意味着在屏障之前发布的操作被保证在屏障之后发布的操作之前执行。
内存屏障可以被分为以下几种类型
LoadLoad屏障:
抽象场景:Load1; LoadLoad; Load2
Load1 和 Load2 代表两条读取指令。在Load2要读取的数据被访问前,保证Load1要读取的数据被读取完毕。
StoreStore屏障:
抽象场景:Store1; StoreStore; Store2
Store1和 Store2代表两条写入指令。在Store2写入执行前,保证Store1的写入操作对其它处理器可见
LoadStore屏障:
抽象场景:Load1; LoadStore; Store2
在Store2被写入前,保证Load1要读取的数据被读取完毕。
StoreLoad屏障:
抽象场景:Store1; StoreLoad; Load2
在Load2读取操作执行前,保证Store1的写入对所有处理器可见。StoreLoad屏障的开销是四种屏障中最大的。在大多数处理器的实现中,这个屏障是个万能屏障,兼具其它三种内存屏障的功能。
- 使用重入锁实现线程同步
ReenreantLock类的常用方法有:
ReentrantLock() : 创建一个ReentrantLock实例
lock() :获得锁
unlock() : 释放锁
- 使用局部变量实现线程同步
如果使用ThreadLocal管理变量,则每一个使用该变量的线程都获得该变量的副本,副本之间相互独立,这样每一个线程都可以随意修改自己的变量副本,而不会对其他线程产生影响。
ThreadLocal 类的常用方法 ThreadLocal() : 创建一个线程本地变量 get() : 返回此线程局部变量的当前线程副本中的值 initialValue() : 返回此线程局部变量的当前线程的"初始值" set(T value) : 将此线程局部变量的当前线程副本中的值设置为value
- 使用阻塞队列实现线程同步
例如使用LinkedBlockingQueue来实现线程的同步
LinkedBlockingQueue是一个基于已连接节点的,范围任意的blocking queue。
队列是先进先出的顺序(FIFO),LinkedBlockingQueue 类常用方法 LinkedBlockingQueue() :
创建一个容量为Integer.MAX_VALUE的LinkedBlockingQueue put(E e) :
在队尾添加一个元素,如果队列满则阻塞 size() : 返回队列中的元素个数 take() : 移除并返回队头元素
- 使用原子变量实现线程同步
AtomicInteger类常用方法:
AtomicInteger(int initialValue) : 创建具有给定初始值的新的
AtomicIntegeraddAddGet(int dalta) : 以原子方式将给定值与当前值相加
get() : 获取当前值
参考
[1]https://www.cnblogs.com/goody9807/p/6522176.html
欢迎关注微信交流