如何处理Java中的多个流?

问题描述:

我正在尝试运行一个进程并使用其输入,输出和错误流进行操作。最显而易见的方法做,这是使用类似select(),但我能找到在Java中,做的唯一的事情是Selector.select(),这需要一个Channel。这似乎并不可能得到来自InputStreamOutputStreamFileStreamgetChannel()方法,但是,这并不在这里帮助)如何处理Java中的多个流?

所以一个Channel,而不是我写了一些代码来查询所有流:

while(!out_eof || !err_eof) 
{ 
    while(out_str.available()) 
    { 
     if((bytes = out_str.read(buf)) != -1) 
     { 
      // Do something with output stream 
     } 
     else 
      out_eof = true; 
    } 
    while(err_str.available()) 
    { 
     if((bytes = err_str.read(buf)) != -1) 
     { 
      // Do something with error stream 
     } 
     else 
      err_eof = true; 
    } 
    sleep(100); 
} 

它的工作原理,但它永远不会终止。当一个流到达文件的末尾,available()返回零所以read()不叫,我们从来没有得到回报-1将指示EOF。

一个解决方案是,以检测EOF无阻塞方式。我无法在任何地方看到文档。或者,有没有更好的方式来做我想做的事情?

我看到这个问题在这里: link text ,虽然它并不完全做我想做的,我也许可以使用这个想法,产卵单独的线程为每个数据流,为特定的问题,我现在有。但当然,这不是唯一的方法吗?当然,必须有一种方法可以从多个流中读取,而无需为每个流使用线程?

正如您所说,解决方案outlined in this Answer是从Process读取stdout和stderr的传统方式。每个线程都是一条线路,尽管它有点烦人。

事实上,你将不得不去产卵要监视每个流线程的路线。如果你的用例允许把有问题的进程的stdout和stderr结合起来,你只需要一个线程,否则需要两个线程。

我花了相当一段时间在我们的一个项目中得到正确的结果,我必须启动一个外部过程,接受它的输出并对它做某些事情,同时寻找错误和进程终止,能够在Java应用程序的用户取消操作时终止它。

我创建了一个相当简单的类来封装鉴赏部分,其run()方法看起来是这样的:

public void run() { 
    BufferedReader tStreamReader = null; 
    try { 
     while (externalCommand == null && !shouldHalt) { 
      logger.warning("ExtProcMonitor(" 
          + (watchStdErr ? "err" : "out") 
          + ") Sleeping until external command is found"); 
      Thread.sleep(500); 
     } 
     if (externalCommand == null) { 
      return; 
     } 
     tStreamReader = 
       new BufferedReader(new InputStreamReader(watchStdErr ? externalCommand.getErrorStream() 
         : externalCommand.getInputStream())); 
     String tLine; 
     while ((tLine = tStreamReader.readLine()) != null) { 
      logger.severe(tLine); 
      if (filter != null) { 
       if (filter.matches(tLine)) { 
        informFilterListeners(tLine); 
        return; 
       } 
      } 
     } 
    } catch (IOException e) { 
     logger.logExceptionMessage(e, "IOException stderr"); 
    } catch (InterruptedException e) { 
     logger.logExceptionMessage(e, "InterruptedException waiting for external process"); 
    } finally { 
     if (tStreamReader != null) { 
      try { 
       tStreamReader.close(); 
      } catch (IOException e) { 
       // ignore 
      } 
     } 
    } 
} 

在主叫方面,它看起来是这样的:

Thread tExtMonitorThread = new Thread(new Runnable() { 

     public void run() { 
      try { 
       while (externalCommand == null) { 
        getLogger().warning("Monitor: Sleeping until external command is found"); 
        Thread.sleep(500); 
        if (isStopRequested()) { 
         getLogger() 
           .warning("Terminating external process on user request"); 
         if (externalCommand != null) { 
          externalCommand.destroy(); 
         } 
         return; 
        } 
       } 
       int tReturnCode = externalCommand.waitFor(); 
       getLogger().warning("External command exited with code " + tReturnCode); 
      } catch (InterruptedException e) { 
       getLogger().logExceptionMessage(e, "Interrupted while waiting for external command to exit"); 
      } 
     } 
    }, "ExtCommandWaiter"); 

    ExternalProcessOutputHandlerThread tExtErrThread = 
      new ExternalProcessOutputHandlerThread("ExtCommandStdErr", getLogger(), true); 
    ExternalProcessOutputHandlerThread tExtOutThread = 
      new ExternalProcessOutputHandlerThread("ExtCommandStdOut", getLogger(), true); 
    tExtMonitorThread.start(); 
    tExtOutThread.start(); 
    tExtErrThread.start(); 
    tExtErrThread.setFilter(new FilterFunctor() { 

     public boolean matches(Object o) { 
      String tLine = (String)o; 
      return tLine.indexOf("Error") > -1; 
     } 
    }); 

    FilterListener tListener = new FilterListener() { 
     private boolean abortFlag = false; 

     public boolean shouldAbort() { 
      return abortFlag; 
     } 

     public void matched(String aLine) { 
      abortFlag = abortFlag || (aLine.indexOf("Error") > -1); 
     } 

    }; 

    tExtErrThread.addFilterListener(tListener); 
    externalCommand = new ProcessBuilder(aCommand).start(); 
    tExtErrThread.setProcess(externalCommand); 
    try { 
     tExtMonitorThread.join(); 
     tExtErrThread.join(); 
     tExtOutThread.join(); 
    } catch (InterruptedException e) { 
     // when this happens try to bring the external process down 
     getLogger().severe("Aborted because auf InterruptedException."); 
     getLogger().severe("Killing external command..."); 
     externalCommand.destroy(); 
     getLogger().severe("External command killed."); 
     externalCommand = null; 
     return -42; 
    } 
    int tRetVal = tListener.shouldAbort() ? -44 : externalCommand.exitValue(); 

    externalCommand = null; 
    try { 
     getLogger().warning("command exit code: " + tRetVal); 
    } catch (IllegalThreadStateException ex) { 
     getLogger().warning("command exit code: unknown"); 
    } 
    return tRetVal; 

不幸的是我不必为自给自足的可运行示例,但也许这有助于。 如果我不得不再次这样做,我会再看看使用Thread.interrupt()方法而不是自制的停止标志(头脑声明它不稳定!),但我留下了另一次。 :)