文档计数/斯卡拉
问题描述:
我有一个文本变量,它是一个String RDD在阶文档计数/斯卡拉
val data = sc.parallelize(List("i am a good boy.Are you a good boy.","You are also working here.","I am posting here today.You are good."))
我在Scala的地图另一个变量(以下给出)
//列表对于该文档计数需要被发现的话,最初的文档数为1
val dictionary = Map("""good""" -> 1,"""working""" -> 1,"""posting""" -> 1).
我想要做的每一个字典术语的文件数量和获得的输出以键值格式
我的输出应该与以下数据类似。
(good,2)
(working,1)
(posting,1)
我曾尝试是
dictionary.map { case(k,v) => k -> k.r.findFirstIn(data.map(line => line.trim()).collect().mkString(",")).size}
我越来越算作1的所有的话。
请帮我解决上面的问题
在此先感谢。
答
为什么不使用flatMap来创建字典,然后你就可以查询它。
val dictionary = data.flatMap {case line => line.split(" ")}.map {case word => (word, 1)}.reduceByKey(_+_)
如果我收集这个在REPL我得到以下结果:
res9: Array[(String, Int)] = Array((here,1), (good.,1), (good,2), (here.,1), (You,1), (working,1), (today.You,1), (boy.Are,1), (are,2), (a,2), (posting,1), (i,1), (boy.,1), (also,1), (I,1), (am,2), (you,1))
很明显,你需要做一个更好的分流比我的简单例子。
答
首先你的字典应该是一个集合,因为从一般意义上讲,你需要将集合映射到包含它们的文档数量。
所以你的数据应该是这样的:
scala> val docs = List("i am a good boy.Are you a good boy.","You are also working here.","I am posting here today.You are good.")
docs: List[String] = List(i am a good boy.Are you a good boy., You are also working here., I am posting here today.You are good.)
你的字典应该是这样的:
scala> val dictionary = Set("good", "working", "posting")
dictionary: scala.collection.immutable.Set[String] = Set(good, working, posting)
然后你要实现你的改造,为contains
功能它可能看起来最简单的逻辑像:
scala> dictionary.map(k => k -> docs.count(_.contains(k))) toMap
res4: scala.collection.immutable.Map[String,Int] = Map(good -> 2, working -> 1, posting -> 1)
为了更好的解决方案,我会建议最终你实现特定的功能,满足您的要求
(字符串,字符串)=>布尔
确定期限的文件中存在:
scala> def foo(doc: String, term: String): Boolean = doc.contains(term)
foo: (doc: String, term: String)Boolean
然后最终解决方案将看起来像:
scala> dictionary.map(k => k -> docs.count(d => foo(d, k))) toMap
res3: scala.collection.immutable.Map[String,Int] = Map(good -> 2, working -> 1, posting -> 1)
你最不得不做的是t o使用SparkContext计算结果图。首先你必须定义你想要并行化的数据。假设我们想要并行化文档集合,那么解决方案可能如下所示:
val docsRDD = sc.parallelize(List(
"i am a good boy.Are you a good boy.",
"You are also working here.",
"I am posting here today.You are good."
))
docsRDD.mapPartitions(_.map(doc => dictionary.collect {
case term if doc.contains(term) => term -> 1
})).map(_.toMap) reduce { case (m1, m2) => merge(m1, m2) }
def merge(m1: Map[String, Int], m2: Map[String, Int]) =
m1 ++ m2 map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }