IOException写入文件到Hdfs

问题描述:

我正在从一个mqtt代理程序获取gps数据并将其加载到hadoop集群中。试图将数据写入hdfs,我得到和IOException。下面是完整的堆栈跟踪:IOException写入文件到Hdfs

java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Message missing required fields: callId, status; Host Details : local host is: "quickstart.cloudera/192.168.25.170"; destination host is: "quickstart.cloudera":8020; 
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:765) 
    at org.apache.hadoop.ipc.Client.call(Client.java:1165) 
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:184) 
    at com.sun.proxy.$Proxy7.create(Unknown Source) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:165) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:84) 
    at com.sun.proxy.$Proxy7.create(Unknown Source) 
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:187) 
    at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1250) 
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1269) 
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1063) 
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1021) 
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:232) 
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:75) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:806) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:686) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:675) 
    at com.mqttHadoopLoader.hadoop.MqttLoader.HdfsWriter.writeToHdfs(HdfsWriter.java:19) 
    at com.mqttHadoopLoader.hadoop.MqttLoader.MqttDataLoader.messageArrived(MqttDataLoader.java:43) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:354) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:162) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: com.google.protobuf.InvalidProtocolBufferException: Message missing required fields: callId, status 
    at com.google.protobuf.UninitializedMessageException.asInvalidProtocolBufferException(UninitializedMessageException.java:81) 
    at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto$Builder.buildParsed(RpcPayloadHeaderProtos.java:1094) 
    at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto$Builder.access$1300(RpcPayloadHeaderProtos.java:1028) 
    at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto.parseDelimitedFrom(RpcPayloadHeaderProtos.java:986) 
    at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:850) 
    at org.apache.hadoop.ipc.Client$Connection.run(Client.java:781) 

好像当我尝试创建的OutputStream但很难说,因为我的Eclipse调试器工作不正常的错误发生(它说,它无法连接到虚拟机和我已经尝试了很多修补程序发布在这里*)。这是我为我的HdfsWriter代码:

String destFile = "hdfs://127.0.0.0.1:8020/gpsData/output/gps_data.txt"; 
//Note *this is just a placeholder IP address for the purpose of this post. I do have the fully correct IP address for the program. 

    public void writeToHdfs(String gpsInfo) { 
     try { 
      Configuration conf = new Configuration(); 
      System.out.println("Connecting to -- " + conf.get("fs.defaultFS")); 

      FileSystem fs = FileSystem.get(URI.create(destFile), conf); 
      System.out.println(fs.getUri()); 

      // Error seems to occur here 
      OutputStream outStream = fs.create(new Path(destFile)); 

      byte[] messageByt = gpsInfo.getBytes(); 
      outStream.write(messageByt); 
      outStream.close(); 

      System.out.println(destFile + " copied to HDFS"); 

     } catch (FileNotFoundException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

这是对HdfsWriter调用MQTT方法:

public void messageArrived(String topic, MqttMessage message) 
     throws Exception { 
      System.out.println(message); 
      HdfsWriter hdfsWriter = new HdfsWriter(); 
      hdfsWriter.writeToHdfs(message.toString()); 
    } 

我还是真正的新的Hadoop所以任何和所有帮助将是巨大的。

更新 我有我的调试工作,并可以明确告诉你,每当我尝试调用FileSystem方法时发生错误。例如,错误也由fs.exists(pt)fs.setReplication()触发。

+0

您需要仔细阅读错误消息。它告诉你什么是错误的:* InvalidProtocolBufferException:消息缺少必需的字段:callId,status; *。所以你发送无效的数据到服务器。问题在于你发送的内容,而不是你如何发送。 –

+0

@JB Nizet是的,我有**阅读**错误信息。多次。一遍又一遍地。我甚至不知道错误消息正在谈论的是什么“消息”。如果你查看了我包含的代码,你会发现我没有直接调用任何名为“Message”的东西。我有我的调试工作,并可以告诉你,它发生在我尝试调用的任何FileSystem消息(即它也发生在'fs.exists(pt)'或'fs.setReplication(pt,(short)1)' )。我无法找到这个错误直接发生的地方。 – ebbBliss

我相信hdfs使用谷歌protobuf库。您的客户端代码似乎使用了错误(不兼容)的protobuf版本。尝试挖掘这个方向。

HDFS客户端和NameNode之间的协议使用Google Protocol Buffers对消息进行序列化。该错误表明客户端发送的消息不包含所有预期的字段,因此它与服务器不兼容。

这可能表示您正在运行比NameNode版本旧的HDFS客户端版本。例如,callId字段在由Apache JIRA问题HADOOP-9762跟踪的功能中实现,并在Apache Hadoop 2.1.0-beta中提供。该版本之前的客户端不会在其邮件中包含callId,因此它将与运行2.1.0-beta或更高版本的NameNode不兼容。

我建议检查您的客户端应用程序以确保它使用的Hadoop客户端库版本与Hadoop集群版本相匹配。从堆栈跟踪看来,您使用的是Cloudera发行版。如果是这样,那么通过使用Cloudera在其Maven存储库中提供的匹配客户端库依赖项版本,您可能会获得最大的成功。详情请参阅Using the CDH 5 Maven Repository