springboot-kafka 生产者和消费者分离
创建两个maven工程,produce和consume:
pom:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.31</version>
</dependency>
</dependencies>
一、produce生产者:
1、配置文件application.properties:
server.port = 8081
#kafka
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=localhost:9092
# 指定默认消费者group id
spring.kafka.consumer.group-id=myGroup
# 指定默认topic id
spring.kafka.template.default-topic= my-replicated-topic
# 指定listener 容器中的线程数,用于提高并发量
spring.kafka.listener.concurrency= 3
# 每次批量发送消息的批次大小(batch.size,单位为字节,不是消息条数)
spring.kafka.producer.batch-size= 1000
#key-value序列化反序列化
#spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.buffer-memory=524288
2、消息实体:
package com.produce.dto;
import java.io.Serializable;
/**
 * Message DTO transported over Kafka as a JSON string.
 *
 * <p>The original listing omitted the accessors ("此处省略get和set方法"),
 * but the producer calls {@code setMsgId}/{@code setContent}, so they are
 * required for the code to compile; they are included here explicitly.
 */
public class Message implements Serializable {

    private static final long serialVersionUID = -8921653281090412125L;

    /** Unique message id (the producer fills in a random UUID). */
    private String msgId;

    /** Human-readable message payload. */
    private String content;

    public String getMsgId() {
        return msgId;
    }

    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "Message [msgId=" + msgId + ", content=" + content + "]";
    }
}
3、生产者:
package com.produce.rest;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.produce.dto.Message;
import com.produce.util.DateUtil;
/**
* @Description: 生产者
*/
/**
 * Kafka producer service: publishes plain-text and JSON-serialized messages.
 */
@Service
public class KafkaProducer {

    // Parameterized to <String, String> instead of the raw type: keys and
    // values are plain strings here, and raw types defeat compile-time checks.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends a timestamped greeting to the "test" topic.
     */
    public void sendTest() {
        kafkaTemplate.send("test", "hello,kafka " + DateUtil.getCurentTime());
    }

    /**
     * Builds a {@link Message} with a random UUID id, serializes it to JSON
     * with fastjson, and sends it to the "wtyymsg" topic.
     */
    public void sendMessageDTO() {
        String uuid = UUID.randomUUID().toString();
        Message message = new Message();
        message.setMsgId(uuid);
        message.setContent(DateUtil.getCurentTime() + "时产生消息" + uuid);
        kafkaTemplate.send("wtyymsg", JSON.toJSONString(message));
    }
}
4、定时器定时发送:
package com.produce.rest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Scheduled triggers that periodically invoke the Kafka producer.
 */
@Service
public class KafkaScheduled {

    private static Logger logger = LoggerFactory.getLogger(KafkaScheduled.class);

    @Autowired
    private KafkaProducer kafkaSender;

    /**
     * Runs every 20 seconds (fixedRate is in milliseconds: 1000 * 20 = 20000 ms;
     * the original comment incorrectly said 20 ms) and sends a plain test message.
     */
    @Scheduled(fixedRate = 1000 * 20)
    public void testKafka() throws Exception {
        logger.info("KafkaScheduled...start");
        kafkaSender.sendTest();
    }

    /**
     * Runs once per minute (cron: at second 0 of every minute) and sends
     * one JSON-serialized Message DTO.
     */
    @Scheduled(cron = "0 0/1 * * * ? ")
    public void msgKafka() throws Exception {
        logger.info("KafkaMsgScheduled...start");
        kafkaSender.sendMessageDTO();
    }
}
5、启动类:
package com.produce;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* @Description: 生产者启动
*/
/**
 * Producer application entry point. {@code @EnableScheduling} activates the
 * {@code @Scheduled} methods that drive message publishing.
 */
@EnableScheduling
@SpringBootApplication
public class ProduceStart {

    protected static Logger logger = LoggerFactory.getLogger(ProduceStart.class);

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(ProduceStart.class);
        application.run(args);
        logger.info("----------------SpringBoot Start Success-------------------");
    }
}
二、消费者:
1、application.properties:仅端口号和生产者不一样(否则会出现端口号占用的情况):
server.port = 8082
2、消息实体:package com.consume.dto.Message,同生产者;
3、消费者:
package com.consume.rest;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import com.consume.dto.Message;
/**
 * Kafka consumer service: listens on the "test" and "wtyymsg" topics.
 */
@Service
public class KafkaConsumer {

    protected static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    /**
     * Listens on the "test" topic and logs each received payload.
     *
     * <p>The original printed via {@code System.out} while the declared SLF4J
     * logger went unused; parameterized logging is the idiomatic replacement.
     *
     * @param message the record value as a String
     */
    @KafkaListener(topics = {"test"})
    public void consumer(String message) {
        logger.info("test topic message : {}", message);
    }

    /**
     * Listens on the "wtyymsg" topic, deserializes the JSON payload back into
     * a {@link Message}, and logs it.
     *
     * @param message the record value as a JSON String
     */
    @KafkaListener(topics = {"wtyymsg"})
    public void consumeMsgDTO(String message) {
        Message messageDTO = JSON.parseObject(message, Message.class);
        logger.info("msg接收到消息:{}", messageDTO);
    }
}
接收消息也可以这样写:
/**
 * Alternative form: receive the whole ConsumerRecord instead of just the
 * String value, then deserialize the value into a Message DTO.
 */
@KafkaListener(topics = {"wtyymsg"})
public void consumeMsgDTO(ConsumerRecord<?, ?> consumerRecord) {
    Object payload = consumerRecord.value();
    Message messageDTO = JSON.parseObject((String) payload, Message.class);
    System.err.println("wtyymsg接收到消息:" + messageDTO);
}
4、启动类:
package com.consume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @Description: 消费者启动
*/
/**
 * Consumer application entry point.
 */
@SpringBootApplication
public class ConsumeStart {

    protected static Logger logger = LoggerFactory.getLogger(ConsumeStart.class);

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(ConsumeStart.class);
        application.run(args);
        logger.info("----------------SpringBoot Start Success-------------------");
    }
}
分别启动生产者和消费者(顺序无要求),消费者后台打印: