Getting Started with Spring Batch: CSV to XML
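Learn how to use Spring Batch to read records from a CSV file and write the processed records out as XML with StaxEventItemWriter.
The figure above shows the key concepts that make up the domain language of Spring Batch.
A job has one or more steps, and each step has exactly one ItemReader, one ItemProcessor, and one ItemWriter.
A JobLauncher starts the job, and a JobRepository stores metadata about the currently running process.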
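Project overview
In this application we will:
- Read transaction records from a CSV file with FlatFileItemReader.
- Apply business processing to each item with CustomItemProcessor. The ItemReader reads items one at a time and the ItemWriter writes them out; the ItemProcessor sits between the two as the place to transform an item or apply other business logic.
- Use StaxEventItemWriter to take the output of CustomItemProcessor and write it as XML, the final output.
- Inspect the XML file to verify the result.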
Project structure
Maven dependencies
Choose either sqlite-jdbc or mysql-connector-java; only one of the two drivers is needed.
Whichever you choose, make the corresponding changes in applicationContext.xml.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.littlefxc.example</groupId>
    <artifactId>Spring-CSV-to-XML</artifactId>
    <version>1.0-snapshot</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>5.0.9.RELEASE</spring.version>
        <spring.batch.version>4.0.1.RELEASE</spring.batch.version>
        <sqlite.version>3.8.11.2</sqlite.version>
        <mysql.version>5.1.47</mysql.version>
    </properties>
    <dependencies>
        <!-- SQLite database driver -->
        <dependency>
            <groupId>org.xerial</groupId>
            <artifactId>sqlite-jdbc</artifactId>
            <version>${sqlite.version}</version>
        </dependency>
        <!-- MySQL database driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-oxm</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>${spring.batch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
        </dependency>
    </dependencies>
</project>
applicationContext.xml
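We read the CSV file with FlatFileItemReader.
We use its standard configuration, DefaultLineMapper and DelimitedLineTokenizer, together with a custom FieldSetMapper, RecordFieldSetMapper, in place of the stock BeanWrapperFieldSetMapper.
To write the records to an XML file we use StaxEventItemWriter as the standard writer.
Note: the database scripts in the <jdbc:initialize-database> tag were first run against SQLite and later tested with MySQL.
In a production environment this tag is generally not needed.
If you choose SQLite or another database, adjust the data source (dataSource) and jobRepository configuration accordingly.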
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
                           http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">
    <!-- 1. Create the Spring Batch metadata tables automatically. The MySQL scripts are active; the SQLite scripts are commented out -->
    <jdbc:initialize-database data-source="dataSource">
        <!--<jdbc:script location="org/springframework/batch/core/schema-drop-sqlite.sql"/>-->
        <!--<jdbc:script location="org/springframework/batch/core/schema-sqlite.sql"/>-->
        <jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql"/>
        <jdbc:script location="org/springframework/batch/core/schema-mysql.sql"/>
    </jdbc:initialize-database>
    <!-- 2. Database connection -->
    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <!--<property name="driverClassName" value="org.sqlite.JDBC"/>-->
        <!--<property name="url" value="jdbc:sqlite:Spring-CSV-to-XML.sqlite"/>-->
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://192.168.120.63:3306/batch?useSSL=false"/>
        <property name="username" value="root"/>
        <property name="password" value="123456"/>
    </bean>
    <!-- 3. Transaction management -->
    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
    <!-- 4. Provides CRUD operations for the JobLauncher, Job, and Step implementations -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="dataSource" ref="dataSource"/>
        <property name="transactionManager" ref="transactionManager"/>
        <!--<property name="databaseType" value="sqlite"/>-->
        <property name="databaseType" value="mysql"/>
    </bean>
    <!-- 4 (alternative). In-memory repository providing the same CRUD operations for the JobLauncher, Job, and Step implementations -->
    <!--<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager" />
    </bean> -->
    <!-- 5. JobLauncher is a simple interface for starting a job with a given set of JobParameters -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>
    <!-- 6. Batch input -->
    <bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <!-- Input resource -->
        <property name="resource" value="input/record.csv"/>
        <!-- Maps each input line to an object -->
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="username,user_id,transaction_date,transaction_amount"/>
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="com.littlefxc.examples.batch.service.RecordFieldSetMapper"/>
                </property>
            </bean>
        </property>
    </bean>
    <!-- 7. Batch processor -->
    <bean id="itemProcessor" class="com.littlefxc.examples.batch.service.CustomItemProcessor"/>
    <!-- 8. Batch output: write to XML -->
    <bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
        <property name="resource" value="file:xml/output.xml"/>
        <property name="marshaller" ref="recordMarshaller"/>
        <property name="rootTagName" value="transactionRecord"/>
    </bean>
    <!-- 8 (alternative). Batch output: write to MySQL -->
    <!--<bean id="itemWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="dataSource" ref="dataSource" />
        <property name="sql" value="INSERT INTO transaction_record (user_id, username, transaction_date, amount) VALUES (:userId, :username, :transactionDate, :amount)" />
        <property name="itemSqlParameterSourceProvider">
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
        </property>
    </bean>-->
    <!-- 8.1. Marshaller: XML to object and object to XML -->
    <bean id="recordMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
        <property name="classesToBeBound">
            <list>
                <value>com.littlefxc.examples.batch.model.Transaction</value>
            </list>
        </property>
    </bean>
    <!-- 9. Wire the batch input (6), processor (7), and output (8) into a job -->
    <batch:job id="firstBatchJob">
        <batch:step id="step1">
            <batch:tasklet>
                <batch:chunk reader="itemReader" processor="itemProcessor" writer="itemWriter" commit-interval="10"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>
</beans>
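The commit-interval="10" attribute on the chunk element in step 9 sets the chunk size: the step reads and processes items one at a time and hands them to the writer as a list once 10 items have been collected, or earlier if the input runs out. The plain-Java sketch below illustrates that loop under simplifying assumptions. It is not Spring Batch code: ChunkSketch and runStep are hypothetical names, and transactions, restart metadata, item filtering, and error handling are all left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Plain-Java illustration of a chunk-oriented step (hypothetical, not Spring Batch API). */
final class ChunkSketch {

    /**
     * Reads up to commitInterval items, processes each one, and passes the
     * processed chunk to the writer. Repeats until the reader is exhausted.
     * Returns the number of chunks written.
     */
    static <I, O> int runStep(Iterator<I> reader,
                              Function<I, O> processor,
                              Consumer<List<O>> writer,
                              int commitInterval) {
        int chunks = 0;
        while (reader.hasNext()) {
            // Read phase: collect at most commitInterval items.
            List<I> items = new ArrayList<>();
            while (items.size() < commitInterval && reader.hasNext()) {
                items.add(reader.next());
            }
            // Process phase: transform every item of the chunk.
            List<O> outputs = new ArrayList<>();
            for (I item : items) {
                outputs.add(processor.apply(item));
            }
            // Write phase: the writer receives the whole chunk at once.
            writer.accept(outputs);
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            input.add(i);
        }
        Function<Integer, String> processor = n -> "item-" + n;
        Consumer<List<String>> writer =
                chunk -> System.out.println("writing " + chunk.size() + " items");
        int chunks = runStep(input.iterator(), processor, writer, 10);
        System.out.println(chunks + " chunks");
    }
}
```

With 25 input items and an interval of 10, the writer in this sketch is called three times, with 10, 10, and 5 items. The three-line record.csv used in this article fits in a single chunk.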
RecordFieldSetMapper
Configured on the ItemReader (as the fieldSetMapper of its line mapper), this class converts a FieldSet into an object.
package com.littlefxc.examples.batch.service;

import com.littlefxc.examples.batch.model.Transaction;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import java.text.ParseException;
import java.text.SimpleDateFormat;

/**
 * Converts a FieldSet into a Transaction object.
 * @author fengxuechao
 * @date 2019/1/4
 **/
public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {

    @Override
    public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
        // SimpleDateFormat is not thread-safe, so a new instance is created per call.
        SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
        Transaction transaction = new Transaction();
        // Field names match the "names" property of the DelimitedLineTokenizer.
        transaction.setUsername(fieldSet.readString("username"));
        transaction.setUserId(fieldSet.readInt("user_id"));
        transaction.setAmount(fieldSet.readDouble("transaction_amount"));
        String dateString = fieldSet.readString("transaction_date");
        try {
            transaction.setTransactionDate(dateFormat.parse(dateString));
        } catch (ParseException e) {
            // The date stays null if it cannot be parsed.
            e.printStackTrace();
        }
        return transaction;
    }
}
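To make the tokenize-then-map flow concrete, here is a rough plain-Java sketch of what the DelimitedLineTokenizer and RecordFieldSetMapper do together for one line of record.csv: split on commas, trim each field, and convert the fields in the order username, user_id, transaction_date, transaction_amount. LineMappingSketch, ParsedTransaction, and mapLine are hypothetical names used only for illustration. Unlike the mapper above, this sketch fails fast on an unparseable date instead of leaving it null; that is a design choice of the sketch, not of the original code.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/** Plain-Java illustration of line tokenizing and field mapping (hypothetical names). */
final class LineMappingSketch {

    /** Hypothetical stand-in for the article's Transaction model. */
    static final class ParsedTransaction {
        String username;
        int userId;
        Date transactionDate;
        double amount;
    }

    /** Splits one CSV line on commas, trims each field, and converts it to typed values. */
    static ParsedTransaction mapLine(String line) {
        String[] fields = line.split(",");
        if (fields.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields but got " + fields.length + ": " + line);
        }
        ParsedTransaction t = new ParsedTransaction();
        t.username = fields[0].trim();
        t.userId = Integer.parseInt(fields[1].trim());
        try {
            // Same pattern as RecordFieldSetMapper; lenient parsing accepts "3/12/2015".
            t.transactionDate = new SimpleDateFormat("dd/MM/yyyy").parse(fields[2].trim());
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable date in line: " + line, e);
        }
        t.amount = Double.parseDouble(fields[3].trim());
        return t;
    }

    public static void main(String[] args) {
        ParsedTransaction t = mapLine("john , 2134, 3/12/2015 , 12321");
        String date = new SimpleDateFormat("yyyy-MM-dd").format(t.transactionDate);
        System.out.println(t.username + " " + t.userId + " " + date + " " + t.amount);
    }
}
```

The padding around the values in record.csv is why every field is trimmed before conversion in this sketch.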
CustomItemProcessor
A custom implementation of the ItemProcessor interface, which acts as the transformation point between the ItemReader and the ItemWriter.
package com.littlefxc.examples.batch.service;

import com.littlefxc.examples.batch.model.Transaction;
import org.springframework.batch.item.ItemProcessor;

/**
 * Transformation point between the ItemReader and the ItemWriter.
 * A minimal sketch: it passes each Transaction through unchanged, an assumption
 * that is consistent with the XML output shown under Verification.
 * Business logic belongs in process().
 **/
public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {

    @Override
    public Transaction process(Transaction item) {
        // Returning null here would filter the item out of the output.
        return item;
    }
}
Model
package com.littlefxc.examples.batch.model;

import lombok.Data;

import javax.xml.bind.annotation.XmlRootElement;
import java.util.Date;

/**
 * One transaction record; each instance is marshalled as a transactionRecord element.
 * @author fengxuechao
 */
@Data
@XmlRootElement(name = "transactionRecord")
public class Transaction {
    private String username;
    private int userId;
    private Date transactionDate;
    private double amount;
}
record.csv
devendra, 1234, 31/10/2015, 10000
john , 2134, 3/12/2015 , 12321
robin , 2134, 2/02/2015 , 23411
Launching the job
package com.littlefxc.examples.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class App {

    public static void main(String[] args) {
        // Load the Spring configuration described in the applicationContext.xml section.
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext();
        context.setConfigLocations("classpath:applicationContext.xml");
        context.refresh();

        // Look up the launcher and the job by their bean ids.
        JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("firstBatchJob");
        System.out.println("Starting the batch job");
        try {
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Job Status : " + execution.getStatus());
            System.out.println("Job completed");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Job failed");
        }
    }
}
Verification
<?xml version="1.0" encoding="UTF-8"?>
<transactionRecord>
    <transactionRecord>
        <amount>10000.0</amount>
        <transactionDate>2015-10-31T00:00:00+08:00</transactionDate>
        <userId>1234</userId>
        <username>devendra</username>
    </transactionRecord>
    <transactionRecord>
        <amount>12321.0</amount>
        <transactionDate>2015-12-03T00:00:00+08:00</transactionDate>
        <userId>2134</userId>
        <username>john</username>
    </transactionRecord>
    <transactionRecord>
        <amount>23411.0</amount>
        <transactionDate>2015-02-02T00:00:00+08:00</transactionDate>
        <userId>2134</userId>
        <username>robin</username>
    </transactionRecord>
</transactionRecord>
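To check the result programmatically rather than by eye, one option is to parse the output file and count the records. The sketch below uses only the DOM parser that ships with the JDK; OutputCheckSketch and countRecords are hypothetical names, and the path xml/output.xml is taken from the itemWriter configuration. Because the root element and every record share the name transactionRecord, it counts the element children of the root instead of searching by tag name.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

/** Counts the records in the generated XML file (hypothetical helper). */
final class OutputCheckSketch {

    /** Returns the number of element children of the document root, one per written record. */
    static int countRecords(InputStream xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(xml);
            NodeList children = doc.getDocumentElement().getChildNodes();
            int count = 0;
            for (int i = 0; i < children.getLength(); i++) {
                // Skip whitespace text nodes between the record elements.
                if (children.item(i).getNodeType() == Node.ELEMENT_NODE) {
                    count++;
                }
            }
            return count;
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse the XML output", e);
        }
    }

    public static void main(String[] args) throws Exception {
        try (InputStream in = Files.newInputStream(Paths.get("xml/output.xml"))) {
            System.out.println(countRecords(in) + " records");
        }
    }
}
```

For the three-line record.csv used in this article, the count should equal the number of input lines if the job completed.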