Spring Boot整合Spring Batch

引言

Spring Batch是处理大量数据操作的一个框架,主要用来读取大量数据,然后进行一定的处理后输出指定的形式。比如我们可以将csv文件中的数据(数据量几百万甚至几千万都是没问题的)批处理插入保存到数据库中,就可以使用该框架,但是不管是数据资料还是网上资料,我看到很少有这样的详细讲解。所以本片博文的主要目的边讲解的同时边实战(其中的代码都是经过实践的)。同样地先从Spring Boot对Batch框架的支持说起,最后一步一步进行代码实践!


一、Spring Boot对Batch框架的支持

1、Spring Batch框架的组成部分

1)JobRepository:用来注册Job容器,设置数据库相关属性。

2)JobLauncher:用来启动Job的接口

3)Job:我们要实际执行的任务,包含一个或多个

4)Step:即步骤,包括:ItemReader->ItemProcessor->ItemWriter

5)ItemReader:用来读取数据,做实体类与数据字段之间的映射。比如读取csv文件中的人员数据,之后对应实体person的字段做mapper

6)ItemProcessor:用来处理数据的接口,同时可以做数据校验(设置校验器,使用JSR-303(hibernate-validator)注解),比如将中文性别男/女,转为M/F。同时校验年龄字段是否符合要求等

7)ItemWriter:用来输出数据的接口,设置数据库源。编写预处理SQL插入语句

以上七个组成部分,只需要在配置类中逐一注册即可,同时配置类需要开启@EnableBatchProcessing注解

 
@Configuration @EnableBatchProcessing // 开启批处理的支持 @Import(DruidDBConfig.class) // 注入datasource public class CsvBatchConfig { } 

2、批处理流程图

如下流程图即可以解释在配置类中为什么需要这么定义,具体请看实战部分的代码。

Spring Boot整合Spring Batch

 

二、实战

1、添加依赖

1)spring batch依赖

 
<!-- spring batch --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> 

2)校验器依赖

 
<!-- hibernate validator --> <dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator</artifactId> <version>6.0.7.Final</version> </dependency> 

3)mysql+druid依赖

 
<!-- mysql connector--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.35</version> </dependency> <!-- alibaba dataSource --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.12</version> </dependency> 

4)test测试依赖

 
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> 

2、application.yml配置

当job发布开始执行任务时,spring batch会自动生成相关的batch开头的表。这些表一开始是不存在的!需要在application配置文件中做相关的设置。

Spring Boot整合Spring Batch

 

 
# batch batch: job:  # 默认自动执行定义的Job(true),改为false,需要jobLaucher.run执行 enabled: false  # spring batch在数据库里面创建默认的数据表,如果不是always则会提示相关表不存在 initialize-schema: always  # 设置batch表的前缀 # table-prefix: csv-batch 

Spring Boot整合Spring Batch

 

3、数据源配置

 
datasource: username: root password: 1234 url: jdbc:mysql://127.0.0.1:3306/db_base?useSSL=false&serverTimezone=UTC&characterEncoding=utf8 driver-class-name: com.mysql.jdbc.Driver 

注册DBConfig配置类:之后通过import导入batch配置类中

View Code

4、编写batch配置类

在配置类中,注册Spring Batch的各个组成部分即可,其中部分说明已在代码中注释.

View Code

5、定义处理器

