Getting Started with Spring Batch: CSV to XML

Learn how to read records from a CSV file with Spring Batch and use StaxEventItemWriter to write the processed records out as XML.

[Figure: key concepts of the Spring Batch domain language]

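The figure above shows the key concepts that make up the domain language of Spring Batch.
A job has one or more steps, and each step has a single ItemReader, a single ItemProcessor, and a single ItemWriter.
A JobLauncher starts the job, and a JobRepository stores the metadata about the currently running process.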

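Project Overview

In this application we will perform the following tasks:

  1. Read transaction records from a CSV file with FlatFileItemReader.
  2. Apply business processing to each item with CustomItemProcessor. The ItemReader reads items one at a time and the ItemWriter writes them; the ItemProcessor sits between the two as the point where an item is transformed or other business logic is applied.
  3. Use StaxEventItemWriter to take the output of CustomItemProcessor and write it out as XML, the final output.
  4. Inspect the XML file to verify the result.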
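
The reader, processor, and writer roles listed above can be illustrated without Spring Batch on the classpath. What follows is a minimal plain-Java sketch of the read, process, write cycle, not Spring Batch's actual implementation. The names ChunkPipelineSketch, run, and chunkSize are hypothetical; chunkSize plays roughly the role that commit-interval plays in the step configuration later in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class ChunkPipelineSketch {

    /**
     * Reads items one at a time, processes each of them, and collects the
     * results into chunks of at most chunkSize items. Each chunk stands for
     * one call to the writer. Returns every chunk that was "written".
     */
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int chunkSize) {
        List<List<O>> written = new ArrayList<>();
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            // Read one item, then process it.
            chunk.add(processor.apply(reader.next()));
            // Once the chunk is full, hand it to the writer in one go.
            if (chunk.size() == chunkSize) {
                written.add(chunk);
                chunk = new ArrayList<>();
            }
        }
        // Write the final, possibly partial, chunk.
        if (!chunk.isEmpty()) {
            written.add(chunk);
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("devendra", "john", "robin");
        // Prints [[DEVENDRA, JOHN], [ROBIN]]
        System.out.println(run(names.iterator(), String::toUpperCase, 2));
    }
}
```

With three input records and a chunk size of 2, the writer is called twice: once with a full chunk and once with the remainder.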

Project Structure

[Figure: project structure]

Maven Dependencies

Use either sqlite-jdbc or mysql-connector-java as the database driver; only one of them is needed.
Whichever one you choose, make the matching changes in applicationContext.xml.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.littlefxc.example</groupId>
    <artifactId>Spring-CSV-to-XML</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>5.0.9.RELEASE</spring.version>
        <spring.batch.version>4.0.1.RELEASE</spring.batch.version>
        <sqlite.version>3.8.11.2</sqlite.version>
        <mysql.version>5.1.47</mysql.version>
    </properties>

    <dependencies>
        <!-- SQLite database driver -->
        <dependency>
            <groupId>org.xerial</groupId>
            <artifactId>sqlite-jdbc</artifactId>
            <version>${sqlite.version}</version>
        </dependency>
        <!-- MySQL database driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-oxm</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>${spring.batch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
        </dependency>
    </dependencies>

</project>

applicationContext.xml

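We will read the CSV file with FlatFileItemReader.
We use its standard configuration, built from DefaultLineMapper and DelimitedLineTokenizer, with the custom RecordFieldSetMapper taking the place of the stock BeanWrapperFieldSetMapper.
To write the records to an XML file, we use StaxEventItemWriter as the standard writer.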
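
To make the tokenizing step concrete, here is a minimal plain-Java sketch of what splitting one line of record.csv into named fields amounts to. It is only an illustration of the idea, not how DelimitedLineTokenizer is implemented (it ignores quoting, for instance). DelimitedLineSketch and tokenize are hypothetical names; the column names are the ones used in the configuration in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DelimitedLineSketch {

    // Column names, in the order the values appear in each CSV line.
    static final String[] NAMES = {"username", "user_id", "transaction_date", "transaction_amount"};

    /** Splits one comma-delimited line and maps each trimmed token to its column name. */
    static Map<String, String> tokenize(String line) {
        String[] tokens = line.split(",", -1); // -1 keeps trailing empty fields
        if (tokens.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "Expected " + NAMES.length + " fields but found " + tokens.length + ": " + line);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], tokens[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        // Prints {username=john, user_id=2134, transaction_date=3/12/2015, transaction_amount=12321}
        System.out.println(tokenize("john    , 2134, 3/12/2015 , 12321"));
    }
}
```

Trimming matters for this input because the sample file pads its columns with spaces.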

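Note: for the database scripts in the jdbc:initialize-database tag, I started with SQLite and later tested with MySQL.
In a production environment this tag is usually not needed.
If you choose SQLite or another database, adjust the data source (dataSource) and jobRepository configuration accordingly.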

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- 1. Create the Spring Batch metadata tables automatically (MySQL scripts active, SQLite scripts commented out) -->
    <jdbc:initialize-database data-source="dataSource">
        <!--<jdbc:script location="org/springframework/batch/core/schema-drop-sqlite.sql"/>-->
        <!--<jdbc:script location="org/springframework/batch/core/schema-sqlite.sql"/>-->
        <jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql"/>
        <jdbc:script location="org/springframework/batch/core/schema-mysql.sql"/>
    </jdbc:initialize-database>

    <!-- 2. Database connection -->
    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <!--<property name="driverClassName" value="org.sqlite.JDBC"/>-->
        <!--<property name="url" value="jdbc:sqlite:Spring-CSV-to-XML.sqlite"/>-->
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://192.168.120.63:3306/batch?useSSL=false"/>
        <property name="username" value="root"/>
        <property name="password" value="123456"/>
    </bean>

    <!-- 3. Transaction management -->
    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

    <!-- 4. JobRepository: provides CRUD operations for the JobLauncher, Job, and Step implementations -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="dataSource" ref="dataSource"/>
        <property name="transactionManager" ref="transactionManager"/>
        <!--<property name="databaseType" value="sqlite"/>-->
        <property name="databaseType" value="mysql"/>
    </bean>

    <!-- 4 (alternative). In-memory JobRepository: provides CRUD operations for the JobLauncher, Job, and Step implementations -->
    <!--<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager" />
    </bean> -->
    <!-- 5. JobLauncher: a simple interface for launching a job with a given set of JobParameters -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>

    <!-- 6. Batch input -->
    <bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader">

        <!-- Input resource -->
        <property name="resource" value="input/record.csv"/>

        <!-- Map each line of the input resource to an object -->
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="username,user_id,transaction_date,transaction_amount"/>
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="com.littlefxc.examples.batch.service.RecordFieldSetMapper"/>
                </property>
            </bean>
        </property>
    </bean>

    <!-- 7. Batch processor -->
    <bean id="itemProcessor" class="com.littlefxc.examples.batch.service.CustomItemProcessor"/>

    <!-- 8. Batch output: write to XML -->
    <bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
        <property name="resource" value="file:xml/output.xml"/>
        <property name="marshaller" ref="recordMarshaller"/>
        <property name="rootTagName" value="transactionRecord"/>
    </bean>
    <!-- 8 (alternative). Batch output: write to MySQL -->
    <!--<bean id="itemWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="dataSource" ref="dataSource" />
        <property name="sql" value="INSERT INTO transaction_record (user_id, username, transaction_date, amount) VALUES (:userId, :username, :transactionDate, :amount)" />
        <property name="itemSqlParameterSourceProvider">
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
        </property>
    </bean>-->

    <!-- 8.1. Marshaller for XML -> object and object -> XML conversion -->
    <bean id="recordMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
        <property name="classesToBeBound">
            <list>
                <value>com.littlefxc.examples.batch.model.Transaction</value>
            </list>
        </property>
    </bean>

    <!-- 9. Wire the batch input (6), processor (7), and output (8) into the job -->
    <batch:job id="firstBatchJob">
        <batch:step id="step1">
            <batch:tasklet>
                <batch:chunk reader="itemReader" processor="itemProcessor" writer="itemWriter" commit-interval="10"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

</beans>

RecordFieldSetMapper

This class is set as a property of the ItemReader's line mapper; its job is to convert a FieldSet into an object.

package com.littlefxc.examples.batch.service;

import com.littlefxc.examples.batch.model.Transaction;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import java.text.ParseException;
import java.text.SimpleDateFormat;

/**
 * Converts a FieldSet into a Transaction object.
 * @author fengxuechao
 * @date 2019/1/4
 **/
public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {

    public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
        SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
        Transaction transaction = new Transaction();

        transaction.setUsername(fieldSet.readString("username"));
        transaction.setUserId(fieldSet.readInt("user_id"));
        transaction.setAmount(fieldSet.readDouble("transaction_amount"));
        String dateString = fieldSet.readString("transaction_date");
        try {
            transaction.setTransactionDate(dateFormat.parse(dateString));
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return transaction;
    }
}
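
One detail of the mapper above is worth a closer look: it catches ParseException and only prints the stack trace, so a malformed date leaves transactionDate as null, and SimpleDateFormat is lenient by default, so an impossible date such as 31/02/2015 silently rolls over into March. A stricter variant could look like the sketch below. It is an illustration under my own assumptions, not part of the original project; StrictDateParsingSketch and parseTransactionDate are hypothetical names.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class StrictDateParsingSketch {

    /** Parses a dd/MM/yyyy date strictly and fails loudly on malformed or impossible dates. */
    static Date parseTransactionDate(String text) {
        // SimpleDateFormat is not thread-safe, so a new instance is created per call.
        SimpleDateFormat format = new SimpleDateFormat("dd/MM/yyyy");
        format.setLenient(false); // reject impossible dates such as 31/02/2015
        try {
            return format.parse(text.trim());
        } catch (ParseException e) {
            // Propagate the failure instead of returning a half-filled object.
            throw new IllegalArgumentException("Invalid transaction date: " + text, e);
        }
    }

    public static void main(String[] args) {
        SimpleDateFormat iso = new SimpleDateFormat("yyyy-MM-dd");
        // Single-digit days parse fine: prints 2015-12-03
        System.out.println(iso.format(parseTransactionDate("3/12/2015 ")));
        try {
            parseTransactionDate("31/02/2015");
        } catch (IllegalArgumentException e) {
            // Prints Invalid transaction date: 31/02/2015
            System.out.println(e.getMessage());
        }
    }
}
```

Whether a bad date should fail the step or be skipped is a design decision for the job; the point here is only that the failure becomes visible.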

CustomItemProcessor

A custom implementation of the ItemProcessor interface, which serves as the transformation point between the ItemReader and the ItemWriter.
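
As a rough idea of what such a processor can look like, here is a minimal pass-through sketch. It is an assumption on my part rather than the project's actual class, chosen because the values in the final XML output match the CSV input unchanged. So that it compiles without Spring Batch on the classpath, it declares a local stand-in interface with the same shape as org.springframework.batch.item.ItemProcessor, plus a reduced stand-in for the Transaction model; in the real project the class would implement the Spring Batch interface directly.

```java
public class CustomItemProcessorSketch {

    /** Local stand-in with the same shape as org.springframework.batch.item.ItemProcessor<I, O>. */
    interface ItemProcessor<I, O> {
        O process(I item) throws Exception;
    }

    /** Reduced stand-in for the article's Transaction model (no date field, no Lombok). */
    static class Transaction {
        final String username;
        final int userId;
        final double amount;

        Transaction(String username, int userId, double amount) {
            this.username = username;
            this.userId = userId;
            this.amount = amount;
        }
    }

    /** Hypothetical pass-through processor: hands every item to the writer unchanged. */
    static class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {
        @Override
        public Transaction process(Transaction item) {
            // A real processor would transform or validate the item here.
            return item;
        }
    }

    public static void main(String[] args) {
        Transaction in = new Transaction("devendra", 1234, 10000);
        Transaction out = new CustomItemProcessor().process(in);
        // Prints devendra 1234 10000.0
        System.out.println(out.username + " " + out.userId + " " + out.amount);
    }
}
```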

Model

package com.littlefxc.examples.batch.model;

import lombok.Data;

import javax.xml.bind.annotation.XmlRootElement;
import java.util.Date;

/**
 * @author fengxuechao
 */
@Data
@XmlRootElement(name = "transactionRecord")
public class Transaction {

    private String username;

    private int userId;

    private Date transactionDate;

    private double amount;
}

record.csv

devendra, 1234, 31/10/2015, 10000
john    , 2134, 3/12/2015 , 12321
robin   , 2134, 2/02/2015 , 23411

Running the Application

package com.littlefxc.examples.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class App {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext();
        context.setConfigLocations("classpath:applicationContext.xml");
        context.refresh();

        JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("firstBatchJob");
        System.out.println("Starting the batch job");
        try {
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Job Status : " + execution.getStatus());
            System.out.println("Job completed");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Job failed");
        }
    }
}

Verification

<?xml version="1.0" encoding="UTF-8"?>
<transactionRecord>
    <transactionRecord>
        <amount>10000.0</amount>
        <transactionDate>2015-10-31T00:00:00+08:00</transactionDate>
        <userId>1234</userId>
        <username>devendra</username>
    </transactionRecord>
    <transactionRecord>
        <amount>12321.0</amount>
        <transactionDate>2015-12-03T00:00:00+08:00</transactionDate>
        <userId>2134</userId>
        <username>john</username>
    </transactionRecord>
    <transactionRecord>
        <amount>23411.0</amount>
        <transactionDate>2015-02-02T00:00:00+08:00</transactionDate>
        <userId>2134</userId>
        <username>robin</username>
    </transactionRecord>
</transactionRecord>
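
The output can also be checked programmatically instead of by eye. The sketch below uses only the JDK's built-in DOM parser to count the record elements nested under the root. OutputCheckSketch and countRecords are hypothetical names, and the XML string in main is a shortened stand-in for the real output file.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

public class OutputCheckSketch {

    /** Counts the transactionRecord elements nested under the root element. */
    static int countRecords(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Element root = doc.getDocumentElement();
            // Element.getElementsByTagName searches descendants only, so the
            // root itself (which has the same tag name) is not counted.
            return root.getElementsByTagName("transactionRecord").getLength();
        } catch (Exception e) {
            throw new IllegalStateException("Output is not well-formed XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<transactionRecord>"
                + "<transactionRecord><userId>1234</userId><username>devendra</username></transactionRecord>"
                + "<transactionRecord><userId>2134</userId><username>john</username></transactionRecord>"
                + "<transactionRecord><userId>2134</userId><username>robin</username></transactionRecord>"
                + "</transactionRecord>";
        // Prints 3, one element per line of record.csv
        System.out.println(countRecords(xml));
    }
}
```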