用@KafkaListener批量接收消息
之前介绍了如何在SpringBoot中集成Kafka,但是默认情况下,@KafkaListener都是一条一条消费,如果想要一次消费一个批量的话,我们都知道,在kafka原生的API可以通过poll(num)来获取一次获取num条消息:
那么使用在Springboot中使用@KafkaListener能否实现批量监听呢?
看了spring-kafka的官方文档介绍,可以知道自1.1版本之后,@KafkaListener开始支持批量消费,只需要设置batchListener参数为true
https://docs.spring.io/spring-kafka/reference/html/_reference.html
下面是我在项目中使用到的方法:
1
2
3
4
5
6
7
8
9
10
|
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.setBatchListener(true);//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);//设置提交偏移量的方式
return factory;
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//每一批数量
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
|
开始监听,批量消费后采用JPA的方式批量写入数据库,这里containerFactory = “batchFactory”要指定为批量消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
@KafkaListener(topics = "${tgs.kafka.topics}", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
logger.info("records.size: " + records.size() + " in all");
List<B_ZDRYGK_ZDRYXX_FJ_HCB1> list = new ArrayList<B_ZDRYGK_ZDRYXX_FJ_HCB1>();
ObjectMapper mapper = new ObjectMapper();
B_ZDRYGK_ZDRYXX_FJ_HCB1 b_zdrygk_zdryxx_fj_hcb1 =null;
for (ConsumerRecord<?, ?> record : records) {
try {
b_zdrygk_zdryxx_fj_hcb1 = mapper.readValue(record.value().toString(), B_ZDRYGK_ZDRYXX_FJ_HCB1.class);
} catch (IOException e) {
e.printStackTrace();
}
if (null != b_zdrygk_zdryxx_fj_hcb1) {
list.add(b_zdrygk_zdryxx_fj_hcb1);
}
}
try {
List<B_ZDRYGK_ZDRYXX_FJ_HCB1> hcb1List = b_ZDRYGK_ZDRYXX_FJ_HCB1Repository.save(list);
b_ZDRYGK_ZDRYXX_FJ_HCB1Repository.flush();
logger.info("flush size: " + hcb1List.size());
} catch (Exception e) {
e.printStackTrace();
}finally{
logger.info("start commit offset");
ack.acknowledge();//手动提交偏移量
logger.info("stop commit offset");
}
}
|
运行结果如下图,每批消费100条消息: