从头认识SpringBatch批处理框架--实例场景一信用卡消费对账
场景说明
个人使用信用卡消费,银行定期发送银行卡消费账单,本例将模拟银行处理个人信用卡消费对账单对账,银行需要定期地把个人消费的记录导出成csv文件,然后交给对账系统处理。
主要流程:
(从credit-card-bill-201303.csv)读取数据---->处理数据----->写数据到 outputFile文件
项目结构
项目结构说明:
- CreditBill:信用卡消费记录领域对象
- CreditBillProcessor:信用卡消费记录处理类
- jobLaunch:调用批处理作业类
-
jobLaunchTest:Junit单元测试,使用Spring提供的测试框架
- credit-card-bill-201303.csv:信用卡消费账单文件
- job-context.xml:定义作业基础信息
- job.xml:定义作业文件
- outputFile.xml:输出处理过后的信用卡消费账单文件
项目实现步骤详解:
1.准备credit-card-bill-201303.csv对账文件
这里我们使用csv格式的文件,该文件的每一行表示信用卡消费记录,中间用逗号分隔,分别表示:
银行卡账户、账户名、消费金额、消费日期、消费场所如下图所示:
2.定义领域对象:
为了与账单文件形成映射,需要新建信用卡消费记录对象 -CreditBill,主要属性:银行卡账户、账户名、消费金额、消费日期、消费场所
具体代码如下:
package com.my.domain;/** * 信用卡消费记录领域对象 * 该类主要用于在ItemReader读取文件数据之后转换成领域对象CreditBill, * 以便于ItemProcessoe和ItemWriter操作使用 * @author wbw * */public class CreditBill { /** * 银行账户 */ private String accountID; /** * 账户名 */ private String name; /** * 消费金额 */ private double amount; /** * 消费日期 */ private String date; /** * 消费场所 */ private String address; /** * @return the 银行账户 */ public String getAccountID() { return accountID; } /** * @param 银行账户 the accountID to set */ public void setAccountID(String accountID) { this.accountID = accountID; } /** * @return the 账户名 */ public String getName() { return name; } /** * @param 账户名 the name to set */ public void setName(String name) { this.name = name; } /** * @return the 消费金额 */ public double getAmount() { return amount; } /** * @param 消费金额 the amount to set */ public void setAmount(double amount) { this.amount = amount; } /** * @return the 消费日期 */ public String getDate() { return date; } /** * @param 消费日期 the date to set */ public void setDate(String date) { this.date = date; } /** * @return the 消费场所 */ public String getAddress() { return address; } /** * @param 消费场所 the address to set */ public void setAddress(String address) { this.address = address; } }
3.定义job-context.xml批处理基础信息
该配置文件主要是配置作业仓库、作业调度器、事务管理器,具体代码如下:
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd" default-autowire="byName"> <!--定义作业仓库SpringBatch提供了两种作业仓库来记录job执行期产生的信息:一种是内存,另一种是数据库 --> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> </bean> <!--定义作业调度器,用来启动Job --> <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository"/> </bean> <!--事务管理器,用于springbatch框架在对数据操作过程中体统事务能力--> <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/></beans>
4.定义job.xml文件
job.xml文件主要配置批处理作业Job、Step、ItemReader(读数据)、ItemProcessoe(处理数据)、 ItemWriter(写数据) 具体的配置如下图:
<?xml version="1.0" encoding="UTF-8"?><bean:beans xmlns="http://www.springframework.org/schema/batch" xmlns:bean="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.2.xsd"> <!--引入job-context.xml配置文件--> <bean:import resource="classpath:job-context.xml"/> <!--定义billJob billStep 包含读数据 处理数据 写数据--> <job id="billJob"> <step id="billStep"> <tasklet transaction-manager="transactionManager"> <!--commit-interval="2" 表示任务提交间隔的大小 此处表示每处理2条数据 进行一次写入操作--> <chunk reader="csvItemReader" writer="csvItemWriter" processor="creditBillProcessor" commit-interval="2"> </chunk> </tasklet> </step> </job> <!-- 读取信用卡账单文件,CSV格式 --> <bean:bean id="csvItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step"> <!--设置读取的文件资源--> <bean:property name="resource" value="classpath:credit-card-bill-201303.csv"/> <!--将文本中的每行记录转换为领域对象--> <bean:property name="lineMapper"> <bean:bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <!--引用lineTokenizer--> <bean:property name="lineTokenizer" ref="lineTokenizer"/> <!--fieldSetMapper根据lineTokenizer中定义的names属性映射到领域对象中去--> <bean:property name="fieldSetMapper"> <bean:bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"> <bean:property name="prototypeBeanName" value="creditBill"> </bean:property> </bean:bean> </bean:property> </bean:bean> </bean:property> </bean:bean> <!-- lineTokenizer 定义文本中每行的分隔符号 以及每行映射成FieldSet对象后的name列表 --> <bean:bean id="lineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> <bean:property name="delimiter" value=","/> <bean:property name="names"> <bean:list> <bean:value>accountID</bean:value> <bean:value>name</bean:value> <bean:value>amount</bean:value> <bean:value>date</bean:value> <bean:value>address</bean:value> </bean:list> </bean:property> </bean:bean> <!-- 写信用卡账单文件,CSV格式 --> <bean:bean id="csvItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step"> <bean:property name="resource" value="file:outputFile.csv"/> <bean:property name="lineAggregator"> <bean:bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator"> <bean:property name="delimiter" value=","></bean:property> <bean:property name="fieldExtractor"> <bean:bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor"> <bean:property name="names" value="accountID,name,amount,date,address"> </bean:property> </bean:bean> </bean:property> </bean:bean> </bean:property> </bean:bean> <!--领域对象 并标注为原型--> <bean:bean id="creditBill" scope="prototype" class="com.my.domain.CreditBill"> </bean:bean> <!--负责业务数据的处理--> <bean:bean id="creditBillProcessor" scope="step" class="com.my.processor.CreditBillProcessor"> </bean:bean></bean:beans>
5.创建ItemProcessor该类主要是用于处理ItemReader读取的数据,通常包括数据过滤、数据转换等操作,此处我们只是简单的打印账单信息。具体代码如下:
/** * */package com.my.processor;import org.springframework.batch.item.ItemProcessor;import com.my.domain.CreditBill;/** * 处理数据器 * @author wbw * */public class CreditBillProcessor implements ItemProcessor<CreditBill, CreditBill> { public CreditBill process(CreditBill bill) throws Exception { System.out.println("信用卡消费领域对象:"+bill.getAccountID()+"-"+bill.getName()+"-"+bill.getAmount()+"-"+bill.getAddress()); return bill; }}
6.启动Job我们这里介绍两种启动运行方式,
第一种:main方法
/** * */package com.my.batch;import org.springframework.batch.core.Job;import org.springframework.batch.core.JobExecution;import org.springframework.batch.core.JobParameters;import org.springframework.batch.core.launch.JobLauncher;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;/** * 测试 * @author wbw * */public class JobLaunch { /** * @param args */ @SuppressWarnings("resource") public static void main(String[] args) { //初始化应用上下文 ApplicationContext context = new ClassPathXmlApplicationContext("job.xml"); //获取作业调度,根据Bean的名称从Spring的上下文获取 JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher"); //获取任务对象 Job job = (Job) context.getBean("billJob"); try { JobExecution result = launcher.run(job, new JobParameters()); System.out.println(result.toString()); } catch (Exception e) { e.printStackTrace(); } }}
第二种:Junit单元测试package com.my.batch;import org.junit.After;import org.junit.Before;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.batch.core.Job;import org.springframework.batch.core.JobExecution;import org.springframework.batch.core.JobParameters;import org.springframework.batch.core.launch.JobLauncher;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations={"job.xml"})public class JobLaunchTest { @Autowired private JobLauncher jobLauncher; @Autowired@Qualifier("billJob") private Job job; @Before public void setUp() throws Exception { } @After public void tearDown() throws Exception { } @Test public void billJob() throws Exception { JobExecution result = jobLauncher.run(job, new JobParameters()); System.out.println(result.toString()); }}
执行Job后 outputFile文件已录入数据控制台信息:
这里只显示了两条信用卡消费账单信息,因为我们在job.xml文件中设置任务提交间隔的大小 每处理2条数据 进行一次写入操作。如下图
总结:
从这个实例场景中,我们学会了使用SpringBatch已经提供好的功能组件和基础设施,快速搭建批处理应用。了解了SpringBatch的一些基本概念:
JobRepository:作业仓库,负责Job、Step执行过程中的状态保存;
JobLauncher:作业调度器,提供执行Job的入口;
Job:作业,由一个或多个的Step组成;
Step:作业步,Job的一个执行环节,由一个或多个的Step组成Job
Tasklet:Stp中具体执行逻辑的操作,可以重复执行,可以设置具体的同步、异步操作
Chunk:给定数量的Item集合
Item:一条数据记录
ItenReader:从数据源(文件、数据库或消息队列等)中获取Item
ItemProcessor:在Item写数据之前,对数据进行数据过滤、数据清洗、数据转换、数据校验等操作
ItemWriter:将Item数据记录批量写入数据源(文件、数据库或消息队列等)
未完待续-----------
再分享一下我老师大神的人工智能教程吧。零基础!通俗易懂!风趣幽默!还带黄段子!希望你也加入到我们人工智能的队伍中来!https://blog.****.net/jiangjunshow