kafka与Spring的集成

1项目结构

kafka与Spring的集成

 

2 pom依赖

    <!-- kafka客户端支持包 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.1</version>
    </dependency>
    
    <!-- spring支持kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.0.4.RELEASE</version>
    </dependency>

3.application-kafka.xml配置文件

   <?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans"    
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
    xmlns:p="http://www.springframework.org/schema/p"  
    xmlns:aop="http://www.springframework.org/schema/aop"   
    xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:jee="http://www.springframework.org/schema/jee"  
    xmlns:tx="http://www.springframework.org/schema/tx" 
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:mvc="http://www.springframework.org/schema/mvc" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="    
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd  
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd  
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd  
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.0.xsd  
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.1.xsd
        http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-2.1.xsd
        ">
        
        <!-- 生产者 -->
        <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="192.168.28.129:9092,192.168.28.130:9092,192.168.28.131:9092"></entry>
                    <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"></entry>
                    <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"></entry>
                    <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"></entry>
                    <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"></entry>
                </map>
            </constructor-arg>
        </bean>
        <!-- 用于发送消息的模板 kafkaTemplate-->
        <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
            <constructor-arg ref="producerFactory"></constructor-arg>
            <constructor-arg name="autoFlush" value="true"></constructor-arg>
        </bean>
        
        <!-- 创建消费者 -->
       <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
               <constructor-arg>
                   <map>
                       <entry key="bootstrap.servers" value="192.168.28.129:9092,192.168.28.130:9092,192.168.28.131:9092"></entry>
                       <entry key="group.id" value="testGroup1"></entry>
                       <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"></entry>
                       <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"></entry>
                   </map>
               </constructor-arg>
       </bean>
       <!-- 消息消费者监听器 用于在接收到消息时执行相关的业务代码  (自行定义的类 用户于处理相应的业务操作)-->
       <bean id="consumerListener" class="com.java.kafka.KafkaConsumerListener"/>
       <!-- 用于将监听器和个某个主题绑定在一起 -->
       <bean id="containerProperties_example" class="org.springframework.kafka.listener.config.ContainerProperties">
               <constructor-arg value="test-topic"></constructor-arg>
               <property name="messageListener" ref="consumerListener"></property>
       </bean>
       <!-- 用于将消息监听器与消息消费者工厂绑定在一起 -->
       <bean id="messageListtenerContainer_example" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
               <constructor-arg ref="containerProperties_example"></constructor-arg>
               <constructor-arg ref="consumerFactory"></constructor-arg>
       </bean>             
 
</beans>  

4.Producer消息提供类

package com.java.kafka;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;


/**
 * 
* @ClassName: SpringProducer  
* @Description: TODO(消息发送者)  
* @author lenovo  
* @date 2019年1月12日  
*
 */
public class SpringProducer {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ApplicationContext context = new ClassPathXmlApplicationContext("application-kafka.xml");
        KafkaTemplate<String, String> kafkaTemplate = (KafkaTemplate<String, String>)context.getBean("kafkaTemplate");
        kafkaTemplate.send("test-topic","我的测试消息1");
        kafkaTemplate.send("test-topic","我的测试消息2");
        kafkaTemplate.send("test-topic","我的测试消息3");
    }

}
 

5.消息消费者类

package com.java.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.Acknowledgment;

/**
 * 
* @ClassName: KafkaConsumerListener  
* @Description: TODO(消息消费者)  
* @author lenovo  
* @date 2019年1月12日  
*
 */
public class KafkaConsumerListener implements MessageListener<String, String>  {

    public void onMessage(ConsumerRecord<String, String> record) {
        // TODO Auto-generated method stub
        System.out.println("partition = " + record.partition() 
                  + " ,offset = " + record.offset() 
                  + " ,key = " + record.key() + " ,value = " + record.value());
    }

    public void onMessage(ConsumerRecord<String, String> data,
            Acknowledgment acknowledgment) {
        // TODO Auto-generated method stub
        
    }

    public void onMessage(ConsumerRecord<String, String> data,
            Consumer<?, ?> consumer) {
        // TODO Auto-generated method stub
        
    }

    public void onMessage(ConsumerRecord<String, String> data,
            Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        // TODO Auto-generated method stub
        
    }

}
 

5.启动项目 运行SpringProducer类

  测试结果:

kafka与Spring的集成