Avoiding a shuffle in Spark

Question:

I am taking the Scala Spark course on Coursera, and I am trying to optimize this snippet to avoid the shuffle by using reduceByKey:

val indexedMeansG = vectors
  .map(v => findClosest(v, means) -> v)
  .groupByKey
  .mapValues(averageVectors)

vectors is an RDD[(Int, Int)]. In order to look at the list of dependencies and the RDD's lineage, I used:

println(s"""GroupBy:                                           
      | Deps: ${indexedMeansG.dependencies.size}                                   
      | Deps: ${indexedMeansG.dependencies}                                     
      | Lineage: ${indexedMeansG.toDebugString}""".stripMargin) 

This prints the following:

/* GroupBy:                                             
    * Deps: 1                                              
    * Deps: List(org.apache.spark.OneToOneDependency@...)
    * Lineage: (6) MapPartitionsRDD[18] at mapValues at *.scala:207 []                             
    * ShuffledRDD[17] at groupByKey at *.scala:207 []                                 
    * +-(6) MapPartitionsRDD[16] at map at *.scala:206 []                                
    * MapPartitionsRDD[13] at map at *.scala:139 []                                 
    *  CachedPartitions: 6; MemorySize: 84.0 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B                         
    * MapPartitionsRDD[12] at values at *.scala:116 []                                 
    * MapPartitionsRDD[11] at mapValues at *.scala:115 []                                
    * MapPartitionsRDD[10] at groupByKey at *.scala:92 []                                
    * MapPartitionsRDD[9] at join at *.scala:91 []                                  
    * MapPartitionsRDD[8] at join at *.scala:91 []                                  
    * CoGroupedRDD[7] at join at *.scala:91 []                                   
    * +-(6) MapPartitionsRDD[4] at map at *.scala:88 []                                
    * | MapPartitionsRDD[3] at filter at *.scala:88 []                                
    * | MapPartitionsRDD[2] at map at *.scala:69 []                                 
    * | src/main/resources/*/*.csv MapPartitionsRDD[1] at textFile at *.scala:23 []                   
    * | src/main/resources/*/*.csv HadoopRDD[0] at textFile at *.scala:23 []                     
    * +-(6) MapPartitionsRDD[6] at map at *.scala:89 []                                
    * MapPartitionsRDD[5] at filter at *.scala:89 []                                 
    * MapPartitionsRDD[2] at map at *.scala:69 []                                  
    * src/main/resources/*/*.csv MapPartitionsRDD[1] at textFile at *.scala:23 []                    
    * src/main/resources/*/*.csv HadoopRDD[0] at textFile at *.scala:23 [] */ 

From this List(org.apache.spark.OneToOneDependency@...) I deduce that no shuffling is taking place, right? However, ShuffledRDD[17] is printed below it, which means there actually is a shuffle going on.
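
A side note on reading this output: dependencies only lists the direct parents of the RDD it is called on, here the final mapValues RDD. A small recursive helper can print every dependency in the graph (a sketch; the printDeps name is illustrative):

import org.apache.spark.rdd.RDD

// Sketch: walk the lineage recursively, since `dependencies` only reports
// the direct parents of the RDD it is called on.
def printDeps(rdd: RDD[_], indent: String = ""): Unit = {
  println(s"$indent RDD[${rdd.id}] -> ${rdd.dependencies.map(_.getClass.getSimpleName)}")
  rdd.dependencies.foreach(dep => printDeps(dep.rdd, indent + "  "))
}

printDeps(indexedMeansG)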

I have already tried replacing the groupByKey call with reduceByKey, like this:

val indexedMeansR = vectors
  .map(v => findClosest(v, means) -> v)
  .reduceByKey((a, b) => (a._1 + b._1) / 2 -> (a._2 + b._2) / 2)

And its dependencies and lineage are:

/* ReduceBy:                                             
    * Deps: 1                                              
    * Deps: List(org.apache.spark.ShuffleDependency@...)
    * Lineage: (6) ShuffledRDD[17] at reduceByKey at *.scala:211 []                              
    * +-(6) MapPartitionsRDD[16] at map at *.scala:210 []                                
    * MapPartitionsRDD[13] at map at *.scala:139 []                                 
    *  CachedPartitions: 6; MemorySize: 84.0 MB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B                         
    * MapPartitionsRDD[12] at values at *.scala:116 []                                 
    * MapPartitionsRDD[11] at mapValues at *.scala:115 []                                
    * MapPartitionsRDD[10] at groupByKey at *.scala:92 []                                
    * MapPartitionsRDD[9] at join at *.scala:91 []                                  
    * MapPartitionsRDD[8] at join at *.scala:91 []                                  
    * CoGroupedRDD[7] at join at *.scala:91 []                                   
    * +-(6) MapPartitionsRDD[4] at map at *.scala:88 []                                
    * | MapPartitionsRDD[3] at filter at *.scala:88 []                                
    * | MapPartitionsRDD[2] at map at *.scala:69 []                                 
    * | src/main/resources/*/*.csv MapPartitionsRDD[1] at textFile at *.scala:23 []                   
    * | src/main/resources/*/*.csv HadoopRDD[0] at textFile at *.scala:23 []                     
    * +-(6) MapPartitionsRDD[6] at map at *.scala:89 []                                
    * MapPartitionsRDD[5] at filter at *.scala:89 []                                 
    * MapPartitionsRDD[2] at map at *.scala:69 []                                  
    * src/main/resources/*/*.csv MapPartitionsRDD[1] at textFile at *.scala:23 []                    
    * src/main/resources/*/*.csv HadoopRDD[0] at textFile at *.scala:23 [] */ 

This time the dependency is a ShuffleDependency, and I cannot understand why.

Since the RDD is a pair RDD whose keys are Ints, and therefore have an ordering defined, I have also tried changing the partitioner and using a RangePartitioner, but it did not improve things either.
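
(How the RangePartitioner was supplied is not shown; a minimal sketch, assuming it was passed directly to reduceByKey, with pairs and indexedMeansP as illustrative names:)

import org.apache.spark.RangePartitioner

val pairs = vectors.map(v => findClosest(v, means) -> v)
// Build a range partitioner by sampling the keyed RDD; Int keys have an Ordering.
val partitioner = new RangePartitioner(6, pairs) // 6 partitions, matching the lineage above
val indexedMeansP = pairs.reduceByKey(partitioner, (a, b) => (a._1 + b._1) / 2 -> (a._2 + b._2) / 2)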

Answer:

A reduceByKey operation still involves a shuffle, because it still has to ensure that all items with the same key end up in the same partition.

However, it will be a much smaller shuffle than with a groupByKey operation: a reduceByKey performs the reduction within each partition before shuffling, which reduces the amount of data that has to be shuffled.
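
For per-key averages specifically, note that the pairwise averaging in the question's reduceByKey is not associative (an average of averages weights elements unevenly). A common pattern, sketched here with the question's names and the RDD[(Int, Int)] element type (sums and indexedMeansR2 are illustrative names), is to shuffle only (sum, count) accumulators and divide once at the end:

// Sketch: shuffle (sumX, sumY, count) triples instead of whole groups.
val sums = vectors
  .map(v => findClosest(v, means) -> (v._1.toLong, v._2.toLong, 1L))
  .reduceByKey { case ((x1, y1, c1), (x2, y2, c2)) =>
    (x1 + x2, y1 + y2, c1 + c2) // associative and commutative: safe for map-side combining
  }
val indexedMeansR2 = sums.mapValues { case (x, y, c) => ((x / c).toInt, (y / c).toInt) }

This keeps the shuffled payload at one triple per key per partition, which is exactly where reduceByKey's map-side reduction pays off.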

But then, looking at the dependencies output, groupByKey has a OneToOneDependency, which does not involve a shuffle, while reduceByKey has a ShuffleDependency, which does. Why? – elbaulp

The 'OneToOneDependency' corresponds to the 'mapValues' call, not to the 'groupByKey' call. If you remove it, you should see a 'ShuffleDependency'. Also, note the 'ShuffledRDD' in the 'groupByKey' lineage. –
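
To see this concretely, one can inspect the intermediate RDD before mapValues is applied (a sketch reusing the question's names):

val grouped = vectors
  .map(v => findClosest(v, means) -> v)
  .groupByKey()

// The wide dependency sits on the groupByKey output...
println(grouped.dependencies) // e.g. List(org.apache.spark.ShuffleDependency@...)
// ...while mapValues only adds a narrow, one-to-one step on top of it.
println(grouped.mapValues(averageVectors).dependencies) // e.g. List(org.apache.spark.OneToOneDependency@...)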