运行阶段并发任务
运行阶段并发任务
使用Phaser类执行阶段并发任务,是Java并发API提供的最复杂和强大的一个功能,这种机制在分阶段执行并发任务时非常有用。Phaser类提供了在每个阶段结尾同步线程的功能,因此在所有线程已经结束第一阶段任务之前,没有线程会在第二阶段开始执行。
与其它同步功能一样,需要在参与到同步操作中的任务数作为参数初始化Phaser类,但是可已增加或减少,动态地修改此参数。
在本节中,学习如何使用Phaser类同步三个并发任务,在三个不同的目录及其子目录下,检索24小时之内修改过的后缀名为.log的文件。分成三个阶段执行任务:
- 获得指定目录及其子目录下的后缀名为.log的文件。
- 过滤在1章创建的列表,删除掉24小时之前修改的文件。
- 输出结果到控制台。
在阶段1、2的结尾,判断列表是否还有其它元素,如果没有,线程则结束运行,从Phaser类中清除。
准备工作
本范例通过Eclipse开发工具实现。如果使用诸如NetBeans的开发工具,打开并创建一个新的Java项目。
实现过程
通过如下步骤完成范例:
-
创建名为FileSearch的类,并指定其实现Runnable接口。这个类实现在一个目录及其子目录中,检索24小时内修改的特定后缀名文件:
public class FileSearch implements Runnable {
-
定义私有字符串属性,存储检索开始的初始目录路径:
private final String initPath;
-
定义私有字符串属性,存储将要检索的文件后缀名:
private final String fileExtension;
-
定义私有List属性,存储寻找到的符合标准的文件完整路径:
private List<String> results;
-
最后,定义私有Phaser属性,控制不同任务阶段的同步:
private Phaser phaser;
-
接下来,实现类构造函数,初始化类属性。参数包括初始目录的完整路径,文件后缀名和phaser:
public FileSearch(String initPath, String fileExtension, Phaser phaser) { this.initPath = initPath; this.fileExtension = fileExtension; this.phaser = phaser; this.results = new ArrayList<>(); }
-
现在,实现run()方法用到的一些附属类。第一个是directoryProcess()方法, 接收File对象为参数,用来处理所有文件和子目录。在每个目录中,通过传递目录路径为参数进行一次递归调用。对于每个文件,调用fileProcess()方法:
private void directoryProcess(File file) { File list[] = file.listFiles(); if ( list != null ){ for( int i =0 ; i < list.length ; i ++) { if (list[i].isDirectory()) { directoryProcess(list[i]); } else { fileProcess(list[i]); } } } }
-
然后实现fileProcess()方法,传递文件对象为参数,判断此文件后缀名是否符合检索要求,如果符合,此方法将文件绝对路径存到结果列表中:
private void fileProcess(File file) { if(file.getName().endsWith(fileExtension)) { results.add(file.getAbsolutePath()); } }
-
现在实现filterResults()方法,由于在第一阶段不用传任何参数以及过滤文件列表,所以只是删除24小时之前修改的文件。首先,创建一个空列表,得到当前时间:
private void filterResults() { List<String> newResults = new ArrayList<>(); long actualDate = new Date().getTime();
-
然后,遍历结果列表所有元素,用结果列表中每个路径创建File对象,得到最后修改时间:
for (int i = 0 ; i < results.size() ; i ++) { File file = new File(results.get(i)); long fileDate = file.lastModified();
-
将这个时间与当前时间比对,如果时间差小于1天,将完整路径加载到新的结果列表中:
if((actualDate - fileDate) < TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)) { newResults.add(results.get(i)); } }
-
最后,将旧的结果列表赋值到新的列表上:
results =newResults; }
-
接下来,实现checkResults()方法,将在第一、二阶段结尾调用来判断结果列表是否为空,此方法不需要传参:
private boolean checkResults() {
-
首先,检查结果列表长度,如果是0,输出相应信息到控制台。然后,调用Phaser对象的arrvieAndDeregister()方法通知这个线程已经完成当前阶段并且离开阶段操作:
if (results.isEmpty()) { System.out.printf("%s : Phase %d : 0 results.\n", Thread.currentThread().getName(), phaser.getPhase()); System.out.printf("%s : Phase %d : End.\n", Thread.currentThread().getName(), phaser.getPhase()); phaser.arriveAndDeregister(); return false;
-
如果结果列表非空,输出相应信息到控制台。然后,调用Phaser对象的arrvieAndDeregister()方法通知这个线程已经完成当前阶段并且将被阻塞,直到所有在阶段操作里的参与的线程都完成当前阶段:
} else { System.out.printf("%s : Phase %d : %d results.\n", Thread.currentThread().getName(), phaser.getPhase(), results.size()); phaser.arriveAndAwaitAdvance(); return true; } }
-
最后一个附属类是showInfo()翻噶发,输出结果列表元素到控制台:
private void showInfo() { for ( int i = 0 ; i < results.size() ; i ++) { File file = new File(results.get(i)); System.out.printf("%s : %s\n", Thread.currentThread().getName(), file.getAbsolutePath()); } phaser.arriveAndAwaitAdvance(); }
-
现在开始实现执行操作的run()方法,使用前面描述的附属类。同时实现控制阶段之间变化的Phaser对象。首先,调用Phaser对象的arrvieAndAwaitAdvance()方法,检索直到所有线程创建完成后才能开始:
@Override public void run() { phaser.arriveAndAwaitAdvance();
-
然后,输出启动检索任务的信息到控制台:
System.out.printf("%s : Starting.\n", Thread.currentThread().getName());
-
检查initPath属性存储的目录名,使用directoryProcess()方法在此目录及其子目录中寻找特定后缀名的文件:
File file = new File(initPath); if(file.isDirectory()) { directoryProcess(file); }
-
使用checkResults()方法判断是否还有其它结果,如果无,使用return关键字结束线程执行:
if( !checkResults()) { return; }
-
使用filterResults()方法过滤结果列表:
filterResults();
-
再次使用checkResults()方法判断是否还有其它结果,如果无,使用return关键字结束线程执行:
if( !checkResults()) { return; }
-
使用showInfo()方法输出最终的结果列表到控制台,撤销此线程,输出线程终止的信息:
showInfo(); phaser.arriveAndDeregister(); System.out.printf("%s : Work completed.\n", Thread.currentThread().getName());
-
现在,实现范例的主类,创建一个包含main()方法的Main类:
public class Main { public static void main(String[] args) {
-
创建Phaser对象,包含三个参与者:
Phaser phaser = new Phaser(3);
-
分别用不同的初始目录路径,创建三个FileSearch对象,检索后缀名为.log的文件:
FileSearch system = new FileSearch("C:\\Windows", "log", phaser); FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser); FileSearch documents = new FileSearch("C:\\Documents And Settings", "log", phaser);
-
创建和启动执行第一个FileSearch对象的线程:
Thread systemThread = new Thread(system, "System"); systemThread.start();
-
创建和启动执行第二个FileSearch对象的线程:
Thread appsThread = new Thread(apps, "Apps"); appsThread.start();
-
创建和启动执行第三个FileSearch对象的线程:
Thread documentsThread = new Thread(documents, "Documents"); documentsThread.start();
-
等待三个线程终止:
try { systemThread.join(); appsThread.join(); documentsThread.join(); } catch (InterruptedException e) { e.printStackTrace(); }
-
使用isFinalized()方法记录Phaser对象的最终标志值:
System.out.printf("Terminated : " + phaser.isTerminated() + "\n");
工作原理
首先创建在每个阶段结尾控制线程同步的Phaser对象,Phaser构造函数参数是参与者数量。在本范例中,Phaser有三个参与者。这个数字是Phaser的线程执行数量,在Phaser改变阶段并且唤醒已休眠的线程之前,这些线程需要执行arrvieAndAwaitAdvance()方法。
一旦Phaser已被创建,加载使用三个不同的FileSearch对象执行的三个线程。
本范例中,使用的是Windows操作系统的路径,如果你在其它操作系统上开发,修改成相对应环境的路径,例如/var/log,或类似的。
FileSearch对象中的run()方法第一个指令是调用Phaser对象的arrvieAndAwaitAdvance()方法。如前所述,Phaser直到想要同步的线程数。当一个线程调用此方法时,Phaser减少需要当前阶段终止的线程数并且设置此线程休眠,直到所有线程完成当前阶段。在run()方法开始调用此方法确保所有FileSearch线程创建完成后,才开始执行。
在阶段一、二结尾,判断两个是已经生成结果即结果列表有数据,还是没有生成结果即列表为空。第一种情况中,checkResults()方法调用前面提到的arrvieAndAwaitAdvance()方法。另一种情况中,如果列表为空,线程里无法继续操作列表,所以结束其运行。但是需要通知Phaser对象将减少一个参与者。因为,使用arrvieAndDeregister(),通知phaser线程已经结束当前阶段,不需要参与后续阶段的操作,所以phaser不需要等待,继续运行。
showInfo()方法实现在阶段三的结尾,调用phaser的arrvieAndAwaitAdvance()方法,用来保证所有线程在同一时间结束。当此方法结束执行时,调用phaser的arrvieAndDeregister()方法,用来撤销phaser的线程,如前所述。所以当所有线程结束时,phaser将没有参与者。
最后,main()方法等待三个线程运行完成,调用phaser的isTerminated()方法。当phaser没有参与者时,则进入所谓的终止状态,并且返回true。当撤销phaser的所有线程之后,将进入终止状态,并且在控制台中输出true。
Phaser对象包含两种状态:
- **活动:**在允许新参与者注册以及在每个阶段结尾同步时,Phaser进入此状态。在这种状态中,Phaser的功能如本节所述。Java并发API中未提及此状态。
- **终止:**默认情况下,当Phaser中的所有参与者已被撤销,也就是没有参与者时,Phaser进入此状态。进一步说,当方法onAdvance()返回true时,Phaser进入终止状态。如果重写此方法,可以改变默认属性。当Phaser处于终止状态时,同步方法arrvieAndAwaitAdvance()立即返回,不做任何同步操作。
Phaser类的一个显著特性是从相关联的方法到phaser不用控制任何异常。不像其它同步功能,休眠在phaser中的线程不响应中断事件,也不抛出InterruptedException异常。Phaser类只有一个异常,后续介绍。
下图显示本范例在控制台输出的执行结果:
首先展示前两个阶段的执行,能够看到阶段二的Apps线程因为结果列表为空而结束。当运行这个范例时,可以看到一些线程是如何在其它阶段之前执行完一个阶段,以及它们如何等待所有线程执行完一个阶段,直到继续执行其它阶段之前,
扩展学习
Phaser类还提供了阶段变化相关的方法,如下所示:
- arrive():此方法通知Phaser类,一个参与者已经完成当前阶段,但它无须等待其它继续执行的参与者。因为不与其它线程同步,所以请谨慎使用此方法。
- awaitAdvance(int phase):此方法设置当前线程休眠,直到phaser参数的所有参与者已经完成当前阶段,即传参的数量与phaser的当前阶段相同。如果传参与phaser的档期那阶段不相等,方法结束执行。
- awaitAdvanceInterruptibly(int phaser):此方法与awaitAdvance(int phase)相同,但是如果方法中正在休眠的线程被中断,会抛出InterruptedException异常。
Phaser中注册参与者
当创建Phaser对象时,需要指明phaser中有多少参与者。Phaser类有两个增加phaser参与者数量的方法,如下所示:
- register():此方法增加一个参与者到Phaser中,新的参与者将被认定未到达当前阶段。
- bulkRegister(int Parties):此方法增加特定数量的参与者到Phaser中,新的参与者将被认定未到达当前阶段。
Phaser类中提供的减少参与者数量的唯一方法是arrvieAndDeregister(),通知phaser线程已经结束当前阶段,并且不再继续阶段化操作。
强制Phaser终止
当phaser没有参与者时,则进入终止状态。Phaser类提供forceTermination()方法来改变phaser的状态,并使之进入终止状态而不考虑phaser中注册的参与者数量。这种机制在其中一个参与者出现错误情况时有用,最好的方法就是终止phaser。
当phaser处于终止状态时,awaitAdvance()和arrvieAndAwaitAdvance()方法立即返回一个负数,代替通常返回的正数。如果想直到phaser是否被终止,需要核实这些方法(awaitAdvance()和arrvieAndAwaitAdvance())的返回值,确定phaser是否已被终止。
更多关注
- 第九章“测试并发应用”中的“监控Phaser类”小节。