如何处理Java中的多个流?
我正在尝试运行一个进程并使用其输入,输出和错误流进行操作。最显而易见的方法做,这是使用类似select()
,但我能找到在Java中,做的唯一的事情是Selector.select()
,这需要一个Channel
。这似乎并不可能得到来自InputStream
或OutputStream
(FileStream
有getChannel()
方法,但是,这并不在这里帮助)如何处理Java中的多个流?
所以一个Channel
,而不是我写了一些代码来查询所有流:
while(!out_eof || !err_eof)
{
while(out_str.available())
{
if((bytes = out_str.read(buf)) != -1)
{
// Do something with output stream
}
else
out_eof = true;
}
while(err_str.available())
{
if((bytes = err_str.read(buf)) != -1)
{
// Do something with error stream
}
else
err_eof = true;
}
sleep(100);
}
它的工作原理,但它永远不会终止。当一个流到达文件的末尾,available()
返回零所以read()
不叫,我们从来没有得到回报-1将指示EOF。
一个解决方案是,以检测EOF无阻塞方式。我无法在任何地方看到文档。或者,有没有更好的方式来做我想做的事情?
我看到这个问题在这里: link text ,虽然它并不完全做我想做的,我也许可以使用这个想法,产卵单独的线程为每个数据流,为特定的问题,我现在有。但当然,这不是唯一的方法吗?当然,必须有一种方法可以从多个流中读取,而无需为每个流使用线程?
事实上,你将不得不去产卵要监视每个流线程的路线。如果你的用例允许把有问题的进程的stdout和stderr结合起来,你只需要一个线程,否则需要两个线程。
我花了相当一段时间在我们的一个项目中得到正确的结果,我必须启动一个外部过程,接受它的输出并对它做某些事情,同时寻找错误和进程终止,能够在Java应用程序的用户取消操作时终止它。
我创建了一个相当简单的类来封装鉴赏部分,其run()方法看起来是这样的:
public void run() {
BufferedReader tStreamReader = null;
try {
while (externalCommand == null && !shouldHalt) {
logger.warning("ExtProcMonitor("
+ (watchStdErr ? "err" : "out")
+ ") Sleeping until external command is found");
Thread.sleep(500);
}
if (externalCommand == null) {
return;
}
tStreamReader =
new BufferedReader(new InputStreamReader(watchStdErr ? externalCommand.getErrorStream()
: externalCommand.getInputStream()));
String tLine;
while ((tLine = tStreamReader.readLine()) != null) {
logger.severe(tLine);
if (filter != null) {
if (filter.matches(tLine)) {
informFilterListeners(tLine);
return;
}
}
}
} catch (IOException e) {
logger.logExceptionMessage(e, "IOException stderr");
} catch (InterruptedException e) {
logger.logExceptionMessage(e, "InterruptedException waiting for external process");
} finally {
if (tStreamReader != null) {
try {
tStreamReader.close();
} catch (IOException e) {
// ignore
}
}
}
}
在主叫方面,它看起来是这样的:
Thread tExtMonitorThread = new Thread(new Runnable() {
public void run() {
try {
while (externalCommand == null) {
getLogger().warning("Monitor: Sleeping until external command is found");
Thread.sleep(500);
if (isStopRequested()) {
getLogger()
.warning("Terminating external process on user request");
if (externalCommand != null) {
externalCommand.destroy();
}
return;
}
}
int tReturnCode = externalCommand.waitFor();
getLogger().warning("External command exited with code " + tReturnCode);
} catch (InterruptedException e) {
getLogger().logExceptionMessage(e, "Interrupted while waiting for external command to exit");
}
}
}, "ExtCommandWaiter");
ExternalProcessOutputHandlerThread tExtErrThread =
new ExternalProcessOutputHandlerThread("ExtCommandStdErr", getLogger(), true);
ExternalProcessOutputHandlerThread tExtOutThread =
new ExternalProcessOutputHandlerThread("ExtCommandStdOut", getLogger(), true);
tExtMonitorThread.start();
tExtOutThread.start();
tExtErrThread.start();
tExtErrThread.setFilter(new FilterFunctor() {
public boolean matches(Object o) {
String tLine = (String)o;
return tLine.indexOf("Error") > -1;
}
});
FilterListener tListener = new FilterListener() {
private boolean abortFlag = false;
public boolean shouldAbort() {
return abortFlag;
}
public void matched(String aLine) {
abortFlag = abortFlag || (aLine.indexOf("Error") > -1);
}
};
tExtErrThread.addFilterListener(tListener);
externalCommand = new ProcessBuilder(aCommand).start();
tExtErrThread.setProcess(externalCommand);
try {
tExtMonitorThread.join();
tExtErrThread.join();
tExtOutThread.join();
} catch (InterruptedException e) {
// when this happens try to bring the external process down
getLogger().severe("Aborted because auf InterruptedException.");
getLogger().severe("Killing external command...");
externalCommand.destroy();
getLogger().severe("External command killed.");
externalCommand = null;
return -42;
}
int tRetVal = tListener.shouldAbort() ? -44 : externalCommand.exitValue();
externalCommand = null;
try {
getLogger().warning("command exit code: " + tRetVal);
} catch (IllegalThreadStateException ex) {
getLogger().warning("command exit code: unknown");
}
return tRetVal;
不幸的是我不必为自给自足的可运行示例,但也许这有助于。 如果我不得不再次这样做,我会再看看使用Thread.interrupt()方法而不是自制的停止标志(头脑声明它不稳定!),但我留下了另一次。 :)