Spark Review 2: Broadcast Variables and Accumulators
1. Shared variables:
First, create an RDD of (id, phone name) key-value pairs:
scala> val kvphone=sc.parallelize(List((1,"iphone"),(2,"xiaomi"),(3,"oppo"),(4,"huawei")))
kvphone: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[0] at parallelize at <console>:24
(1) Create phoneMap:
scala> val phoneMap=kvphone.collectAsMap
phoneMap: scala.collection.Map[Int,String] = Map(2 -> xiaomi, 4 -> huawei, 1 -> iphone, 3 -> oppo)
(2) Create phoneIds:
scala> val phoneIds=sc.parallelize(List(2,3,4,1))
phoneIds: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
(3) Convert the ids using the lookup table:
scala> val phoneNames=phoneIds.map(x=>phoneMap(x)).collect
phoneNames: Array[String] = Array(xiaomi, oppo, huawei, iphone)
Note: every time this conversion is executed, the ids and the map must be shipped to the worker nodes; if the ids and the map are both large, this consumes a lot of memory.
2. To solve this memory-consumption problem, the broadcast variable is introduced:
Rules for using a broadcast variable:
1. Create it with SparkContext.broadcast([initial value]).
2. Read its value with the value method.
3. Once created, a broadcast variable cannot be modified.
scala> val KVPhones=sc.parallelize(List((1,"iphone"),(2,"xiaomi"),(3,"oppo"),(4,"huawei")))
KVPhones: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> val phonesMap=KVPhones.collectAsMap
phonesMap: scala.collection.Map[Int,String] = Map(2 -> xiaomi, 4 -> huawei, 1 -> iphone, 3 -> oppo)
scala> val phonesID=sc.parallelize(List(2,3,4,1))
phonesID: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> val bcPhoneMap=sc.broadcast(phonesMap)
bcPhoneMap: org.apache.spark.broadcast.Broadcast[scala.collection.Map[Int,String]] = Broadcast(1)
scala> val phones=phonesID.map(x=>bcPhoneMap.value(x)).collect
phones: Array[String] = Array(xiaomi, oppo, huawei, iphone)
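With broadcasting, phonesMap is shipped to each executor once and cached there; every task then reads it through bcPhoneMap.value instead of carrying its own serialized copy of the map. When the lookup table is no longer needed it can also be released explicitly. A minimal sketch using the standard Broadcast API (optional here, since Spark also cleans up broadcasts that are no longer referenced on the driver):

// Drop the cached copies on the executors; the variable can be
// re-broadcast automatically if it is used again later.
bcPhoneMap.unpersist()

// Release the variable permanently; it must not be used after this call.
bcPhoneMap.destroy()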
3. Introduction to accumulators:
To make aggregating values in parallel convenient, Spark provides the accumulator shared variable.
Usage rules:
1. Create it with SparkContext.accumulator([initial value]).
2. Update it with += inside a task (for example inside foreach); tasks cannot read its value.
3. Only the driver program, outside the parallel operation, can read the value.
4. Accumulator demo:
(1) Create and initialize an RDD:
scala> val intRDD=sc.parallelize(List(3,4,5,6))
intRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
(2) Create the sum accumulator for the total:
scala> val sum=sc.accumulator(0.0)
warning: there were two deprecation warnings; re-run with -deprecation for details
sum: org.apache.spark.Accumulator[Double] = 0.0
(3) Create the num accumulator for the count:
scala> val num=sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
num: org.apache.spark.Accumulator[Int] = 0
(4) Sum up:
scala> intRDD.foreach(i=>{
| sum+=i
| num+=1
| }
| )
(5) Print the results:
scala> println("sum="+sum.value+",num="+num.value)
sum=18.0,num=4
(6) Compute the average:
scala> val avg=sum.value/num.value
avg: Double = 4.5
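The deprecation warnings above appear because sc.accumulator has been deprecated since Spark 2.0 in favor of the AccumulatorV2 API. A minimal sketch of the same computation with the built-in typed accumulators (the name strings are optional and only label the accumulators in the web UI):

val intRDD = sc.parallelize(List(3, 4, 5, 6))

// Built-in typed accumulators from the AccumulatorV2 API.
val sum = sc.doubleAccumulator("sum")
val num = sc.longAccumulator("num")

intRDD.foreach { i =>
  sum.add(i)  // accumulate the total on the executors
  num.add(1)  // count the elements
}

// Only the driver reads the results.
println("sum=" + sum.value + ",num=" + num.value)
val avg = sum.value / num.value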