只需要实现ItemProcessor接口,重写process方法,输入的参数是从ItemReader读取到的数据,返回的数据给ItemWriter

 
/** * @author jian * @date 2019/4/28 * @description * CSV文件数据处理及校验 * 只需要实现ItemProcessor接口,重写process方法,输入的参数是从ItemReader读取到的数据,返回的数据给ItemWriter */ public class CvsItemProcessor extends ValidatingItemProcessor<Person> { private Logger logger = LoggerFactory.getLogger(CvsItemProcessor.class); @Override public Person process(Person item) throws ValidationException { // 执行super.process()才能调用自定义的校验器 logger.info("processor start validating..."); super.process(item); // 数据处理,比如将中文性别设置为M/F if ("男".equals(item.getGender())) { item.setGender("M"); } else { item.setGender("F"); } logger.info("processor end validating..."); return item; } } 

6、定义校验器

定义校验器:使用JSR-303(hibernate-validator)注解,来校验ItemReader读取到的数据是否满足要求。如不满足则不会进行接下来的批处理任务。

View Code

7、定义监听器:

监听Job执行情况,则定义一个类实现JobExecutorListener,并定义Job的Bean上绑定该监听器

 
/** * @author jian * @date 2019/4/28 * @description * 监听Job执行情况,则定义一个类实现JobExecutorListener,并定义Job的Bean上绑定该监听器 */ public class CsvJobListener implements JobExecutionListener { private Logger logger = LoggerFactory.getLogger(CsvJobListener.class); private long startTime; private long endTime; @Override public void beforeJob(JobExecution jobExecution) { startTime = System.currentTimeMillis(); logger.info("job process start..."); } @Override public void afterJob(JobExecution jobExecution) { endTime = System.currentTimeMillis(); logger.info("job process end..."); logger.info("elapsed time: " + (endTime - startTime) + "ms"); } } 

三、测试

1、person.csv文件

csv文件时以逗号为分隔的数据表示字段,回车表示一行(条)数据记录

 
1,Zhangsan,21,男 2,Lisi,22,女 3,Wangwu,23,男 4,Zhaoliu,24,男 5,Zhouqi,25,女 

放在resources下,在ItemReader中读取的该路径即可

Spring Boot整合Spring Batch

 

2、person实体

person.csv中的字段与之对应,并在该实体中可以添加校验注解,如@Size表示该字段的长度范围,如果超过规定。则会被校验检测到,批处理将不会进行!

View Code

3、数据表

 
CREATE TABLE `person` ( `id` int(11) NOT NULL, `name` varchar(10) DEFAULT NULL, `age` int(11) DEFAULT NULL, `gender` varchar(2) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1 

一开始表是没有数据的

Spring Boot整合Spring Batch

 

4、测试类

需要注入发布器,与job任务。同时可以使用后置参数灵活处理,最后调用JobLauncher.run方法执行批处理任务

 
@RunWith(SpringRunner.class) @SpringBootTest public class BatchTest { @Autowired SimpleJobLauncher jobLauncher; @Autowired Job importJob; @Test public void test() throws Exception{ // 后置参数:使用JobParameters中绑定参数 JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis()) .toJobParameters(); jobLauncher.run(importJob, jobParameters); } } 

5、测试结果

 
.... 2019-05-09 15:23:39.576 INFO 18296 --- [ main] com.lijian.test.BatchTest : Started BatchTest in 6.214 seconds (JVM running for 7.185) 2019-05-09 15:23:39.939 INFO 18296 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importCsvJob]] launched with the following parameters: [{time=1557386619763}] 2019-05-09 15:23:39.982 INFO 18296 --- [ main] com.lijian.config.batch.CsvJobListener : job process start... 2019-05-09 15:23:40.048 INFO 18296 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step] 2019-05-09 15:23:40.214 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor start validating... 2019-05-09 15:23:40.282 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor end validating... 2019-05-09 15:23:40.283 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor start validating... 2019-05-09 15:23:40.283 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor end validating... 2019-05-09 15:23:40.283 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor start validating... 2019-05-09 15:23:40.283 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor end validating... 2019-05-09 15:23:40.283 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor start validating... 2019-05-09 15:23:40.283 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor end validating... 2019-05-09 15:23:40.283 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor start validating... 2019-05-09 15:23:40.284 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor end validating... 2019-05-09 15:23:40.525 INFO 18296 --- [ main] com.lijian.config.batch.CsvJobListener : job process end... 2019-05-09 15:23:40.526 INFO 18296 --- [ main] com.lijian.config.batch.CsvJobListener : elapsed time: 543ms 2019-05-09 15:23:40.548 INFO 18296 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importCsvJob]] completed with the following parameters: [{time=1557386619763}] and the following status: [COMPLETED] 2019-05-09 15:23:40.564 INFO 18296 --- [ Thread-5] com.alibaba.druid.pool.DruidDataSource : {dataSource-1} closed 

查看表中数据: select * from person;

Spring Boot整合Spring Batch

 

若继续插入数据,并且测试校验器是否生效,则将person.csv更改为如下内容:

 
6,springbatch,24,男 7,springboot,23,女 

由于实体类中JSR校验注解对name长度范围进行了检验,即添加了 @Size(min=2, max=8) 的注解。故会报错显示校验不通过,批处理将不会进行。

 
... Started BatchTest in 5.494 seconds (JVM running for 6.41) 2019-05-09 15:30:02.147 INFO 20368 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importCsvJob]] launched with the following parameters: [{time=1557387001499}] 2019-05-09 15:30:02.247 INFO 20368 --- [ main] com.lijian.config.batch.CsvJobListener : job process start... 2019-05-09 15:30:02.503 INFO 20368 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step] 2019-05-09 15:30:02.683 INFO 20368 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor start validating... 2019-05-09 15:30:02.761 ERROR 20368 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step step in job importCsvJob org.springframework.batch.item.validator.ValidationException: size must be between 2 and 8 ... 

引言

