Spark accumulableCollection does not work with mutable.Map

Problem description:

I am using Spark to accumulate employee records, using a Spark accumulator. I use a Map[empId, emp] as the accumulableCollection so that I can look employees up by their id. I have tried everything, but it does not work. Could someone point out whether there is a logical problem with the way I am using accumulableCollection, or whether Map is simply not supported. Here is my code:

package demo 

import org.apache.spark.{SparkContext, SparkConf, Logging} 

import org.apache.spark.SparkContext._ 
import scala.collection.mutable 


object MapAccuApp extends App with Logging {
  case class Employee(id: String, name: String, dept: String)

  val conf = new SparkConf().setAppName("Employees").setMaster("local[4]")
  val sc = new SparkContext(conf)

  implicit def empMapToSet(empIdToEmp: mutable.Map[String, Employee]): mutable.MutableList[Employee] = {
    empIdToEmp.foldLeft(mutable.MutableList[Employee]()) { (l, e) => l += e._2 }
  }

  val empAccu = sc.accumulableCollection[mutable.Map[String, Employee], Employee](mutable.Map[String, Employee]())

  val employees = List(
    Employee("10001", "Tom", "Eng"),
    Employee("10002", "Roger", "Sales"),
    Employee("10003", "Rafael", "Sales"),
    Employee("10004", "David", "Sales"),
    Employee("10005", "Moore", "Sales"),
    Employee("10006", "Dawn", "Sales"),
    Employee("10007", "Stud", "Marketing"),
    Employee("10008", "Brown", "QA")
  )

  System.out.println("employee count " + employees.size)

  sc.parallelize(employees).foreach(e => {
    empAccu += e
  })

  System.out.println("empAccumulator size " + empAccu.value.size)
}

Using accumulableCollection seems like overkill for your problem, as the following demonstrates:

import org.apache.spark.{AccumulableParam, Accumulable, SparkContext, SparkConf}

import scala.collection.mutable

case class Employee(id: String, name: String, dept: String)

val conf = new SparkConf().setAppName("Employees").setMaster("local[4]")
val sc = new SparkContext(conf)

implicit def mapAccum =
  new AccumulableParam[mutable.Map[String, Employee], Employee] {
    // Merge the partial maps produced by different partitions
    def addInPlace(t1: mutable.Map[String, Employee],
                   t2: mutable.Map[String, Employee])
        : mutable.Map[String, Employee] = {
      t1 ++= t2
      t1
    }
    // Add a single Employee to the map, keyed by its id
    def addAccumulator(t1: mutable.Map[String, Employee], e: Employee)
        : mutable.Map[String, Employee] = {
      t1 += (e.id -> e)
      t1
    }
    // The neutral starting value for each partition: an empty map
    def zero(t: mutable.Map[String, Employee])
        : mutable.Map[String, Employee] = {
      mutable.Map[String, Employee]()
    }
  }

val empAccu = sc.accumulable(mutable.Map[String,Employee]()) 

val employees = List(
    Employee("10001", "Tom", "Eng"), 
    Employee("10002", "Roger", "Sales"), 
    Employee("10003", "Rafael", "Sales"), 
    Employee("10004", "David", "Sales"), 
    Employee("10005", "Moore", "Sales"), 
    Employee("10006", "Dawn", "Sales"), 
    Employee("10007", "Stud", "Marketing"), 
    Employee("10008", "Brown", "QA") 
) 

System.out.println("employee count " + employees.size) 

sc.parallelize(employees).foreach(e => { 
    empAccu += e 
}) 

println("empAccumulator size " + empAccu.value.size) 
empAccu.value.foreach(entry => 
    println("emp id = " + entry._1 + " name = " + entry._2.name)) 

While this is poorly documented at the moment, the relevant test in the Spark code base is quite instructive.
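The constraint at work here is the signature of accumulableCollection: the element type is whatever the collection can grow by, and a mutable.Map grows by key-value pairs rather than by values. This also hints at why the original code likely loses updates: the implicit empMapToSet view builds a fresh MutableList on every addition, so the added employees never reach the map itself. Below is a minimal sketch of the Growable point; it assumes Scala 2.10/2.11 (where Growable lives in scala.collection.generic), and the GrowableDemo name is mine:

import scala.collection.generic.Growable

import scala.collection.mutable

object GrowableDemo extends App {
  case class Employee(id: String, name: String, dept: String)

  // A mutable.Map is a Growable over its entries, i.e. (String, Employee)
  // pairs, so that pair type is what accumulableCollection accumulates,
  // not Employee by itself.
  val m: Growable[(String, Employee)] = mutable.Map[String, Employee]()
  m += ("10001" -> Employee("10001", "Tom", "Eng"))  // compiles: a pair
  // m += Employee("10001", "Tom", "Eng")            // does not compile
  println(m)  // prints the one-entry map
}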

Edit: it turns out that using accumulableCollection does have value after all: you do not need to define an AccumulableParam, and the following works. I will leave both solutions here in case they are useful to anyone.

import org.apache.spark.{SparkContext, SparkConf}

import scala.collection.mutable

case class Employee(id: String, name: String, dept: String)

val conf = new SparkConf().setAppName("Employees").setMaster("local[4]")
val sc = new SparkContext(conf)

val empAccu = sc.accumulableCollection(mutable.HashMap[String,Employee]()) 

val employees = List(
    Employee("10001", "Tom", "Eng"), 
    Employee("10002", "Roger", "Sales"), 
    Employee("10003", "Rafael", "Sales"), 
    Employee("10004", "David", "Sales"), 
    Employee("10005", "Moore", "Sales"), 
    Employee("10006", "Dawn", "Sales"), 
    Employee("10007", "Stud", "Marketing"), 
    Employee("10008", "Brown", "QA") 
) 

System.out.println("employee count " + employees.size) 

sc.parallelize(employees).foreach(e => {
  // notice this is different from the previous solution: we add a
  // key-value pair, because the map grows by (String, Employee) entries
  empAccu += e.id -> e
})

println("empAccumulator size " + empAccu.value.size) 
empAccu.value.foreach(entry => 
    println("emp id = " + entry._1 + " name = " + entry._2.name)) 

Both solutions were tested with Spark 1.0.2.


It looks like empAccu.value.size is not giving the correct value, though the printing works fine. I get the following output:

employee count 8
empAccumulator size 4
emp id = 10007 name = Stud
emp id = 10001 name = Tom
emp id = 10004 name = David
emp id = 10006 name = Dawn
emp id = 10003 name = Rafael
emp id = 10002 name = Roger
emp id = 10005 name = Moore
emp id = 10008 name = Brown

– smishra 2014-10-15 21:59:32