kafka发送自定义消息体(对象、数组)

在前面简单搭建了Windows上的kafka环境,并使用命令行测试可以运行之后(环境请参考:http://blog.csdn.net/u014104286/article/details/75040932)我们会考虑怎么使用kafka;先试着发送一个简单的消息,发送成功之后是否需要发送自定义的消息类尼?怎么发送自定义的消息类,如果我要发送一个集合呢?下面我们来一一解决我们的问题。


准备工作:

1.需要搭建并测试成功的kafka环境,并启动zookeeper和kafka服务。

2.创建一个可用的maven项目

3.添加开发kafkaka的依赖:

[html] view plain copy
  1. <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->  
  2. <dependency>  
  3.     <groupId>org.apache.kafka</groupId>  
  4.     <artifactId>kafka_2.11</artifactId>  
  5.     <version>0.10.2.0</version>  
  6. </dependency>  


准备工作完成。

1.首先我们要发送第一个消息,消息类型为String:

Producer发送消息类:

[java] view plain copy
  1. public class SimpleProducer {  
  2.     public static void main(String[] args) throws Exception{           
  3.           //Assign topicName to string variable  
  4.           String topicName = "newtest001";  
  5.           // create instance for properties to access producer configs    
  6.           Properties props = new Properties();  
  7.           //Assign localhost id  
  8.           props.put("bootstrap.servers""localhost:9092");  
  9.           //Set acknowledgements for producer requests.       
  10.           props.put("acks""all");  
  11.           //If the request fails, the producer can automatically retry,  
  12.           props.put("retries"0);  
  13.           //Specify buffer size in config  
  14.           props.put("batch.size"16384);  
  15.           //Reduce the no of requests less than 0    
  16.           props.put("linger.ms"1);  
  17.           //The buffer.memory controls the total amount of memory available to the producer for buffering.    
  18.           props.put("buffer.memory"33554432);  
  19.           props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");  
  20.           props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");  
  21.    
  22.           Producer<String, String> producer = new KafkaProducer<String, String>(props);  
  23.    
  24.           for(int i = 0; i < 10; i++)  
  25.              producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));  
  26.                    System.out.println("Message sent successfully");  
  27.                    producer.close();  
  28.        }  
  29. }  

Consumer接收消息类:

[java] view plain copy
  1. public class SimpleConsumer {  
  2.     public static void main(String[] args) throws Exception {  
  3.           //Kafka consumer configuration settings  
  4.           String topicName = "newtest001";  
  5.           Properties props = new Properties();  
  6.    
  7.           props.put("bootstrap.servers""localhost:9092");  
  8.           props.put("group.id""test");  
  9.           props.put("enable.auto.commit""true");  
  10.           props.put("auto.commit.interval.ms""1000");  
  11.           props.put("session.timeout.ms""30000");  
  12.           props.put("key.deserializer",  
  13.              "org.apache.kafka.common.serialization.StringDeserializer");  
  14.           props.put("value.deserializer",  
  15.              "org.apache.kafka.common.serialization.StringDeserializer");  
  16.           @SuppressWarnings("resource")  
  17.         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);  
  18.    
  19.           //Kafka Consumer subscribes list of topics here.  
  20.           consumer.subscribe(Arrays.asList(topicName));  
  21.    
  22.           //print the topic name  
  23.           System.out.println("Subscribed to topic "+ topicName);  
  24.           while (true) {  
  25.              ConsumerRecords<String, String> records = consumer.poll(100);  
  26.              for (ConsumerRecord<String, String> record : records)  
  27.    
  28.              // print the offset,key and value for the consumer records.  
  29.              System.out.printf("offset = %d, key = %s, value = %s\n",  
  30.                 record.offset(), record.key(), record.value());  
  31.           }  
  32.    
  33.        }  
  34. }  
以上内容参考:https://www.w3cschool.cn/apache_kafka/apache_kafka_simple_producer_example.html

启动Consumers类,等待并接收Producer发送的消息:运行Producer发送消息,Consumers接收到的消息:

kafka发送自定义消息体(对象、数组)

说明能成功发送和接收消息。上面发送的消息都是字符,我们如果需要发送一个PerSon这样JavaBean那怎么做呢?

