Spring Batch 之 ItemReader


本文重点介绍 ItemReader,如何从不同数据源读取数据;以及异常处理及重启机制。

 

JdbcPagingItemReader

从数据库中读取数据

@Configuration
public class DBJdbcDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("dbJdbcDemoWriter")
    private ItemWriter<? super Customer> dbJdbcDemoWriter;

    @Autowired
    private DataSource dataSource;

    @Bean
    public Job DBJdbcDemoJob(){
        return jobBuilderFactory.get("DBJdbcDemoJob")
                .start(dbJdbcDemoStep())
                .build();

    }

    @Bean
    public Step dbJdbcDemoStep() {
        return stepBuilderFactory.get("dbJdbcDemoStep")
                .<Customer,Customer>chunk(100)
                .reader(dbJdbcDemoReader())
                .writer(dbJdbcDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

        reader.setDataSource(this.dataSource);
        reader.setFetchSize(100); //批量读取
        reader.setRowMapper((rs,rowNum)->{
            return Customer.builder().id(rs.getLong("id"))
                    .firstName(rs.getString("firstName"))
                    .lastName(rs.getString("lastName"))
                    .birthdate(rs.getString("birthdate"))
                    .build();

        });

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from Customer");
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);

        return reader;

    }
}

       Job 和 ItermWriter不是本文介绍重点,此处举例,下面例子相同

@Component("dbJdbcDemoWriter")
public class DbJdbcDemoWriter implements ItemWriter<Customer> {
    @Override
    public void write(List<? extends Customer> items) throws Exception {
        for (Customer customer:items)
            System.out.println(customer);

    }
}


FlatFileItemReader

从CVS文件中读取数据


@Configuration
public class FlatFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Bean
    public Job flatFileDemoJob(){
        return jobBuilderFactory.get("flatFileDemoJob")
                .start(flatFileDemoStep())
                .build();

    }

    @Bean
    public Step flatFileDemoStep() {
        return stepBuilderFactory.get("flatFileDemoStep")
                .<Customer,Customer>chunk(100)
                .reader(flatFileDemoReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Customer> flatFileDemoReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
        reader.setLinesToSkip(1);

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id","firstName","lastName","birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet();

        reader.setLineMapper(lineMapper);

        return reader;

    }
}

 

StaxEventItemReader

从XML文件中读取数据

