Java线程II
1.四种线程池
ExecutorsService有一些管理Thread的方法,可以在api中查看
execute(Runnable) 在未来某个时间执行给定的命令(线程)。
submit(Runnable) 提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。
submit(Callable) 提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
invokeAny(...) 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。
invokeAll(...) 执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。
shutdown() 和 shutdownNow()区别 点击打开链接
I. newSingleThreadExecutor 单线程的线程池。
II. newCachedThreadPool 可缓存线程池
只有非核心线程,最大线程数很大(Int.Max(values)),它会为每一个任务添加一个新的线程,这边有一个超时机制,当空闲的线程超过60s内没有用到的话,就会被回收。缺点就是没有考虑到系统的实际内存大小。
III. newFixedThreadPool 定长线程池
一个有指定的线程数的线程池,有核心的线程,里面有固定的线程数量,响应的速度快。正规的并发线程,多用于服务器。固定的线程数由系统资源设置。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 固定容量的线程池
* @author Administrator
*
*/
public class FixedThreadPoolTest {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(2);//最多容纳2个线程的线程池
for (int i = 0; i < 7; i++) {//创建7个线程
int no = i;
Runnable runnable = new Runnable(){
@Override
public void run() {
try {
System.out.println("into "+no);
Thread.sleep(1000L);
System.out.println("end "+no);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
es.execute(runnable);//线程放入线程池
}
es.shutdown();//执行完线程后,关闭ExecutorService,
System.out.println("Main end!");
}
}
IV.newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 周期延迟线程池
* @author Administrator
*
*/
public class ScheduledThreadPoolTest {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
Runnable runnable = new Runnable(){
@Override
public void run() {
System.out.println("zzz");
}
};
//1秒后开始执行,之后每过三秒再次执行
pool.scheduleAtFixedRate(runnable, 1, 3, TimeUnit.SECONDS);
}
}
2.自定义线程池
ThreadPoolExecutor 是创建线程池的底层类,上面的四种线程底层都是使用此类的构造方法
I.ThreadFactory 线程工厂
import java.util.concurrent.ThreadFactory;
/**
* 线程工厂
* @author Administrator
*
*/
public class ThreadFactoryTest implements ThreadFactory{
@Override
public Thread newThread(Runnable arg0) {
Thread t = new Thread(arg0);
return t;
}
public static void main(String[] args) {
T t = new T();
ThreadFactoryTest tft = new ThreadFactoryTest();
tft.newThread(t).start();
}
}
class T implements Runnable{
@Override
public void run() {
System.out.println("zzz");
}
}
使用默认的线程工厂和被拒绝的执行处理程序创建新的线程池
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 自定义线程池
* @author Administrator
*
*/
public class MyThreadPool{
public static ExecutorService newMyThreadPool(int min,int max,int time){
//corePoolSize 核心线程数量 maximumPoolSize 线程最大数量 keepAliveTime 线程空闲时,保持存活时间 unit //保持存活时间的单位 workQueue 队列
//用给定的初始参数和默认的线程工厂及被拒绝的执行处理程序创建新的 ThreadPoolExecutor。
//ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, //BlockingQueue<Runnable> workQueue)
//handler 饱和策略
//用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。
//ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, //BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
//threadFactory 线程工厂
//用给定的初始参数和默认被拒绝的执行处理程序创建新的 ThreadPoolExecutor
//ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, //BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
//用给定的初始参数创建新的 ThreadPoolExecutor
//ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, //BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
//handler 当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略还处理新提交的任务
//AbortPolicy:直接抛出异常,默认情况下采用这种策略
//CallerRunsPolicy:只用调用者所在线程来运行任务
//DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
//DiscardPolicy:不处理,丢弃掉
//workQueue 用于保存等待执行的任务的阻塞队列
//ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,按FIFO原则进行排序
//LinkedBlockingQueue:一个基于链表结构的阻塞队列,吞吐量高于ArrayBlockingQueue。静态工厂方法Excutors.newFixedThreadPool()使用了这个队列
//SynchronousQueue: 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,//吞吐量高于LinkedBlockingQueue,静态工厂方法Excutors.newCachedThreadPool()使用了这个队列
//PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
return new ThreadPoolExecutor(min, max, time, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
}
}
3.Future和FutureTask
Future功能 1.判断任务是否完成,2.中断任务 3.获取任务执行结果
可取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future 的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算
FutureTask的使用场景
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
public class FutureTaskTest {public static void main(String[] args) {
Worker w1 = new Worker("张三");
FutureTask<String> ft1 = new FutureTask<>(w1);
//因为FutureTask<V>实现了Runnable,所以可以使用ft作为参数运行线程
//因为FutureTask<V>实现了Callable<V>,
//Callable<V>接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。
//但是 Runnable 不会返回结果,并且无法抛出经过检查的异常
Thread t1 = new Thread(ft1);
t1.start();
try {
//阻塞等到返回结果,然后才会往下执行
System.out.println(ft1.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
Worker w2 = new Worker("李四");
FutureTask<String> ft2 = new FutureTask<>(w2);
Thread t2 = new Thread(ft2);
t2.start();
try {
//阻塞等到返回结果,然后才会往下执行
System.out.println(ft2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
System.out.println("main stop");
}
}
class Worker implements Callable<String>{
private String name;
public Worker(String name){
this.name = name;
}
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(1);
return name+"工作完成";
}
}
4.Fork/join
把一个任务分成N个小任务,执行每个小任务返回相应的值,汇总得到总结果
第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。
第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。Fork/Join使用两个类来完成以上两件事情:
ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承
ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:
RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。
ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class ForkJoinTest {
public static void main(String[] args) {
//ForkJoinPool 继承了AbstractExecutorService,AbstractExecutorService实现了ExecutorService
//ExecutorService.submit()提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。
ForkJoinPool fjp = new ForkJoinPool();
CountTask ct = new CountTask(1,100);
Future<Integer> f = fjp.submit(ct);//异步执行任务,并立即返回一个Future 对象
try {//得到结果
System.out.println("结果=="+f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
//继承RecursiveTask<Object>类用于有返回值的任务
class CountTask extends RecursiveTask<Integer>{
private static final long serialVersionUID = 1L;
public static final int threshold = 2; //阈值 ,当前任务大于阈值时,会继续细分
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
//System.out.println(Thread.currentThread().getName());
//等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
//合并子任务
sum = leftResult + rightResult;
}
return sum;
}
}
5. Spring 的ThreadPoolTaskExecutor线程池
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
ThreadPoolTaskExecutor执行线程的方法,不带返回值,当阻塞队列满了的时候,饱和策略抛异常
带返回值
写一个有返回值的例子
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
public class TestCallable implements Callable<String>{
private String name;
public TestCallable(String name){
this.name = name;
}
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(5);
return name;
}
}
109行获取一个线程,执行给定任务,111行,获取任务的返回值
调用时,会等待5秒,然后输出“张三”;