Spring批处理如何在写入数据之前处理数据列表
问题描述:
我试图从数据库读取客户端数据并将处理后的数据写入平面文件。 但是在写入数据之前我需要处理整个ItemReader
的结果。Spring批处理如何在写入数据之前处理数据列表
例如,我从数据库中读取行客户端:
public class Client {
private String id;
private String subscriptionCode;
private Boolean activated;
}
但我想算,写多少用户被激活,通过subscriptionCode分组:
public class Subscription {
private String subscriptionCode;
private Integer activatedUserCount;
}
我不知道如何执行使用ItemReader
/ItemProcessor
/ItemWriter
,你能帮助我吗?
BatchConfiguration:
@CommonsLog
@Configuration
@EnableBatchProcessing
@EnableAutoConfiguration
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Client, Client> chunk(1000)
.reader(new ListItemReader<Client>(new ArrayList<Client>() { // Just for test
{
add(Client.builder().id("1").subscriptionCode("AA").activated(true).build());
add(Client.builder().id("2").subscriptionCode("BB").activated(true).build());
add(Client.builder().id("3").subscriptionCode("AA").activated(false).build());
add(Client.builder().id("4").subscriptionCode("AA").activated(true).build());
}
}))
.processor(new ItemProcessor<Client, Client>() {
public Client process(Client item) throws Exception {
log.info(item);
return item;
}
})
.writer(new ItemWriter<Client>() {
public void write(List<? extends Client> items) throws Exception {
// Only here I can use List of Client
// How can I process this list before to fill Subscription objects ?
}
})
.build();
}
@Bean
public Job job1(Step step1) throws Exception {
return jobBuilderFactory.get("job1").incrementer(new RunIdIncrementer()).start(step1).build();
}
}
主要应用:
public class App {
public static void main(String[] args) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
System.exit(SpringApplication.exit(SpringApplication.run(BatchConfiguration.class, args)));
}
}
答
我发现基于ItemProcessor
一个解决方案:
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Client, Subscription> chunk(1000)
.reader(new ListItemReader<Client>(new ArrayList<Client>() {
{
add(Client.builder().id("1").subscriptionCode("AA").activated(true).build());
add(Client.builder().id("2").subscriptionCode("BB").activated(true).build());
add(Client.builder().id("3").subscriptionCode("AA").activated(false).build());
add(Client.builder().id("4").subscriptionCode("AA").activated(true).build());
}
}))
.processor(new ItemProcessor<Client, Subscription>() {
private List<Subscription> subscriptions;
public Subscription process(Client item) throws Exception {
for (Subscription s : subscriptions) { // try to retrieve existing element
if (s.getSubscriptionCode().equals(item.getSubscriptionCode())) { // element found
if(item.getActivated()) {
s.getActivatedUserCount().incrementAndGet(); // increment user count
log.info("Incremented subscription : " + s);
}
return null; // existing element -> skip
}
}
// Create new Subscription
Subscription subscription = Subscription.builder().subscriptionCode(item.getSubscriptionCode()).activatedUserCount(new AtomicInteger(1)).build();
subscriptions.add(subscription);
log.info("New subscription : " + subscription);
return subscription;
}
@BeforeStep
public void initList() {
subscriptions = Collections.synchronizedList(new ArrayList<Subscription>());
}
@AfterStep
public void clearList() {
subscriptions.clear();
}
})
.writer(new ItemWriter<Subscription>() {
public void write(List<? extends Subscription> items) throws Exception {
log.info(items);
// do write stuff
}
})
.build();
}
但我要保持第二Subscription
列表为ItemProcessor
(我不知道,如果是线程安全的,高效的?)。你对这个解决方案有什么看法?
答
如果我从您的意见了解你需要做激活帐户的总结,对不对?
您可以为您正在处理的每个Client
创建一个Subscription
,并使用ItemWriterLister.afterWrite
将以上创建的Subscription
的项目写入数据库。
不知道我是否完全理解了这个问题,你确实有'ItemProcessor'的权利..你可以在你的个人客户端实例上做任何处理 - 除了你打算执行的处理以外? – 2014-10-11 15:12:15
我想统计有多少用户已将'activated'标志设置为'true',按subscriptionCode进行分组。所以我需要一个'客户'列表来确定我的'订阅'列表。但是使用'chunk',我只能逐行处理......而不是一个组。这个例子中的结果尝试应该是一个List of 2 subscription:'Subscription(subscriptionCode = AA,activatedUserCount = 2)'和'Subscription(subscriptionCode = BB,activatedUserCount = 1)' – Aure77 2014-10-11 15:32:11