从Spring Writer启动Runnable批处理分区步骤
问题描述:
我有一个Spring批处理作业,包含分区步骤和分区步骤正在处理块。从Spring Writer启动Runnable批处理分区步骤
我可以从方法public void write(List<? extends VO> itemsToWrite)
进一步启动新线程(执行Runnable
)吗?
基本上,笔者写到这里使用Lucene索引和自作家有chunk-size
的List
项目,我认为除以List
成段,每段传递到一个新的Runnable
。
这是一个很好的方法吗?
我编写了一个示例,它工作大部分时间,但卡住了几次。
有什么我需要担心的吗?或者有没有什么内置的春季批次来实现这一目标?
我不想写单个线程发生的整个块。我希望进一步划分大块。
Lucene的IndexWriter
是线程安全的一个方法是上市here
示例代码 - 作家将获取我打开从线程池中的线程项目的List
?即使我等待游泳池终止为一个块,是否会有任何顾虑?
@Override
public void write(List<? extends IndexerInputVO> inputItems) throws Exception {
int docsPerThread = Constants.NUMBER_OF_DOCS_PER_INDEX_WRITER_THREADS;
int docSize = inputItems.size();
int remainder = docSize%docsPerThread;
int poolSize = docSize/docsPerThread;
ExecutorService executor = Executors.newFixedThreadPool(poolSize+1);
int fromIndex=0;
int toIndex = docsPerThread;
if(docSize < docsPerThread){
executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems));
}else{
for(int i=1;i<=poolSize;i++){
executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems.subList(fromIndex, toIndex)));
fromIndex+=docsPerThread;
toIndex+=docsPerThread;
}
if(remainder != 0){
toIndex=docSize;
executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems.subList(fromIndex, toIndex)));
}
}
executor.shutdown();
while(executor.isTerminated()){
;
}
答
我不确定在作者中启动新线程是个好主意。 这些线程超出了Spring批处理框架的范围,因此您需要为上述实现关闭和取消策略。如果一个段的处理失败,则可能导致整个队列失败。
作为替代方法,我可以建议您将自定义的作家列表从作家推荐到下一步,如官方文档中所述passingDataToFutureSteps