在Spark/Scala中写入HDFS
我正在写一个Spark/Scala程序来读取ZIP文件,将它们解压缩并将内容写入一组新文件。我可以将其写入本地文件系统,但是想知道是否有办法将输出文件写入分布式文件系统(如HDFS)。代码显示below`在Spark/Scala中写入HDFS
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import java.io._
var i =1
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file: (String, PortableDataStream)) =>
{
val zipStream = new ZipInputStream(file._2.open)
val entry = zipStream.getNextEntry
val iter = scala.io.Source.fromInputStream(zipStream).getLines
val fname = f"/d/tmp/myfile$i.txt"
i = i + 1
val xx = iter.mkString
val writer = new PrintWriter(new File(fname))
writer.write(xx)
writer.close()
iter
}).collect()
`
您可以使用Hadoop的公用库(如果你正在使用SBT作为依赖manangement工具,加thath库到你的依赖)容易写数据到HDFS。这样,您可以创建一个文件系统对象:
private val fs = {
val conf = new Configuration()
FileSystem.get(conf)
}
一定要与你的Hadoop集群信息(核心site.xml的,等等)
然后,你可以写配置文件系统,例如字符串路径(在你的情况下,你应该处理数据流),在HDFS如下:
@throws[IOException]
def writeAsString(hdfsPath: String, content: String) {
val path: Path = new Path(hdfsPath)
if (fs.exists(path)) {
fs.delete(path, true)
}
val dataOutputStream: FSDataOutputStream = fs.create(path)
val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8"))
bw.write(content)
bw.close
}
你应该从官方文档看看方法saveAsTextFile:http://spark.apache.org/docs/latest/programming-guide.html
它可以让您保存到HDFS:
iter.saveAsTextFile("hdfs://...")
你可以试试saveAsTextFile方法。
将数据集的元素作为文本文件(或文本文件集)写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定目录中。 Spark将在每个元素上调用toString将其转换为文件中的一行文本。
它会将每个分区保存为一个不同的文件。除非您重新分区或合并,否则最终将使用的分区数将与输入文件数相同。
请看我上面的评论,为什么使用saveasTextFile是一个问题 – user2699504
不能你可以写整个RDD而不是单独的每个文件。而不是收集使用saveAsText文件? – NetanelRabinowitz
将所有解压缩数据连接成一个文件。这不是我想要的。我希望每个解压缩文件都在它自己的单独文件中 – user2699504
sc.binaryFiles("/user/example/zip_dir", 10) //make an RDD from *.zip files in HDFS
.flatMap((file: (String, PortableDataStream)) => { //flatmap to unzip each file
val zipStream = new ZipInputStream(file._2.open) //open a java.util.zip.ZipInputStream
val entry = zipStream.getNextEntry //get the first entry in the stream
val iter = Source.fromInputStream(zipStream).getLines //place entry lines into an iterator
iter.next //pop off the iterator's first line
iter //return the iterator
})
.saveAsTextFile("/user/example/quoteTable_csv/result.csv")
在该代码中iter不是RDD,所以不能写它。可能首先进行转换。 – dumitru
是的,我认为我们会在这里很好。 RDD应该是数据类型来操纵火花以便在集群上获得分布式数据。 – chateaur
这就是问题的症结所在。我已经尝试了所有我能想到的将我的数据传递给RDD以启用saveasTextFile的使用,但是结果很短。如果有人已经解决了这个问题,请让我知道 – user2699504