我们可以先观察上面的Producer和Consumers,他们在实例化一个Producer和一个Consumers的时候需要一些参数,其中有: props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");可见是对消息的key和value的序列化指定类。我们到org.apache.kafka.common.serialization.StringSerializer中可以看见这个类实现了

org.apache.kafka.common.serialization.Deserializer<T>和org.apache.kafka.common.serialization.Serializer<T>

kafka发送自定义消息体(对象、数组)kafka发送自定义消息体(对象、数组)

我们也分别实现序列化和反序列化的借口即可:

DecodeingKafka类:

[java] view plain copy
  1. import java.util.Map;  
  2. import org.apache.kafka.common.serialization.Deserializer;  
  3. import com.ys.test.SpringBoot.zktest.util.BeanUtils;  
  4.   
  5. public class DecodeingKafka implements Deserializer<Object> {  
  6.   
  7.     @Override  
  8.     public void configure(Map<String, ?> configs, boolean isKey) {  
  9.     }  
  10.   
  11.     @Override  
  12.     public Object deserialize(String topic, byte[] data) {  
  13.         return BeanUtils.byte2Obj(data);  
  14.     }  
  15.   
  16.     @Override  
  17.     public void close() {  
  18.           
  19.     }  
  20. }  

EncodeingKafka类:

[java] view plain copy
  1. import java.util.Map;  
  2. import org.apache.kafka.common.serialization.Serializer;  
  3. import com.ys.test.SpringBoot.zktest.util.BeanUtils;  
  4. public class EncodeingKafka implements Serializer<Object> {  
  5.     @Override  
  6.     public void configure(Map configs, boolean isKey) {  
  7.           
  8.     }  
  9.     @Override  
  10.     public byte[] serialize(String topic, Object data) {  
  11.         return BeanUtils.bean2Byte(data);  
  12.     }  
  13.     /* 
  14.      * producer调用close()方法是调用 
  15.      */  
  16.     @Override  
  17.     public void close() {  
  18.         System.out.println("EncodeingKafka is close");  
  19.     }  
  20. }  

之后我们需要定义JavaBean对象怎么序列化和反序列化,我们使用ObjectOutputStream和ObjectInputStream实现。(大家也可以考虑更高效的序列化方法)

BeanUtils类:

[java] view plain copy
  1. import java.io.ByteArrayInputStream;  
  2. import java.io.ByteArrayOutputStream;  
  3. import java.io.IOException;  
  4. import java.io.ObjectInputStream;  
  5. import java.io.ObjectOutputStream;  
  6.   
  7. public class BeanUtils {  
  8.     private BeanUtils() {}  
  9.     /** 
  10.      * 对象序列化为byte数组 
  11.      *  
  12.      * @param obj 
  13.      * @return 
  14.      */  
  15.     public static byte[] bean2Byte(Object obj) {  
  16.         byte[] bb = null;  
  17.         try (ByteArrayOutputStream byteArray = new ByteArrayOutputStream();  
  18.              ObjectOutputStream outputStream = new ObjectOutputStream(byteArray)){  
  19.             outputStream.writeObject(obj);  
  20.             outputStream.flush();  
  21.             bb = byteArray.toByteArray();  
  22.         } catch (IOException e) {  
  23.             e.printStackTrace();  
  24.         }  
  25.         return bb;  
  26.     }  
  27.     /** 
  28.      * 字节数组转为Object对象 
  29.      *  
  30.      * @param bytes 
  31.      * @return 
  32.      */  
  33.     public static Object byte2Obj(byte[] bytes) {  
  34.         Object readObject = null;  
  35.         try (ByteArrayInputStream in = new ByteArrayInputStream(bytes);  
  36.              ObjectInputStream inputStream = new ObjectInputStream(in)){  
  37.              readObject = inputStream.readObject();  
  38.         } catch (Exception e) {  
  39.             e.printStackTrace();  
  40.         }   
  41.         return readObject;  
  42.     }  
  43. }  

PerSon.java:

