Spring批处理如何在写入数据之前处理数据列表

问题描述:

我试图从数据库读取客户端数据并将处理后的数据写入平面文件。 但是在写入数据之前我需要处理整个ItemReader的结果。Spring批处理如何在写入数据之前处理数据列表

例如,我从数据库中读取行客户端:

public class Client { 
    private String id; 
    private String subscriptionCode; 
    private Boolean activated; 
} 

但我想算,写多少用户被激活,通过subscriptionCode分组:

public class Subscription { 
    private String subscriptionCode; 
    private Integer activatedUserCount; 
} 

我不知道如何执行使用ItemReader/ItemProcessor/ItemWriter,你能帮助我吗?

BatchConfiguration:

@CommonsLog 
@Configuration 
@EnableBatchProcessing 
@EnableAutoConfiguration 
public class BatchConfiguration { 

    @Autowired 
    private JobBuilderFactory jobBuilderFactory; 

    @Autowired 
    private StepBuilderFactory stepBuilderFactory; 

    @Bean 
    public Step step1() { 
     return stepBuilderFactory.get("step1") 
       .<Client, Client> chunk(1000) 
       .reader(new ListItemReader<Client>(new ArrayList<Client>() { // Just for test 
        { 
         add(Client.builder().id("1").subscriptionCode("AA").activated(true).build()); 
         add(Client.builder().id("2").subscriptionCode("BB").activated(true).build()); 
         add(Client.builder().id("3").subscriptionCode("AA").activated(false).build()); 
         add(Client.builder().id("4").subscriptionCode("AA").activated(true).build()); 
        } 
       })) 
       .processor(new ItemProcessor<Client, Client>() { 
        public Client process(Client item) throws Exception { 
         log.info(item); 
         return item; 
        } 
       }) 
       .writer(new ItemWriter<Client>() { 
        public void write(List<? extends Client> items) throws Exception { 
         // Only here I can use List of Client 
         // How can I process this list before to fill Subscription objects ? 
        } 
       }) 
       .build(); 
    } 

    @Bean 
    public Job job1(Step step1) throws Exception { 
     return jobBuilderFactory.get("job1").incrementer(new RunIdIncrementer()).start(step1).build(); 
    } 
} 

主要应用:

public class App { 
    public static void main(String[] args) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException { 
     System.exit(SpringApplication.exit(SpringApplication.run(BatchConfiguration.class, args))); 
    } 
} 
+0

不知道我是否完全理解了这个问题,你确实有'ItemProcessor'的权利..你可以在你的个人客户端实例上做任何处理 - 除了你打算执行的处理以外? – 2014-10-11 15:12:15

+0

我想统计有多少用户已将'activated'标志设置为'true',按subscriptionCode进行分组。所以我需要一个'客户'列表来确定我的'订阅'列表。但是使用'chunk',我只能逐行处理......而不是一个组。这个例子中的结果尝试应该是一个List of 2 subscription:'Subscription(subscriptionCode = AA,activatedUserCount = 2)'和'Subscription(subscriptionCode = BB,activatedUserCount = 1)' – Aure77 2014-10-11 15:32:11

我发现基于ItemProcessor一个解决方案:

@Bean 
public Step step1() { 
    return stepBuilderFactory.get("step1") 
     .<Client, Subscription> chunk(1000) 
     .reader(new ListItemReader<Client>(new ArrayList<Client>() { 
     { 
      add(Client.builder().id("1").subscriptionCode("AA").activated(true).build()); 
      add(Client.builder().id("2").subscriptionCode("BB").activated(true).build()); 
      add(Client.builder().id("3").subscriptionCode("AA").activated(false).build()); 
      add(Client.builder().id("4").subscriptionCode("AA").activated(true).build()); 
     } 
     })) 
     .processor(new ItemProcessor<Client, Subscription>() { 
     private List<Subscription> subscriptions; 

     public Subscription process(Client item) throws Exception { 
      for (Subscription s : subscriptions) { // try to retrieve existing element 
      if (s.getSubscriptionCode().equals(item.getSubscriptionCode())) { // element found 
       if(item.getActivated()) { 
       s.getActivatedUserCount().incrementAndGet(); // increment user count 
       log.info("Incremented subscription : " + s); 
       }        
       return null; // existing element -> skip 
      } 
      } 
      // Create new Subscription 
      Subscription subscription = Subscription.builder().subscriptionCode(item.getSubscriptionCode()).activatedUserCount(new AtomicInteger(1)).build(); 
      subscriptions.add(subscription); 
      log.info("New subscription : " + subscription); 
      return subscription; 
     } 

     @BeforeStep 
     public void initList() { 
      subscriptions = Collections.synchronizedList(new ArrayList<Subscription>()); 
     } 

     @AfterStep 
     public void clearList() { 
      subscriptions.clear(); 
     } 
     }) 
     .writer(new ItemWriter<Subscription>() {     
     public void write(List<? extends Subscription> items) throws Exception { 
      log.info(items); 
      // do write stuff 
     }     
     }) 
     .build(); 
} 

但我要保持第二Subscription列表为ItemProcessor(我不知道,如果是线程安全的,高效的?)。你对这个解决方案有什么看法?

+0

有什么更好的想法? – Aure77 2014-11-05 20:05:41

+0

什么都没有?不知道 ? – Aure77 2014-12-05 14:33:49

如果我从您的意见了解你需要做激活帐户的总结,对不对?
您可以为您正在处理的每个Client创建一个Subscription,并使用ItemWriterLister.afterWrite将以上创建的Subscription的项目写入数据库。

+0

这些只是侦听对象的定义由大块。但是我怎么能通过'List '到我的'ItemWriter'来处理呢? 我无法声明'.Client,Subscription> chunk(1000)',因为一个'Client'不能在一个'Subscription'中转换(块处理逐项)...... – Aure77 2014-10-11 15:56:46

+0

使用监听器来写东西并不常见。此外,ItemWriter写入方法总是让我访问List a Client。但你给了我一个想法:使用'ItemProcessor'(看我的安装程序) – Aure77 2014-10-11 19:44:41