并发编程系列(2)- 线程池代码实战
上一节并发编程系列(1)- 线程池原理 已经初步介绍线程池的原理、线程池的创建,但是实际在项目中应该如何使用线程池呢?
一、使用场景介绍
当遇到重复性、且互相无依赖的任务的时候,可以用多线程来实现。
比如:上传多个文件、解析多个文件等等,这些行为相互之间不会影响(即一个任务不依赖与另一个任务的执行成功与否,也不依赖于其执行的结果)。
如果任务执行相互依赖,那么可以用异步去实现。
二、代码实战
下面用一个简单的例子来熟悉下线程池的使用。
1、线程池工厂类-用于创建合适的线程池:
首先创建一个FILE_EXECUTOR,用于创建多线程读取文件的线程池。
(注意:在实际项目开发中,要注意线程池隔离,即不同作用的线程池需要单独定义,我这里有个读取文件任务的线程池,定义为FILE_EXECUTOR;若有上传文件的,可以再定义一个UPLOAD_EXECUTOR)。
不同线程池因任务类型的不同,参数设置也不同;若不注意线程隔离,多个任务共用同一个线程池,很可能会因为某个任务执行失败、线程池停止,导致别的任务也停止执行。
线程池工厂类如下:
/**
* Created by wanggenshen
* Date: on 2018/11/3 16:25.
* Description: 线程池工厂类
*/
public final class ThreadPoolFactory {
private static int CORE_POOL_SIZE = 5;
private static int MAX_POOL_SIZE = 10;
private static int KEEPALIVE_SECONDS = 120;
private static int QUEUE_SIZE = 120;
// 创建一个名为FILE_EXECUTOR的线程池,用于多线程读取文件字数
public static final ExecutorService FILE_EXECUTOR;
public ThreadPoolFactory() {
}
static {
// 线程池创建
FILE_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAX_POOL_SIZE,
(long)KEEPALIVE_SECONDS,
TimeUnit.SECONDS,
new ArrayBlockingQueue(QUEUE_SIZE),
new ThreadFactoryBuilder().setNameFormat("thread-pool-%d").build(),
new ThreadPoolFactory.MyRejectedExecutionHandler());
}
/**
* 自定义Reject策略
*/
static class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
2、写一个字数统计的工具
为方便演示,字数统计的功能也用伪代码来表示。
fileMap用来存放文件(key-文件序号;value-文件对应字数);
为模拟演示效果,在读取文件的时候,用Thread.sleep(1000)来模拟读取文件花费的时间;
/**
* Created by wanggenshen
* Date: on 2018/11/3 16:51.
* Description: 字数统计工具
*/
public class WordsCountUtil {
// 模拟文件 ,key——文件序号;value:文件字数
private static final Map<Integer, Long> fileMap = Maps.newHashMap();
static {
fileMap.put(1, 1000L);
fileMap.put(2, 1000L);
fileMap.put(3, 1000L);
fileMap.put(4, 1000L);
fileMap.put(5, 1000L);
fileMap.put(6, 1000L);
fileMap.put(7, 1000L);
fileMap.put(8, 1000L);
fileMap.put(9, 1000L);
fileMap.put(10, 1000L);
}
/**
* 字数统计
* @param fileId
* @return
*/
public static long countWords (int fileId) {
try {
// 模拟读取文件字数执行的时间,每个文件执行时间为1s
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return fileMap.get(fileId);
}
}
3、测试
(1)单线程操作
模拟有10个文件,每个文件读取时间花费1s,所以使用单线程总共花费时间为10s。
public class Test {
private static final ExecutorService poolExecutor = ThreadPoolFactory.FILE_EXECUTOR;
// 单线程
public static void singleThreadTest (List<Integer> fileIds) {
long startTime = System.currentTimeMillis();
for (Integer fileId : fileIds) {
System.out.println("文件字数:" + WordsCountUtil.countWords(fileId));
}
long endTime = System.currentTimeMillis();
System.out.println("单线程花费的时间:" + (endTime - startTime) / 1000 + "秒");
}
public static void main(String[] args) throws InterruptedException {
List<Integer> fileIds = Lists.newArrayList();
fileIds.add(1);
fileIds.add(2);
fileIds.add(3);
fileIds.add(4);
fileIds.add(5);
fileIds.add(6);
fileIds.add(7);
fileIds.add(8);
fileIds.add(9);
fileIds.add(10);
singleThreadTest(fileIds);
}
}
测试时间为:
(2)多线程测试
public class Test {
private static final ExecutorService poolExecutor = ThreadPoolFactory.FILE_EXECUTOR;
// 多线程测试
public static void multiThreadTest (List<Integer> fileIds) throws InterruptedException {
long startTime = System.currentTimeMillis();
List<List<Integer>> subList = Lists.partition(fileIds, 5);
for (List<Integer> list : subList) {
poolExecutor.submit(new Runnable() {
@Override
public void run() {
processData(list);
}
});
}
poolExecutor.shutdown();
// 设置等待时间,监测ExecutorService是否已经关闭;若关闭则返回true,否则返回false
while (!poolExecutor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
System.out.println("Thead is working ");
}
long endTime = System.currentTimeMillis();
System.out.println("多线程花费的时间:" + (endTime - startTime) / 1000 + "秒");
}
public static void processData (List<Integer> fileIds) {
for (Integer fileId : fileIds) {
System.out.println("文件字数:" + WordsCountUtil.countWords(fileId));
}
}
public static void main(String[] args) throws InterruptedException {
List<Integer> fileIds = Lists.newArrayList();
fileIds.add(1);
fileIds.add(2);
fileIds.add(3);
fileIds.add(4);
fileIds.add(5);
fileIds.add(6);
fileIds.add(7);
fileIds.add(8);
fileIds.add(9);
fileIds.add(10);
multiThreadTest(fileIds);
}
}
多线程测试过程中,文件数不变为10。
当我们用Lists.partition(fileIds, 5);
将10个fileIds分成2个小的list并当做任务传入processData(), 线程池会开启2个线程去执行这2个任务。
所以花费时间为 10 /2 = 5s
线程池创建线程数量的大小是如何确定的呢?当我们用Lists.partition(fileIds, 2);
将10个fileIds分成5个小的list并当做任务传入入processData(),线程池会开启5个线程去执行这个5个任务。
所以花费时间为10/5=2s
如果我们用Lists.partition(fileIds, 1);
将10个fileIds分成1个小的list并当做任务传入入processData(),最后发现执行的时间为10s(线程池只会创建一个线程去执行),与单线程执行无异,所以需要我们设置合理的任务数。
实际执行的时间跟线程池数量、任务执行情况都有影响,所以在实际开发中需要我们设置合理的线程池数量,同时设置合理的任务数。