[java] view plain copy
  1. public class PerSon implements Serializable{  
  2.     /** 
  3.      * 
  4.      */  
  5.     private static final long serialVersionUID = 1L;  
  6.     private long userid;  
  7.     private String name;  
  8.     private int age;  
  9.     private String addr;  
  10.     private String eMail;  
  11.     private String userRole;  
  12.     private IDCard card;  
[java] view plain copy
  1. <span style="white-space:pre;"> </span>set... get...  

SimpleProducerPerSon.java:消息生产者:

[java] view plain copy
  1. import java.util.Arrays;  
  2. import java.util.List;  
  3. //import util.properties packages  
  4. import java.util.Properties;  
  5.   
  6. import org.apache.kafka.clients.producer.Callback;  
  7. //import KafkaProducer packages  
  8. import org.apache.kafka.clients.producer.KafkaProducer;  
  9. //import simple producer packages  
  10. import org.apache.kafka.clients.producer.Producer;  
  11. //import ProducerRecord packages  
  12. import org.apache.kafka.clients.producer.ProducerRecord;  
  13. import org.apache.kafka.clients.producer.RecordMetadata;  
  14.   
  15. import com.ys.test.SpringBoot.model.IDCard;  
  16. import com.ys.test.SpringBoot.model.PerSon;  
  17. public class SimpleProducerPerson {  
  18.     public static void main(String[] args) throws Exception{  
  19.             
  20.           //Assign topicName to string variable  
  21.           String topicName = "mypartition001";  
  22.           // create instance for properties to access producer configs     
  23.           Properties props = new Properties();  
  24.           //Assign localhost id  
  25.           props.put("bootstrap.servers""localhost:9092");  
  26.           //Set acknowledgements for producer requests.        
  27.           props.put("acks""all");  
  28.           //If the request fails, the producer can automatically retry,  
  29.           props.put("retries"0);  
  30.           props.put("metadata.fetch.timeout.ms"30000);  
  31.           //contorller the send method :sync or async default : sync  
  32.           //Specify buffer size in config  
  33.           props.put("batch.size"16384);  
  34.           //Reduce the no of requests less than 0     
  35.           props.put("linger.ms"1);  
  36.           //The buffer.memory controls the total amount of memory available to the producer for buffering.     
  37.           props.put("buffer.memory"33554432);  
  38.          <span style="color:#ff0000;"> props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");  
  39.           props.put("value.serializer""com.ys.test.SpringBoot.zktest.encode.EncodeingKafka");</span>  
  40. //        props.put("partitioner.class", "继承了Partition的类,实现的是根据指定的算法把消息推送到指定的分区中com.ys.test.SpringBoot.zktest.util.MyPartition");  
  41.             
  42.           Producer<String, Object> producer = new KafkaProducer<String, Object>(props);  
  43.         long startTimes = System.currentTimeMillis();  
  44.           System.out.println();  
  45.             
  46.           for(int i = 0; i < 2; i++){  
  47.                 
  48.               final int index = i;  
  49.               PerSon perSon = new PerSon();  
  50.               perSon.setAge(i);  
  51.               perSon.setAddr("My Producer TEST001"+i);  
  52.               perSon.setName("MyTest "+i);  
  53.               IDCard card = new IDCard();  
  54.               card.setCardName("MyTest"+i+"'s idCard");  
  55.               card.setCardid(10000000000L);  
  56.               perSon.setCard(card);  
  57.                 
  58.               List<PerSon> asList = Arrays.asList(perSon,perSon);  
  59. //            producer.send(new ProducerRecord<String, Object>(topicName,Integer.toString(i),asList));  
  60. //            producer.send(new ProducerRecord<String, Object>(topicName, Integer.toString(i), perSon));  
  61.               producer.send(new ProducerRecord<String, Object>(topicName, Integer.toString(i), asList), new Callback() {  
  62.                   
  63.                 @Override  
  64.                 public void onCompletion(RecordMetadata metadata, Exception exception) {  
  65.                     if (metadata != null) {  
  66.                         System.out.println(index+"  发送成功:"+"checksum: "+metadata.checksum()+" offset: "+metadata.offset()+" partition: "+metadata.partition()+" topic: "+metadata.topic());  
  67.                     }  
  68.                     if (exception != null) {  
  69.                         System.out.println(index+"异常:"+exception.getMessage());  
  70.                     }  
  71.                 }  
  72.             });  
  73.           }  
  74.           producer.close();  
  75.        }  
  76. }  

SimpleConsumersPerSon.java 消息接收者:

[java] view plain copy
  1. import java.util.Arrays;  
  2. import java.util.Properties;  
  3.   
  4. import org.apache.kafka.clients.consumer.ConsumerRecord;  
  5. import org.apache.kafka.clients.consumer.ConsumerRecords;  
  6. import org.apache.kafka.clients.consumer.KafkaConsumer;  
  7. public class SimpleConsumerPerSon {  
  8.     public static void main(String[] args) throws Exception {  
  9.   
  10.           String topicName = "mypartition001";  
  11.           Properties props = new Properties();  
  12.             
  13.           props.put("bootstrap.servers""localhost:9092");  
  14.           props.put("group.id""partitiontest05");  
  15.           props.put("enable.auto.commit""true");   
  16.           props.put("auto.commit.interval.ms""1000");  
  17.           props.put("session.timeout.ms""30000");  
  18.             
  19.           //要发送自定义对象,需要指定对象的反序列化类  
  20.          <span style="color:#ff0000;"> props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");  
  21.           props.put("value.deserializer""com.ys.test.SpringBoot.zktest.encode.DecodeingKafka");</span>  
  22.             
  23.           //使用String时可以使用系统的反序列化类  
  24. //        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
  25. //        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
  26.           @SuppressWarnings("resource")  
  27.         KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);  
  28.           //Kafka Consumer subscribes list of topics here.  
  29.           consumer.subscribe(Arrays.asList(topicName));  
  30.           //print the topic name  
  31.           System.out.println("Subscribed to topic "+ topicName);  
  32.             
  33.             
  34.           while (true) {  
  35.               ConsumerRecords<String, Object> records = consumer.poll(100);  
  36.              for (ConsumerRecord<String, Object> record : records)  
  37.              // print the offset,key and value for the consumer records.  
  38. //           System.out.printf("offset = %d, key = %s, value = %s\n",   
  39. //              record.offset(), record.key(), record.value().toString());  
  40.                    
  41.                  System.out.println(record.toString());  
  42.           }  
  43.             
  44.        }  
  45. }  

