多行星火滑动窗口

问题描述:

我学习的Apache星火使用Scala,并想用它来处理跨多行这样的DNA数据集:多行星火滑动窗口

ATGTAT 
ACATAT 
ATATAT 

我想这个映射到一个固定的群体大小k并计数组。因此,对于k = 3,我们会得到每个字符组与后面的两个字符:

ATG TGT GTA TAT ATA TAC 
ACA CAT ATA TAT ATA TAT 
ATA TAT ATA TAT 

...再算上团体(如字数):

(ATA,5), (TAT,5), (TAC,1), (ACA,1), (CAT,1), (ATG,1), (TGT,1), (GTA,1) 

的问题是, “字”跨越多行,正如上面的示例中的TAC一样。它跨越了换行。我不想只计算每行中的组,但是在整个文件中忽略行结束。

换句话说,我想在整个文件上处理整个序列作为宽度为k的滑动窗口,就好像没有换行符一样。问题是向前看(或后退)到下一个RDD行,以在到达行尾时完成一个窗口。

两个想法,我所做的是:

  1. 追加K-1从下一行大字:
ATATATAC 
ACATATAT 
ATATAT 

我试图与星火SQL铅()函数,但是当我尝试执行flatMap时,我得到了WindowSpec的NotSerializableException。有没有其他方法可以引用下一行?我需要编写自定义输入格式吗?

  1. 阅读整个序列中作为一个单一的线(或读取后加入行):
  2. ATATATACATATATATAT 
    

    有一种读取多个线,所以他们可以作为一个处理?如果是这样,它是否都需要适应单个机器的内存?

    我意识到这些都可以作为预处理步骤完成。我想知道最好的方法是在Spark内部完成。一旦我有了这些格式,我就知道如何去做其余的事情,但我被困在这里。

开始=>

可以使单个字符串的RDD,而不是加入他们为一条线,因为这将会使结果不能被分配的字符串:

val rdd = sc.textFile("gene.txt") 
// rdd: org.apache.spark.rdd.RDD[String] = gene.txt MapPartitionsRDD[4] at textFile at <console>:24 

所以简单地用flatMap分割线成字符的列表:

rdd.flatMap(_.split("")).collect 
// res4: Array[String] = Array(A, T, G, T, A, T, A, C, A, T, A, T, A, T, A, T, A, T) 

一个更完整的解决方案,从this answer借:

val rdd = sc.textFile("gene.txt") 

// create the sliding 3 grams for each partition and record the edges 
val rdd1 = rdd.flatMap(_.split("")).mapPartitionsWithIndex((i, iter) => { 
    val slideList = iter.toList.sliding(3).toList 
    Iterator((slideList, (slideList.head, slideList.last))) 
}) 

// collect the edge values, concatenate edges from adjacent partitions and broadcast it 
val edgeValues = rdd1.values.collect 

val sewedEdges = edgeValues zip edgeValues.tail map { case (x, y) => { 
    (x._2 ++ y._1).drop(1).dropRight(1).sliding(3).toList 
}} 

val sewedEdgesMap = sc.broadcast(
    (0 until rdd1.partitions.size) zip sewedEdges toMap 
) 

// sew the edge values back to the result 
rdd1.keys.mapPartitionsWithIndex((i, iter) => iter ++ List(sewedEdgesMap.value.getOrElse(i, Nil))). 
    flatMap(_.map(_ mkString "")).collect 

// res54: Array[String] = Array(ATG, TGT, GTA, TAT, ATA, TAC, ACA, CAT, ATA, TAT, ATA, TAT, ATA, TAT, ATA, TAT) 
+0

我认为问题依然存在:我如何在当前元素之前访问元素的两个位置。所以如果我在第一个元素'A'上,我如何展望未来两个组合:'ATG'?我知道什么时候它在一个字符串或数组中,我可以向前看并根据索引进行连接,但RDD行又如何呢? – jcadcell

+0

你可以参考[这个答案](http://*.com/questions/35154267/how-to-compute-cumulative-sum-using-spark) – Psidom

+0

谢谢,这个工程。我必须通过它来了解发生了什么。 – jcadcell