springCloud整合Kafka消息总线
首先将相关的依赖导入pom.xml文件中
<dependencies>
<!-- springboot核心 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency> <!-- 加上这个才能辨认到log4j2.yml文件 -->
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<!-- springboot的web包 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 注册中心 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<!-- 配置中心 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<!-- 测试运行 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 健康度监控 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- 整合config + cloud bus,实现配置管理与刷新 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
<!-- swagger2构建API文档 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.6.1</version>
</dependency>
<!-- 实现热部署 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<!--POI-->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>3.9</version>
</dependency>
<!-- jpa注解依赖包 -->
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>persistence-api</artifactId>
<version>1.0</version>
</dependency>
<!--JACKSON开始-->
<!-- <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.8.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.8.8</version>
</dependency> -->
<!--JACKSON结束-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>
<!-- 依赖的模块 -->
<!--<dependency>
<groupId>com.migu</groupId>
<artifactId>wog-common</artifactId>
<version>1.5.0.RELEASE</version>
</dependency>-->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.3.1</version>
</dependency>
<!--<dependency>
<groupId>com.migu</groupId>
<artifactId>product-mbase</artifactId>
<version>1.5.0.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</exclusion>
</exclusions>
</dependency>-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.36</version>
</dependency>
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/jaxen/jaxen -->
<dependency>
<groupId>jaxen</groupId>
<artifactId>jaxen</artifactId>
<version>1.1.6</version>
</dependency>
<!-- FTP -->
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.54</version>
</dependency>
<!-- ftp -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-ftp</artifactId>
<version>2.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.6</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20090211</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.14.4</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-feign</artifactId>
</dependency>
<!-- es配置包 -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>5.6.5</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.6.5</version>
</dependency>
<dependency>
<groupId>io.searchbox</groupId>
<artifactId>jest</artifactId>
<version>5.3.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
<!-- spring-boot:run 中文乱码解决 -->
<jvmArguments>-Dfile.encoding=UTF-8</jvmArguments>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.2</version>
<configuration>
<systemPropertyVariables>
<jacoco-agent.destfile>target/jacoco.exec</jacoco-agent.destfile>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<id>prepare-agent</id>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<execution>
<id>report</id>
<phase>prepare-package</phase>
<goals>
<goal>report</goal>
</goals>
</execution>
<execution>
<id>post-unit-test</id>
<phase>test</phase>
<goals>
<goal>report</goal>
</goals>
<configuration>
<dataFile>target/jacoco.exec</dataFile>
<outputDirectory>target/jacoco-ut</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
下面是消息接收者的配置文件
测试kafka发送消息的Controller
@RestController
@RequestMapping("/kafka")
@Api(value = "/kafka", description = "kafka消息")
@Slf4j
public class KafkaSendController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 发送kafka消息
* @param topic 消息的topic
* @param jsonStr 消息内容
*/
@PostMapping("/sendMessageToTopic")
public void sendMessageToTopic(@RequestParam(value = "topic") String topic,
@RequestParam(value = "jsonStr", required = false) String jsonStr) {
if (StringUtils.isEmpty(jsonStr) || "empty".equalsIgnoreCase(jsonStr)) {
log.error("error param : {}", new ResponseDto(new Result("1400530012",
"jsonStr is null"),"sendMessageToTopic error"));
return;
}
kafkaTemplate.send(topic, jsonStr);
}
}
3、configuration:kafka consumer
1)通过@Configuration ,声明Config并且打开KafkaTemplate能力。
2)通过@Value注入application.properties配置文件中的kafka配置。
3)生成bean,@Bean
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.group-id}")
private String groupId;
@Value("${spring.kafka.listener.concurrency}")
private int concurrency;
@Value("${spring.kafka.consumer.max-poll-records}")
private int maxPollRecords;
@Value("${spring.kafka.consumer.max-poll-interval-ms}")
private int maxPollInterval;
@Value("${spring.kafka.consumer.poll-timeout}")
private int pollTimeout;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, pollTimeout);
return props;
}
@Bean("batchContainerFactory")
public ConcurrentKafkaListenerContainerFactory listenerContainer() {
ConcurrentKafkaListenerContainerFactory container = new
ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerConfigs()));
//设置并发量,小于或等于Topic的分区数
container.setConcurrency(concurrency);
//设置为批量监听
container.setBatchListener(true);
return container;
}
}
进行监听消息类,获取发送过来的信息
@Slf4j
@Component
public class KafkaConsumer {
@Autowired
KafkaService kafkaService;
/**
* 监听MQ消息
*/
@KafkaListener(topics = { “” }, containerFactory
= "batchContainerFactory")
public void listen(List<ConsumerRecord<?, ?>> recordList) {
log.info("------------------------batch get {} topic msg------------------------
", recordList.size());
for (ConsumerRecord<?, ?> record : recordList) {
final Optional<?> message = Optional.ofNullable(record.value());
log.info("get message record:" + record);
final String topic = record.topic();
if (message.isPresent()) {
try {
final String data = message.get().toString();
// 数据库新增修改
final ObjectMapper objectMapper = new ObjectMapper();
final Map<String, String> map = objectMapper.readValue(data, Map.class);
final String channelCode = map.get("name");
final String sourceChannelCode = map.get("age");
final String channelName = map.get("sex");
//赋值入对象进行数据库炒作
//省略相关代码
} catch (final DataAccessException e){
// 数据库的异常通过切面捕获,这里仅打印异常日志
log.error(e.getMessage(), e);
} catch (final Exception e) {
log.error("kafka message{} handle exception", topic, e);
return new BaseResponse(AlertCode.EXCEPTION,"system exception:" +
e.getMessage());
}
}
return new BaseResponse(AlertCode.SUCCESS,"success");
}
}
}
}