以上的发送者和接收者他们的key的序列化类还是StringDeserializer,但是value的序列化需要指定为我们自己的。

运行接收者和发送者,观察结果:(结果是发送一个集合和发送一个对象)

发送一个对象person接收的结果:

ConsumerRecord(topic = mypartition001, partition = 0, offset = 29, CreateTime = 1502457680160, checksum = 3691507046, serialized key size = 1, serialized value size = 391, key = 0, value = PerSon [userid=0, name=MyTest 0, age=0, addr=My Producer TEST0010, eMail=null, userRole=null, card=IDCard [cardid=10000000000, cardName=MyTest0's idCard]])
ConsumerRecord(topic = mypartition001, partition = 0, offset = 30, CreateTime = 1502457680175, checksum = 1443537499, serialized key size = 1, serialized value size = 391, key = 1, value = PerSon [userid=0, name=MyTest 1, age=1, addr=My Producer TEST0011, eMail=null, userRole=null, card=IDCard [cardid=10000000000, cardName=MyTest1's idCard]])

发送asList集合的结果:
ConsumerRecord(topic = mypartition001, partition = 0, offset = 31, CreateTime = 1502457689533, checksum = 3469353517, serialized key size = 1, serialized value size = 524, key = 0, value = [PerSon [userid=0, name=MyTest 0, age=0, addr=My Producer TEST0010, eMail=null, userRole=null, card=IDCard [cardid=10000000000, cardName=MyTest0's idCard]], PerSon [userid=0, name=MyTest 0, age=0, addr=My Producer TEST0010, eMail=null, userRole=null, card=IDCard [cardid=10000000000, cardName=MyTest0's idCard]]])
ConsumerRecord(topic = mypartition001, partition = 0, offset = 32, CreateTime = 1502457689552, checksum = 1930168239, serialized key size = 1, serialized value size = 524, key = 1, value = [PerSon [userid=0, name=MyTest 1, age=1, addr=My Producer TEST0011, eMail=null, userRole=null, card=IDCard [cardid=10000000000, cardName=MyTest1's idCard]], PerSon [userid=0, name=MyTest 1, age=1, addr=My Producer TEST0011, eMail=null, userRole=null, card=IDCard [cardid=10000000000, cardName=MyTest1's idCard]]])

这样我们不管是发送的是一个对象还是一个集合我们都可以正确发送和接收了。