One Producer十个消费者文件处理与Executors.newSingleThreadExecutor()
问题描述:
我有一个LinkedBlockingQueue任意挑选的容量为10,并有1000行输入文件。据我所知,我在服务类的main
方法中有一个ExecutorService
类型变量,它使用Executors.newSingleThreadExecutor()
- 一个单独的线程调用buffer.readline(),直到文件line == null
,然后处理 - 在循环使用Executors.newSingleThreadExecutor()
- 经常线程处理行并将它们写入输出文件,直到!queue.take().equals("Stop")
。但是,在写入一些行到文件后,当我处于调试模式时,我看到队列的容量最终达到最大值(10),并且处理线程不执行queue.take()
。所有线程都处于running
状态,但进程在queue.put()
后停止。什么会导致这个问题,并且是否可以使用线程池或多个处理器变量的组合来解决,而不是使用单个变量?One Producer十个消费者文件处理与Executors.newSingleThreadExecutor()
纲要在服务main
方法的当前状态:
//app settings to get values for keys within a properties file
AppSettings appSettings = new AppSettings();
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
maxProdThreads = 1;
maxConsThreads = 10;
ExecutorService execSvc = null;
for (int i = 0; i < maxProdThreads; i++) {
execSvc = Executors.newSingleThreadExecutor();
execSvc.submit(new ReadJSONMessage(appSettings,queue));
}
for (int i = 0; i < maxConsThreads; i++) {
execSvc = Executors.newSingleThreadExecutor();
execSvc.submit(new ProcessJSONMessage(appSettings,queue));
}
读取方法的代码:
buffer = new BufferedReader(new FileReader(inputFilePath));
while((line = buffer.readLine()) != null){
line = line.trim();
queue.put(line);
}
处理和写代码:
while(!(line=queue.take()).equals("Stop")){
if(line.length() > 10)
{
try {
if(processMessage(line, outputFilePath) == true)
{
++count;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public boolean processMessage(String line, String outputFilePath){
CustomObject cO = new CustomObject();
cO.setText(line);
writeToFile1(cO,...);
writeToFile2(cO,...);
}
public void writeOutputAToFile(CustomObject cO,...){
synchronized(cO){
...
org.apache.commons.io.FileUtils.writeStringToFile(...)
}
}
public void writeOutputBToFile(CustomObject cO,...){
synchronized(cO){
...
org.apache.commons.io.FileUtils.writeStringToFile(...)
}
}
答
在处理和编写代码..确保所有资源都正确关闭..可能是资源可能不能正常关闭,因为线程继续运行,ExecutorService找不到空闲线程...
请张贴您的读写代码。我感觉到你以某种方式阻挡在那里。 –
不是你的答案,但我建议在你的循环外部放置一个'Executors.newCachedThreadPool()'并使用它。这将允许您在完成后使用一次调用('exeSvc.shutdown()')关闭所有线程。 – teppic
进行线程转储以查看线程被阻塞的位置。 – teppic