springCloud整合Kafka消息总线

首先将相关的依赖导入pom.xml文件中

    <dependencies>

		<!-- Spring Boot core -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
			<exclusions>
			    <exclusion>
			      <groupId>org.springframework.boot</groupId>
			      <artifactId>spring-boot-starter-logging</artifactId>
			    </exclusion>
			</exclusions>
		</dependency>
		<dependency>
			  <groupId>org.springframework.boot</groupId>
			  <artifactId>spring-boot-starter-log4j2</artifactId>
		</dependency>
		<dependency>  <!-- required so that the log4j2.yml file is recognized -->
			<groupId>com.fasterxml.jackson.dataformat</groupId>
			<artifactId>jackson-dataformat-yaml</artifactId>
   		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>log4j-over-slf4j</artifactId>
		</dependency>

		<!-- Spring Boot web starter -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<!-- service registry -->
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-eureka</artifactId>
		</dependency>
		
		<!-- config center -->
		<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>

		<!-- test execution -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<!-- health monitoring -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>

		<!-- integrates config + cloud bus for configuration management and refresh -->
		<dependency>
		  <groupId>org.springframework.cloud</groupId>
		  <artifactId>spring-cloud-starter-bus-kafka</artifactId>
		</dependency>

		<!-- Swagger2 for building API documentation -->
		<dependency>
			<groupId>io.springfox</groupId>
			<artifactId>springfox-swagger2</artifactId>
			<version>2.6.1</version>
		</dependency>
		<dependency>
			<groupId>io.springfox</groupId>
			<artifactId>springfox-swagger-ui</artifactId>
			<version>2.6.1</version>
		</dependency>

		<!-- hot reload during development -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<optional>true</optional>
		</dependency>
		<!--POI-->
		<dependency>
			<groupId>org.apache.poi</groupId>
			<artifactId>poi-ooxml</artifactId>
			<version>3.9</version>
		</dependency>
		<!-- JPA annotation dependency -->
		<dependency>
			<groupId>javax.persistence</groupId>
			<artifactId>persistence-api</artifactId>
			<version>1.0</version>
		</dependency>
		<!-- Jackson: begin -->
		<!-- <dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			<version>2.8.8</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-annotations</artifactId>
			<version>2.8.8</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-core</artifactId>
			<version>2.8.8</version>
		</dependency> -->
		<!-- Jackson: end -->
		<dependency>
	        <groupId>org.mybatis.spring.boot</groupId>
	        <artifactId>mybatis-spring-boot-starter</artifactId>
	        <version>1.1.1</version>
   		 </dependency>
	     <dependency>
	        <groupId>mysql</groupId>
	        <artifactId>mysql-connector-java</artifactId>
	    </dependency>
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper-spring-boot-starter</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- dependent modules -->
    	<!--<dependency>
            <groupId>com.migu</groupId>
            <artifactId>wog-common</artifactId>
            <version>1.5.0.RELEASE</version>
        </dependency>-->
        <dependency>
			<groupId>com.google.code.gson</groupId>
			<artifactId>gson</artifactId>
			<version>2.3.1</version>
		</dependency>
		<!--<dependency>
            <groupId>com.migu</groupId>
            <artifactId>product-mbase</artifactId>
            <version>1.5.0.RELEASE</version>
            <exclusions>
				<exclusion>
					<groupId>org.springframework.boot</groupId>
					<artifactId>spring-boot-devtools</artifactId>
				</exclusion>
			</exclusions>
        </dependency>-->
		<dependency>
		    <groupId>com.alibaba</groupId>
		    <artifactId>fastjson</artifactId>
		    <version>1.2.36</version>
		</dependency>
		<dependency>
		    <groupId>dom4j</groupId>
		    <artifactId>dom4j</artifactId>
		    <version>1.6.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/jaxen/jaxen -->
		<dependency>
		    <groupId>jaxen</groupId>
		    <artifactId>jaxen</artifactId>
		    <version>1.1.6</version>
		</dependency>
		<!-- FTP -->
        <dependency>
		    <groupId>com.jcraft</groupId>
		    <artifactId>jsch</artifactId>
		    <version>0.1.54</version>
		</dependency>
		<!-- ftp -->
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-ftp</artifactId>
			<version>2.13.2</version>
		</dependency>
		<dependency>
		    <groupId>org.apache.commons</groupId>
		    <artifactId>commons-lang3</artifactId>
		    <version>3.0</version>
	    </dependency>
		<dependency>
		   <groupId>com.alibaba</groupId>
		   <artifactId>druid-spring-boot-starter</artifactId>
		   <version>1.1.6</version>
		</dependency>
		<dependency>
		    <groupId>net.sf.json-lib</groupId>
		    <artifactId>json-lib</artifactId>
		    <version>2.4</version>
		    <classifier>jdk15</classifier>
		</dependency>
		<dependency>
			<groupId>org.json</groupId>
			<artifactId>json</artifactId>
			<version>20090211</version>
		</dependency>
		
		<dependency>
		    <groupId>org.projectlombok</groupId>
		    <artifactId>lombok</artifactId>
		    <version>1.14.4</version>
		</dependency>
		
		<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-feign</artifactId>
       	</dependency>
		<!-- Elasticsearch packages -->
		<dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>5.6.5</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>5.6.5</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.3</version>
        </dependency>
    </dependencies>
	

    <build>
		<plugins>
			<!-- Spring Boot Maven plugin, configured to run in a forked JVM -->
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<fork>true</fork>
					<!-- fixes garbled Chinese characters under spring-boot:run -->
					<jvmArguments>-Dfile.encoding=UTF-8</jvmArguments>
				</configuration>
			</plugin>
			<!-- compiler source/target levels are taken from the java.version property -->
			<plugin>
	            <groupId>org.apache.maven.plugins</groupId>
	            <artifactId>maven-compiler-plugin</artifactId>
	            <version>3.1</version>
	            <configuration>
	                <source>${java.version}</source>
	                <target>${java.version}</target>
	            </configuration>
	        </plugin>

			<!-- JaCoCo coverage: attaches the agent, then runs the report goal in two executions -->
			<plugin>
				<groupId>org.jacoco</groupId>
				<artifactId>jacoco-maven-plugin</artifactId>
				<version>0.8.2</version>
				<configuration>
					<systemPropertyVariables>
						<jacoco-agent.destfile>target/jacoco.exec</jacoco-agent.destfile>
					</systemPropertyVariables>
				</configuration>
				<executions>
					<execution>
						<id>prepare-agent</id>
						<goals>
							<goal>prepare-agent</goal>
						</goals>
					</execution>
					<!-- report bound to the prepare-package phase -->
					<execution>
						<id>report</id>
						<phase>prepare-package</phase>
						<goals>
							<goal>report</goal>
						</goals>
					</execution>
					<!-- report bound to the test phase, reading target/jacoco.exec and writing to target/jacoco-ut -->
					<execution>
						<id>post-unit-test</id>
						<phase>test</phase>
						<goals>
							<goal>report</goal>
						</goals>
						<configuration>
							<dataFile>target/jacoco.exec</dataFile>
							<outputDirectory>target/jacoco-ut</outputDirectory>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>

