流水线中的Java
为实现Runnable的每个管道组件创建一个类。给每个组件一个ConcurrentLinkedQueue来保存要处理的数据;每个组件都会在无限循环(在其run()方法)中轮询该队列,并在数据处理完成后对其进行处理。每个前面的组件都会将其输出添加到下一个组件的队列中。现在将每个可运行的代码分配给一个线程,启动线程并开始将数据提供给第一个组件的队列。
如果一个组件发现它的队列是空的,那么你可能想让它睡半秒钟左右。
您可能还想为每个将在run()方法中跳出无限循环的组件添加一个cancel()方法。
public class Decode implements Runnable {
private boolean cancel = false;
private ConcurrentLinkedQueue<Data> queue = new ConcurrentLinkedQueue<>();
private FetchOperands nextComponent;
public void run() {
while(!cancel) {
Data data = queue.poll();
if(data != null) {
...
nextComponent.enqueue(data);
} else (Thread.sleep(500);
}
}
public void enqueue(Data data) {
queue.offer(data);
}
public void cancel() {
cancel = true;
}
public void setFetchOperands(FetchOperands nextComponent) {
this.nextComponent = nextComponent;
}
}
public class Main implements Runnable {
public void run() {
Decode decode = new Decode();
FetchOperands fetchOperands = new FetchOperands();
decode.setFetchOperands(fetchOperands);
Thread t1 = new Thread(decode);
Thread t2 = new Thread(fetchOperands);
t1.start();
t2.start();
t1.join();
t2.join();
}
}
非常感谢你,我会尝试这个。你的例子中的队列,在decode类中声明,它被设置为private.Will它对其他类可见轮询数据(我是否有意义)? – user2262755 2013-04-09 17:10:36
队列对其他对象是不可见的,因为您只希望其他对象能够将数据添加到队列中,而不是从队列中删除数据。前面的组件,在这种情况下是FetchOperation组件,应该有一个对Decode组件的引用,并且应该调用Decode组件的enqueue方法以便沿着管道传递数据。 – 2013-04-09 17:12:20
管道模式是在将问题分为较小 可重复使用的代码组件有帮助的。这是一种简单但功能强大的结构 模式,可将复杂的逻辑组织成更小的可重用组件,可独立添加/删除/修改。
我不知道如何使用它。无论如何谢谢你的回复。 – user2262755 2013-04-09 17:11:36
这不是流水线的意思。 – delnan 2013-04-09 16:43:37