从Spring Writer启动Runnable批处理分区步骤

问题描述:

我有一个Spring批处理作业,包含分区步骤和分区步骤正在处理块。从Spring Writer启动Runnable批处理分区步骤

我可以从方法public void write(List<? extends VO> itemsToWrite)进一步启动新线程(执行Runnable)吗?

基本上,笔者写到这里使用Lucene索引和自作家有chunk-sizeList项目,我认为除以List成段,每段传递到一个新的Runnable

这是一个很好的方法吗?

我编写了一个示例,它工作大部分时间,但卡住了几次。

有什么我需要担心的吗?或者有没有什么内置的春季批次来实现这一目标?

我不想写单个线程发生的整个块。我希望进一步划分大块。

Lucene的IndexWriter是线程安全的一个方法是上市here

示例代码 - 作家将获取我打开从线程池中的线程项目的List?即使我等待游泳池终止为一个块,是否会有任何顾虑?

@Override 
    public void write(List<? extends IndexerInputVO> inputItems) throws Exception { 


     int docsPerThread = Constants.NUMBER_OF_DOCS_PER_INDEX_WRITER_THREADS; 
     int docSize = inputItems.size(); 
     int remainder = docSize%docsPerThread; 
     int poolSize = docSize/docsPerThread; 

     ExecutorService executor = Executors.newFixedThreadPool(poolSize+1); 


     int fromIndex=0; 
     int toIndex = docsPerThread; 

     if(docSize < docsPerThread){ 
      executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems)); 
     }else{ 
      for(int i=1;i<=poolSize;i++){ 
       executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems.subList(fromIndex, toIndex))); 
       fromIndex+=docsPerThread; 
       toIndex+=docsPerThread; 
      } 

      if(remainder != 0){ 
       toIndex=docSize; 
       executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems.subList(fromIndex, toIndex))); 
      } 
     } 

     executor.shutdown(); 

     while(executor.isTerminated()){ 
      ; 
     } 

我不确定在作者中启动新线程是个好主意。 这些线程超出了Spring批处理框架的范围,因此您需要为上述实现关闭和取消策略。如果一个段的处理失败,则可能导致整个队列失败。

作为替代方法,我可以建议您将自定义的作家列表从作家推荐到下一步,如官方文档中所述passingDataToFutureSteps