单元测试必须手动中断的异步计算
我有一个类可以异步记录eyetracking数据。有记录过程的方法有start
和stop
。数据收集在一个集合中,只有在记录线程完成其工作时才能访问集合。它基本上封装了所有的线程和同步,所以我的库的用户不必这样做。单元测试必须手动中断的异步计算
全副缩短代码(仿制药和错误处理省略):
public class Recorder {
private Collection accumulatorCollection;
private Thread recordingThread;
private class RecordingRunnable implements Runnable {
...
public void run() {
while(!Thread.currentThread().isInterrupted()) {
// fetch data and collect it in the accumulator
synchronized(acc) { acc.add(Eyetracker.getData()) }
}
}
}
public void start() {
accumulatorCollection = new Collection();
recordingThread = new Thread(new RecordingRunnable(accumulatorCollection));
recordingThread.start();
}
public void stop() {
recordingThread.interrupt();
}
public void getData() {
try {
recordingThread.join(2000);
if(recordingThread.isAlive()) { throw Exception(); }
}
catch(InterruptedException e) { ... }
synchronized(accumulatorCollection) { return accumulatorCollection; }
}
}
的用法很简单:
recorder.start();
...
recorder.stop();
Collection data = recorder.getData();
我对整个事情的问题是如何对其进行测试。目前,我做这样的:
recorder.start();
Thread.sleep(50);
recorder.stop();
Collection data = recorder.getData();
assert(stuff);
这工作,但它具有不确定性,并减慢测试套件相当多的(我画这些测试,集成测试,所以他们必须单独运行绕过这个问题)。
有没有更好的方法?
使用CountDownLatch有更好的方法。
测试的不确定性部分来自两个变量在时间茎你不占:
- 创建和启动一个线程需要时间和线程可能没有开始执行的可运行时
Thread.start()
回报(runnable将被执行,但可能稍后)。 - 停止/中断将打断Runnable中的while循环,但不会立即执行,可能稍后。
这是CountDownLatch
进来的地方:它为您提供有关另一个线程执行的位置的精确信息。例如。让第一个线程等待锁存器,而第二个线程作为可运行的最后一条语句“倒计时”,现在第一个线程知道runnable已完成。 CountDownLatch
也充当同步器:无论第二个线程写入内存,现在都可以被第一个线程读取。
而不是使用中断,你也可以使用一个volatile
布尔值。读取volatile
变量的任何线程都可以保证看到任何其他线程设置的最后一个值。
一个CountDownLatch
也可以给出一个超时,其对可挂测试是非常有用的:如果你要等待很长时间,则可以中止整个测试(例如关机执行人,中断线程),并抛出一个AssertionError
。在下面的代码中,我重新使用了超时来等待收集一定数量的数据,而不是“睡眠”。
作为优化,使用Executor(ThreadPool)而不是创建和启动线程。后者是相对昂贵的,使用Executor可以真正有所作为。
在更新的代码下方,我将它作为应用程序运行(main
方法)。(编辑28/02/17:检查maxCollect> 0 while循环)
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class Recorder {
private final ExecutorService executor;
private Thread recordingThread;
private volatile boolean stopRecording;
private CountDownLatch finishedRecording;
private Collection<Object> eyeData;
private int maxCollect;
private final AtomicBoolean started = new AtomicBoolean();
private final AtomicBoolean stopped = new AtomicBoolean();
public Recorder() {
this(null);
}
public Recorder(ExecutorService executor) {
this.executor = executor;
}
public Recorder maxCollect(int max) { maxCollect = max; return this; }
private class RecordingRunnable implements Runnable {
@Override public void run() {
try {
int collected = 0;
while (!stopRecording) {
eyeData.add(EyeTracker.getData());
if (maxCollect > 0 && ++collected >= maxCollect) {
stopRecording = true;
}
}
} finally {
finishedRecording.countDown();
}
}
}
public Recorder start() {
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("already started");
}
stopRecording = false;
finishedRecording = new CountDownLatch(1);
eyeData = new ArrayList<Object>();
// the RecordingRunnable created below will see the values assigned above ('happens before relationship')
if (executor == null) {
recordingThread = new Thread(new RecordingRunnable());
recordingThread.start();
} else {
executor.execute(new RecordingRunnable());
}
return this;
}
public Collection<Object> getData(long timeout, TimeUnit tunit) {
if (started.get() == false) {
throw new IllegalStateException("start first");
}
if (!stopped.compareAndSet(false, true)) {
throw new IllegalStateException("data already fetched");
}
if (maxCollect <= 0) {
stopRecording = true;
}
boolean recordingStopped = false;
try {
// this establishes a 'happens before relationship'
// all updates to eyeData are now visible in this thread.
recordingStopped = finishedRecording.await(timeout, tunit);
} catch(InterruptedException e) {
throw new RuntimeException("interrupted", e);
} finally {
stopRecording = true;
}
// if recording did not stop, do not return the eyeData (could stil be modified by recording-runnable).
if (!recordingStopped) {
throw new RuntimeException("recording");
}
// only when everything is OK this recorder instance can be re-used
started.set(false);
stopped.set(false);
return eyeData;
}
public static class EyeTracker {
public static Object getData() {
try { Thread.sleep(1); } catch (Exception ignored) {}
return new Object();
}
}
public static void main(String[] args) {
System.out.println("Starting.");
ExecutorService exe = Executors.newSingleThreadExecutor();
try {
Recorder r = new Recorder(exe).maxCollect(50).start();
int dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
System.out.println("Collected " + dsize);
r.maxCollect(100).start();
dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
System.out.println("Collected " + dsize);
r.maxCollect(0).start();
Thread.sleep(100);
dsize = r.getData(2000, TimeUnit.MILLISECONDS).size();
System.out.println("Collected " + dsize);
} catch (Exception e) {
e.printStackTrace();
} finally {
exe.shutdownNow();
System.out.println("Done.");
}
}
}
编码愉快:)
比环境屏障快吗? –
@huseyintugrulbuyukisik没有更快或更慢我认为,但我发现CountDownLatch更容易理解为CyclicBarrier。一旦你掌握了它,CyclicBarrier也是一个很好的工具。 – vanOekel
你睡着了,所以你可以收集数据,或以确保过程已经开始? – shmosel
我这样做来收集数据,否则我只会得到一个单一的数据包左右。 –
看起来您正在重塑[Future](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html),您可以通过[将任务提交给一个ExecutorService](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#submit(java.lang.Runnable))。 –