下面是消息接收者的配置文件

springCloud整合Kafka消息总线

测试kafka发送消息的Controller

@RestController
@RequestMapping("/kafka")
@Api(value = "/kafka", description = "kafka消息")
@Slf4j
public class KafkaSendController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 发送kafka消息
     * @param topic 消息的topic
     * @param jsonStr 消息内容
     */
    @PostMapping("/sendMessageToTopic")
    public void sendMessageToTopic(@RequestParam(value = "topic") String topic,
            @RequestParam(value = "jsonStr", required = false) String jsonStr) {
        if (StringUtils.isEmpty(jsonStr) || "empty".equalsIgnoreCase(jsonStr)) {
            log.error("error param : {}", new ResponseDto(new Result("1400530012",             
               "jsonStr is null"),"sendMessageToTopic error"));
            return;
        }
        kafkaTemplate.send(topic, jsonStr);
    }
}

3、configuration:kafka consumer

1)通过 @Configuration 声明配置类(本类配置的是 Kafka 消费者监听容器;KafkaTemplate 由发送端使用)。

2)通过@Value注入application.properties配置文件中的kafka配置。

3)通过 @Bean 生成消费者配置和批量监听容器工厂这两个 bean。

/**
 * Kafka consumer configuration.
 *
 * <p>Reads the consumer settings from the {@code spring.kafka.*} properties and exposes
 * a batch-enabled listener container factory under the bean name
 * {@code batchContainerFactory}.
 */
