多行星火滑动窗口
问题描述:
我学习的Apache星火使用Scala,并想用它来处理跨多行这样的DNA数据集:多行星火滑动窗口
ATGTAT
ACATAT
ATATAT
我想这个映射到一个固定的群体大小k并计数组。因此,对于k = 3,我们会得到每个字符组与后面的两个字符:
ATG TGT GTA TAT ATA TAC
ACA CAT ATA TAT ATA TAT
ATA TAT ATA TAT
...再算上团体(如字数):
(ATA,5), (TAT,5), (TAC,1), (ACA,1), (CAT,1), (ATG,1), (TGT,1), (GTA,1)
的问题是, “字”跨越多行,正如上面的示例中的TAC
一样。它跨越了换行。我不想只计算每行中的组,但是在整个文件中忽略行结束。
换句话说,我想在整个文件上处理整个序列作为宽度为k的滑动窗口,就好像没有换行符一样。问题是向前看(或后退)到下一个RDD行,以在到达行尾时完成一个窗口。
两个想法,我所做的是:
- 追加K-1从下一行大字:
ATATATAC ACATATAT ATATAT
我试图与星火SQL铅()函数,但是当我尝试执行flatMap时,我得到了WindowSpec的NotSerializableException。有没有其他方法可以引用下一行?我需要编写自定义输入格式吗?
- 阅读整个序列中作为一个单一的线(或读取后加入行):
ATATATACATATATATAT
有一种读取多个线,所以他们可以作为一个处理?如果是这样,它是否都需要适应单个机器的内存?
我意识到这些都可以作为预处理步骤完成。我想知道最好的方法是在Spark内部完成。一旦我有了这些格式,我就知道如何去做其余的事情,但我被困在这里。
答
可以使单个字符串的RDD,而不是加入他们为一条线,因为这将会使结果不能被分配的字符串:
val rdd = sc.textFile("gene.txt")
// rdd: org.apache.spark.rdd.RDD[String] = gene.txt MapPartitionsRDD[4] at textFile at <console>:24
所以简单地用flatMap
分割线成字符的列表:
rdd.flatMap(_.split("")).collect
// res4: Array[String] = Array(A, T, G, T, A, T, A, C, A, T, A, T, A, T, A, T, A, T)
一个更完整的解决方案,从this answer借:
val rdd = sc.textFile("gene.txt")
// create the sliding 3 grams for each partition and record the edges
val rdd1 = rdd.flatMap(_.split("")).mapPartitionsWithIndex((i, iter) => {
val slideList = iter.toList.sliding(3).toList
Iterator((slideList, (slideList.head, slideList.last)))
})
// collect the edge values, concatenate edges from adjacent partitions and broadcast it
val edgeValues = rdd1.values.collect
val sewedEdges = edgeValues zip edgeValues.tail map { case (x, y) => {
(x._2 ++ y._1).drop(1).dropRight(1).sliding(3).toList
}}
val sewedEdgesMap = sc.broadcast(
(0 until rdd1.partitions.size) zip sewedEdges toMap
)
// sew the edge values back to the result
rdd1.keys.mapPartitionsWithIndex((i, iter) => iter ++ List(sewedEdgesMap.value.getOrElse(i, Nil))).
flatMap(_.map(_ mkString "")).collect
// res54: Array[String] = Array(ATG, TGT, GTA, TAT, ATA, TAC, ACA, CAT, ATA, TAT, ATA, TAT, ATA, TAT, ATA, TAT)
我认为问题依然存在:我如何在当前元素之前访问元素的两个位置。所以如果我在第一个元素'A'上,我如何展望未来两个组合:'ATG'?我知道什么时候它在一个字符串或数组中,我可以向前看并根据索引进行连接,但RDD行又如何呢? – jcadcell
你可以参考[这个答案](http://*.com/questions/35154267/how-to-compute-cumulative-sum-using-spark) – Psidom
谢谢,这个工程。我必须通过它来了解发生了什么。 – jcadcell