Read an existing Avro file and send it to Kafka

Problem description:

I have an existing Avro file along with its schema. I need to send the contents of this file to Kafka through a producer.

Below is the code I wrote.

public class ProducerDataSample { 

    public static void main(String[] args) { 

     String topic = "my-topic"; 

     Schema.Parser parser = new Schema.Parser(); 
     Schema schema = parser.parse(AvroSchemaDefinitionLoader.fromFile("encounter.avsc").get()); 

      File file = new File("/home/hello.avro"); 
     try{ 
     ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); 
     DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema); 
     DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); 
     dataFileWriter.create(schema, outputStream); 
     dataFileWriter.appendTo(file); 
     dataFileWriter.close(); 
     System.out.println("Here comes the data: " + outputStream); 



     // Start KAFKA publishing 

     Properties props = new Properties(); 
     props.put("bootstrap.servers", "localhost:9092"); 
     props.put("serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 

     KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(props); 
     ProducerRecord<String, byte[]> producerRecord = null; 
     producerRecord = new ProducerRecord<String, byte[]>("m-topic","1",outputStream.toByteArray()); 
     messageProducer.send(producerRecord); 
     messageProducer.close(); 
     }catch(Exception e){ 
      System.out.println("Error in sending to kafka"); 
      e.printStackTrace(); 
     } 





    } 
} 

When I run this, I get the following error:

Error in sending to kafka
org.apache.avro.AvroRuntimeException: already open
    at org.apache.avro.file.DataFileWriter.assertNotOpen(DataFileWriter.java:85)
    at org.apache.avro.file.DataFileWriter.appendTo(DataFileWriter.java:203)
    at org.apache.avro.file.DataFileWriter.appendTo(DataFileWriter.java:193)
    at ProducerDataSample.main(ProducerDataSample.java:51)

Any help is appreciated. Thanks.

You will have to read the data from the Avro file and serialize it to a byte array.

Something like the snippet below:

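     import java.io.ByteArrayOutputStream;
     import java.io.File;
     import org.apache.avro.Schema;
     import org.apache.avro.file.DataFileReader;
     import org.apache.avro.generic.GenericDatumReader;
     import org.apache.avro.generic.GenericDatumWriter;
     import org.apache.avro.generic.GenericRecord;
     import org.apache.avro.io.DatumWriter;
     import org.apache.avro.io.Encoder;
     import org.apache.avro.io.EncoderFactory;

     final Schema schema = new Schema.Parser().parse(new File("sample.avsc"));
     final File file = new File("sample.avro");

     // read the Avro file into GenericRecords
     final GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>(schema);
     final DataFileReader<GenericRecord> genericRecords = new DataFileReader<>(file, genericDatumReader);

     // serialize the GenericRecords into one byte array
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);

     Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(out, null);

     while (genericRecords.hasNext()) {
      writer.write(genericRecords.next(), binaryEncoder);
     }
     binaryEncoder.flush();   // push buffered bytes into 'out'
     genericRecords.close();  // release the file handle
     out.close();
     // send out.toByteArray() to Kafka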
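
The snippet above puts every record into one byte array, so the whole file ends up as a single Kafka message. If you would rather send one message per record, a minimal sketch could look like the following. The class name `AvroFileToKafka` and the methods `serializeRecords` and `sendAll` are made up for illustration and are not part of Avro or Kafka. It also assumes the consumer already knows the schema, because the raw binary encoding does not carry it.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper class, named only for this example.
public class AvroFileToKafka {

    // Reads every record from an Avro container file and returns each one
    // as its own binary-encoded byte array (no file header, no schema).
    public static List<byte[]> serializeRecords(File avroFile) throws IOException {
        List<byte[]> payloads = new ArrayList<>();
        try (DataFileReader<GenericRecord> reader =
                 new DataFileReader<>(avroFile, new GenericDatumReader<GenericRecord>())) {
            // Use the schema stored in the file header.
            Schema schema = reader.getSchema();
            GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = null;
            while (reader.hasNext()) {
                out.reset(); // start a fresh buffer for each record
                encoder = EncoderFactory.get().binaryEncoder(out, encoder);
                writer.write(reader.next(), encoder);
                encoder.flush();
                payloads.add(out.toByteArray());
            }
        }
        return payloads;
    }

    // Sends each payload as a separate message without a key,
    // then waits until all buffered sends have completed.
    public static void sendAll(Producer<String, byte[]> producer,
                               String topic,
                               List<byte[]> payloads) {
        for (byte[] payload : payloads) {
            producer.send(new ProducerRecord<>(topic, payload));
        }
        producer.flush();
    }
}
```

To use it with a real broker, you would pass a `KafkaProducer<String, byte[]>` built with the `bootstrap.servers`, `key.serializer` and `value.serializer` properties from the question. The `serializer.class` property belongs to the old Scala producer and should not be needed with `KafkaProducer`.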

Thanks @liju John. It worked. :) – abhi5800