@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.group-id}")
    private String groupId;
    @Value("${spring.kafka.listener.concurrency}")
    private int concurrency;
    @Value("${spring.kafka.consumer.max-poll-records}")
    private int maxPollRecords;
    @Value("${spring.kafka.consumer.max-poll-interval-ms}")
    private int maxPollInterval;
    @Value("${spring.kafka.consumer.poll-timeout}")
    private int pollTimeout;

    /**
     * Builds the raw Kafka consumer properties from the injected settings.
     * Keys and values are both deserialized as strings.
     *
     * @return a mutable map of Kafka consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
        // NOTE(review): the "poll-timeout" property is applied as fetch.max.wait.ms here,
        // not as the listener container's poll timeout - confirm this is intended.
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, pollTimeout);
        return props;
    }

    /**
     * Creates the listener container factory used for batch consumption.
     *
     * <p>The factory is parameterized as {@code <String, String>} to match the
     * {@link StringDeserializer}s configured in {@link #consumerConfigs()}.
     *
     * @return a container factory with batch listening enabled
     */
    @Bean("batchContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> listenerContainer() {
        ConcurrentKafkaListenerContainerFactory<String, String> container =
                new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(
                new DefaultKafkaConsumerFactory<String, String>(consumerConfigs()));
        // Concurrency should be less than or equal to the topic's partition count.
        container.setConcurrency(concurrency);
        // Deliver records to the listener in batches.
        container.setBatchListener(true);
        return container;
    }

}

下面是消息监听类,用于接收并处理发送过来的消息

@Slf4j
@Component
public class KafkaConsumer {
    @Autowired
    KafkaService kafkaService;
    /**
     * 监听MQ消息
     */
    @KafkaListener(topics = { “” }, containerFactory 
       = "batchContainerFactory")
    public void listen(List<ConsumerRecord<?, ?>> recordList) {
        log.info("------------------------batch get {} topic msg------------------------        
        ", recordList.size());
        for (ConsumerRecord<?, ?> record : recordList) {
          final Optional<?> message = Optional.ofNullable(record.value());
          log.info("get message record:" + record);
          final String topic = record.topic();
          if (message.isPresent()) {
              try {
                final String data = message.get().toString();
                // 数据库新增修改
                final ObjectMapper objectMapper = new ObjectMapper();
                final Map<String, String> map = objectMapper.readValue(data, Map.class);
                final String channelCode = map.get("name");
                final String sourceChannelCode = map.get("age");
                final String channelName = map.get("sex");
                //赋值入对象进行数据库炒作

                   //省略相关代码


              } catch (final DataAccessException e){
                // 数据库的异常通过切面捕获,这里仅打印异常日志
                log.error(e.getMessage(), e);
              } catch (final Exception e) {
                log.error("kafka message{} handle exception", topic, e);
            return new BaseResponse(AlertCode.EXCEPTION,"system exception:" +         
         e.getMessage());
       }
    }
    return new BaseResponse(AlertCode.SUCCESS,"success");
  }
        }
    }
}