@Configuration
public class XmlFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("xmlFileDemoWriter")
    private ItemWriter<? super Customer> xmlFileDemoWriter;

    @Bean
    public Job xmlFileDemoJob(){
        return jobBuilderFactory.get("xmlFileDemoJob")
                .start(xmlFileDemoStep())
                .build();

    }

    @Bean
    public Step xmlFileDemoStep() {
        return stepBuilderFactory.get("xmlFileDemoStep")
                .<Customer,Customer>chunk(10)
                .reader(xmlFileDemoReader())
                .writer(xmlFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public StaxEventItemReader<Customer> xmlFileDemoReader() {
        StaxEventItemReader<Customer> reader = new StaxEventItemReader<>();

        reader.setResource(new ClassPathResource("customer.xml"));
        reader.setFragmentRootElementName("customer");


        XStreamMarshaller unMarshaller = new XStreamMarshaller();
        Map<String,Class> map = new HashMap<>();
        map.put("customer",Customer.class);
        unMarshaller.setAliases(map);
        reader.setUnmarshaller(unMarshaller);


        return reader;

    }
}

 

MultiResourceItemReader

从多个文件读取数据

@Configuration
public class MultipleFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Value("classpath*:/file*.csv")
    private Resource[] inputFiles;

    @Bean
    public Job multipleFileDemoJob(){
        return jobBuilderFactory.get("multipleFileDemoJob")
                .start(multipleFileDemoStep())
                .build();

    }

    @Bean
    public Step multipleFileDemoStep() {
        return stepBuilderFactory.get("multipleFileDemoStep")
                .<Customer,Customer>chunk(50)
                .reader(multipleResourceItemReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    private MultiResourceItemReader<Customer> multipleResourceItemReader() {

        MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();

        reader.setDelegate(flatFileReader());
        reader.setResources(inputFiles);

        return reader;
    }

    @Bean
    public FlatFileItemReader<Customer> flatFileReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
       // reader.setLinesToSkip(1);

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id","firstName","lastName","birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet();

        reader.setLineMapper(lineMapper);

        return reader;

    }
}

 

异常处理及重启机制。

       对于chunk-oriented step,Spring Batch提供了管理状态的工具。如何在一个步骤中管理状态是通过ItemStream接口为开发人员提供访问权限保持状态的组件。这里提到的这个组件是ExecutionContext实际上它是键值对的映射。map存储特定步骤的状态。该ExecutionContext使重启步骤成为可能,因为状态在JobRepository中持久存在。

执行期间出现错误时,最后一个状态将更新为JobRepository。下次作业运行时,最后一个状态将用于填充ExecutionContext然后
可以继续从上次离开的地方开始运行。

检查ItemStream接口:
    将在步骤开始时调用open()并执行ExecutionContext;
    用DB填充值; update()将在每个步骤或事务结束时调用,更新ExecutionContext;
    完成所有数据块后调用close();

                  Spring Batch 之 ItemReader

下面我们构造个例子,

 准备个cvs文件,在第33条数据,添加一条错误名字信息 ;当读取到这条数据时,抛出异常终止程序。

Spring Batch 之 ItemReader

 

ItemReader测试代码


@Component("restartDemoReader")
public class RestartDemoReader implements ItemStreamReader<Customer> {


    private Long curLine = 0L;
    private boolean restart = false;

    private FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();

    private ExecutionContext executionContext;
    RestartDemoReader
    public () {
        
        reader.setResource(new ClassPathResource("restartDemo.csv"));

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet();

        reader.setLineMapper(lineMapper);
    }

    @Override
    public Customer read() throws Exception, UnexpectedInputException, ParseException,
            NonTransientResourceException {

        Customer customer = null;

        this.curLine++;
        //如果是重启,则从上一步读取的行数继续往下执行
        if (restart) {
            reader.setLinesToSkip(this.curLine.intValue()-1);
            restart = false;
            System.out.println("Start reading from line: " + this.curLine);
        }

        reader.open(this.executionContext);

        customer = reader.read();
        //当匹配到wrongName时,显示抛出异常,终止程序
        if (customer != null) {
            if (customer.getFirstName().equals("wrongName"))
                throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
        } else {
            curLine--;
        }
        return customer;
    }

    /**
     * 判断是否是重启job
     * @param executionContext
     * @throws ItemStreamException
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        this.executionContext = executionContext;
        if (executionContext.containsKey("curLine")) {
            this.curLine = executionContext.getLong("curLine");
            this.restart = true;
        } else {
            this.curLine = 0L;
            executionContext.put("curLine", this.curLine.intValue());
        }

    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        System.out.println("update curLine: " + this.curLine);
        executionContext.put("curLine", this.curLine);

    }

    @Override
    public void close() throws ItemStreamException {

    }
}

   Job配置
       以10条记录为一个批次,进行读取

@Configuration
public class RestartDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Autowired
    @Qualifier("restartDemoReader")
    private ItemReader<Customer> restartDemoReader;

    @Bean
    public Job restartDemoJob(){
        return jobBuilderFactory.get("restartDemoJob")
                .start(restartDemoStep())
                .build();

    }

    @Bean
    public Step restartDemoStep() {
        return stepBuilderFactory.get("restartDemoStep")
                .<Customer,Customer>chunk(10)
                .reader(restartDemoReader)
                .writer(flatFileDemoWriter)
                .build();
    }
}

 

      当我们第一次执行时,程序在33行抛出异常异常,curline值是30;

Spring Batch 之 ItemReader

     

    这时,我们可以查询数据库 batch_step_excution表,发现curline值已经以 键值对形式,持久化进数据库(上文以10条数据为一个批次;故33条数据异常时,curline值为30)

    Spring Batch 之 ItemReader

     接下来,我们更新wrongName,再次执行程序;
     程序会执行open方法,判断数据库step中map是否存在curline,如果存在,则是重跑,即读取curline,从该批次开始往下继续执行;

Spring Batch 之 ItemReader

Spring Batch 之 ItemReader