Spring Batch是处理大量数据操作的一个框架,主要用来读取大量数据,然后进行一定的处理后输出指定的形式。比如我们可以将csv文件中的数据(数据量几百万甚至几千万都是没问题的)批处理插入保存到数据库中,就可以使用该框架,但是不管是数据资料还是网上资料,我看到很少有这样的详细讲解。所以本片博文的主要目的边讲解的同时边实战(其中的代码都是经过实践的)。同样地先从Spring Boot对Batch框架的支持说起,最后一步一步进行代码实践!


一、Spring Boot对Batch框架的支持

1、Spring Batch框架的组成部分

1)JobRepository:用来注册Job容器,设置数据库相关属性。

2)JobLauncher:用来启动Job的接口

3)Job:我们要实际执行的任务,包含一个或多个

4)Step:即步骤,包括:ItemReader->ItemProcessor->ItemWriter

5)ItemReader:用来读取数据,做实体类与数据字段之间的映射。比如读取csv文件中的人员数据,之后对应实体person的字段做mapper

6)ItemProcessor:用来处理数据的接口,同时可以做数据校验(设置校验器,使用JSR-303(hibernate-validator)注解),比如将中文性别男/女,转为M/F。同时校验年龄字段是否符合要求等

7)ItemWriter:用来输出数据的接口,设置数据库源。编写预处理SQL插入语句

以上七个组成部分,只需要在配置类中逐一注册即可,同时配置类需要开启@EnableBatchProcessing注解

 
@Configuration @EnableBatchProcessing // 开启批处理的支持 @Import(DruidDBConfig.class) // 注入datasource public class CsvBatchConfig { } 

2、批处理流程图

如下流程图即可以解释在配置类中为什么需要这么定义,具体请看实战部分的代码。

Spring Boot整合Spring Batch

 

二、实战

1、添加依赖

1)spring batch依赖

 
<!-- spring batch --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> 

2)校验器依赖

 
<!-- hibernate validator --> <dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator</artifactId> <version>6.0.7.Final</version> </dependency> 

3)mysql+druid依赖

 
<!-- mysql connector--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.35</version> </dependency> <!-- alibaba dataSource --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.12</version> </dependency> 

4)test测试依赖

 
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> 

2、application.yml配置

当job发布开始执行任务时,spring batch会自动生成相关的batch开头的表。这些表一开始是不存在的!需要在application配置文件中做相关的设置。

Spring Boot整合Spring Batch

 

 
# batch batch: job:  # 默认自动执行定义的Job(true),改为false,需要jobLaucher.run执行 enabled: false  # spring batch在数据库里面创建默认的数据表,如果不是always则会提示相关表不存在 initialize-schema: always  # 设置batch表的前缀 # table-prefix: csv-batch 

3、数据源配置

 
datasource: username: root password: 1234 url: jdbc:mysql://127.0.0.1:3306/db_base?useSSL=false&serverTimezone=UTC&characterEncoding=utf8 driver-class-name: com.mysql.jdbc.Driver 

注册DBConfig配置类:之后通过import导入batch配置类中

View Code

4、编写batch配置类

在配置类中,注册Spring Batch的各个组成部分即可,其中部分说明已在代码中注释.

View Code

5、定义处理器

只需要实现ItemProcessor接口,重写process方法,输入的参数是从ItemReader读取到的数据,返回的数据给ItemWriter

 
/** * @author jian * @date 2019/4/28 * @description * CSV文件数据处理及校验 * 只需要实现ItemProcessor接口,重写process方法,输入的参数是从ItemReader读取到的数据,返回的数据给ItemWriter */ public class CvsItemProcessor extends ValidatingItemProcessor<Person> { private Logger logger = LoggerFactory.getLogger(CvsItemProcessor.class); @Override public Person process(Person item) throws ValidationException { // 执行super.process()才能调用自定义的校验器 logger.info("processor start validating..."); super.process(item); // 数据处理,比如将中文性别设置为M/F if ("男".equals(item.getGender())) { item.setGender("M"); } else { item.setGender("F"); } logger.info("processor end validating..."); return item; } } 

6、定义校验器

定义校验器:使用JSR-303(hibernate-validator)注解,来校验ItemReader读取到的数据是否满足要求。如不满足则不会进行接下来的批处理任务。

View Code

7、定义监听器:

监听Job执行情况,则定义一个类实现JobExecutorListener,并定义Job的Bean上绑定该监听器

 
/** * @author jian * @date 2019/4/28 * @description * 监听Job执行情况,则定义一个类实现JobExecutorListener,并定义Job的Bean上绑定该监听器 */ public class CsvJobListener implements JobExecutionListener { private Logger logger = LoggerFactory.getLogger(CsvJobListener.class); private long startTime; private long endTime; @Override public void beforeJob(JobExecution jobExecution) { startTime = System.currentTimeMillis(); logger.info("job process start..."); } @Override public void afterJob(JobExecution jobExecution) { endTime = System.currentTimeMillis(); logger.info("job process end..."); logger.info("elapsed time: " + (endTime - startTime) + "ms"); } } 

三、测试

1、person.csv文件

csv文件时以逗号为分隔的数据表示字段,回车表示一行(条)数据记录

 
1,Zhangsan,21,男 2,Lisi,22,女 3,Wangwu,23,男 4,Zhaoliu,24,男 5,Zhouqi,25,女 

放在resources下,在ItemReader中读取的该路径即可

Spring Boot整合Spring Batch

 

2、person实体

person.csv中的字段与之对应,并在该实体中可以添加校验注解,如@Size表示该字段的长度范围,如果超过规定。则会被校验检测到,批处理将不会进行!

View Code

3、数据表

 
CREATE TABLE `person` ( `id` int(11) NOT NULL, `name` varchar(10) DEFAULT NULL, `age` int(11) DEFAULT NULL, `gender` varchar(2) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1 

一开始表是没有数据的

Spring Boot整合Spring Batch

 

4、测试类

需要注入发布器,与job任务。同时可以使用后置参数灵活处理,最后调用JobLauncher.run方法执行批处理任务

 
@RunWith(SpringRunner.class) @SpringBootTest public class BatchTest { @Autowired SimpleJobLauncher jobLauncher; @Autowired Job importJob; @Test public void test() throws Exception{ // 后置参数:使用JobParameters中绑定参数 JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis()) .toJobParameters(); jobLauncher.run(importJob, jobParameters); } } 

5、测试结果

 
.... 2019-05-09 15:23:39.576 INFO 18296 --- [ main] com.lijian.test.BatchTest : Started BatchTest in 6.214 seconds (JVM running for 7.185) 2019-05-09 15:23:39.939 INFO 18296 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importCsvJob]] launched with the following parameters: [{time=1557386619763}] 2019-05-09 15:23:39.982 INFO 18296 --- [ main] com.lijian.config.batch.CsvJobListener : job process start... 2019-05-09 15:23:40.048 INFO 18296 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step] 2019-05-09 15:23:40.214 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor start validating... 2019-05-09 15:23:40.282 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor end validating... 2019-05-09 15:23:40.283 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor start validating... 2019-05-09 15:23:40.283 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor end validating... 2019-05-09 15:23:40.283 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor start validating... 2019-05-09 15:23:40.283 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor end validating... 2019-05-09 15:23:40.283 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor start validating... 2019-05-09 15:23:40.283 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor end validating... 2019-05-09 15:23:40.283 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor start validating... 2019-05-09 15:23:40.284 INFO 18296 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor end validating... 2019-05-09 15:23:40.525 INFO 18296 --- [ main] com.lijian.config.batch.CsvJobListener : job process end... 2019-05-09 15:23:40.526 INFO 18296 --- [ main] com.lijian.config.batch.CsvJobListener : elapsed time: 543ms 2019-05-09 15:23:40.548 INFO 18296 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importCsvJob]] completed with the following parameters: [{time=1557386619763}] and the following status: [COMPLETED] 2019-05-09 15:23:40.564 INFO 18296 --- [ Thread-5] com.alibaba.druid.pool.DruidDataSource : {dataSource-1} closed 

Spring Boot整合Spring Batch

 

查看表中数据: select * from person;

Spring Boot整合Spring Batch

 

若继续插入数据,并且测试校验器是否生效,则将person.csv更改为如下内容:

 
6,springbatch,24,男 7,springboot,23,女 

由于实体类中JSR校验注解对name长度范围进行了检验,即添加了 @Size(min=2, max=8) 的注解。故会报错显示校验不通过,批处理将不会进行。

 
... Started BatchTest in 5.494 seconds (JVM running for 6.41) 2019-05-09 15:30:02.147 INFO 20368 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importCsvJob]] launched with the following parameters: [{time=1557387001499}] 2019-05-09 15:30:02.247 INFO 20368 --- [ main] com.lijian.config.batch.CsvJobListener : job process start... 2019-05-09 15:30:02.503 INFO 20368 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step] 2019-05-09 15:30:02.683 INFO 20368 --- [ main] c.lijian.config.batch.CvsItemProcessor : processor start validating... 2019-05-09 15:30:02.761 ERROR 20368 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step step in job importCsvJob org.springframework.batch.item.validator.ValidationException: size must be between 2 and 8 ...