Spark学习(2)-Spark数据集与编程模型
目录:
- RDD介绍
- Spark核心介绍 - RDD
- Spark核心介绍 - 分区
- Spark核心介绍 - 宽依赖和窄依赖
- Spark核心介绍 - Transformation 和 Action
- Spark发布
一.RDD介绍
- Spark 核心的概念是 Resilient Distributed Dataset (RDD),弹性分布式数据集:一个可并行操作的有容错机制的数据集合。可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。
同时,RDD还提供一组丰富的操作来操作这些数据。在这些操作中,诸如map、faltMap、filter等转换操作实现了函数式编程模式,很好地切合了scala的集合操作。初此之外,RDD还提供了诸如join、groupBy、reduceByKey等更为方便你的操作(注意:reduceByKey是action,而非transformation),以支持常见的数据运算。 - 通产来讲,针对数据处理有几种常见模型,包括:Iterative Algorithms,Relational Queries,MapReduce,Stream Prcessing。例如Hadoop MapReduce采用了MapReduce模型,Strom则采用了Stream Prcessing模型。RDD混合了这四种模型,使得Spark可以应用于各种大数据处理场景。
解析:
-
RDD:弹性分布式数据集
分布式:将数据放在多台服务器上进行并行计算的一种方式
数据集:包含很多条数据
弹性:在计算过程中,某些任务或者某些数据出现故障之后,spark都可以将这个RDD还原成一个正常的状态 -
RDD中也有一个概念:分区
数据集是一个很大的数据集合,如果这个集合是一个给整体的话,那么将会出现计算时间过长,所以Spark将RDD中的数据进行第二次拆分:按分区来拆分。
每一个分区将作为spark计算时的一个Task任务(shuffleMapTask)
在mapReduce里面如何认定一个MapTask?一个块产生一个MapTask -
Spark的算子分类:转换算子和行动算子(在某些书中提到控制算子,相当于行动算子的一小块)
转换算子在使用的时候,spark是不会真正执行,直到需要行动算子之后才会执行。
在spark中每一个算子计算之后就会产生一个新的RDD。 -
Spark在计算的时候,只要源文件存在,那么结果永远不会出错。(高容错性)
-
问题一:为什么Spark在使用中,要需要行动算子的时候才开始进行计算?
如果每一个算子在创建的时候就执行的话,执行的结果都会产生一个新的RDD,但这些RDD优先存到内存中。如果很长一段时间都没有遇到行动算子的话,那么就会存在要么数据丢失或者数据移到硬盘上(那么计算速度会下降)
spark要解决这个问题,就要把这种可能性缩小。 -
问题二:如果某一个RDD在计算的时候出错(如服务器挂掉),在spark怎么解决?
spark内部中有一个血统容错的机制(有一个RDD挂掉之后,spark会从它的上一步去拿RDD,如果上一步的RDD也掉了的话,又继续向上进行,直到拿到RDD)。
-
问题一:为什么Spark在使用中,要需要行动算子的时候才开始进行计算?
转换算子和行动算子说明:
二.Spark核心介绍 - RDD
说明:
RDD是一个只读的(不可以修改),spark针对每一步操作会操作一个新的RDD.
一个RDD是否可以同时存在内存和硬盘中?可以的
Spark的产生RDD的数据来源,可以来此外部文件(hdfs、hbase、文件系统等等)和scala集合(数组、list、map、set)
实例:
- 将原生的scala数据集合转换为RDD操作
package test import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD object SparkFirst { def main(args: Array[String]): Unit = { //先拿到sc == sparkContext //spark主要是做计算 -->最终是不是要打包发布到spark集群中,因为集群中要运行多个spark程序,所以事先要给job取个名字 val conf=new SparkConf().setAppName("HelloSpark").setMaster("local[2]") val sc=new SparkContext(conf) val arr=Array(2,3,4,5,6,7) //因为RDD是只读的,所以在定义变量的时候,采用val而不是var val r1:RDD[Int]=sc.makeRDD(arr) println(r1.count()) } }
- 将hdfs原文件的内部数据转换为RDD操作
三.Spark核心介绍 - 分区
说明:(面试重点)
查看分区数:
设置分区数的第1种方法
设置分区数的第2种方法
设置分区数的第3种方法
四.Spark核心介绍 - 宽依赖和窄依赖
说明:
stage的划分:遇到宽依赖就划分
五.Spark核心介绍 - Transformation 和 Action
1.Transformation
说明:
map和flatmap的区别:
aaa.txt
scala hello
aaa world java
yi guang aaa bbb
ddd
package test
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
object SparkFirst {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("HelloSpark").setMaster("local[2]")
val sc=new SparkContext(conf)
val arr:RDD[String]=sc.textFile("E://aaa.txt")
println(arr.count()) //4
println(arr.flatMap(_.split(" ")).count) //12
println(arr.map(_.split(" ")).count()) //4
}
}
map的形式
List(List(scala hello), List(aaa world java),List(yi guang aaa bbb),List(ddd))
flatMap的形式
List(scala,hello,aaa,world,java,yi,guang,aaa,bbb,ddd)
实例:
- map算子
- 使用scala原生语法和spark转化算子的区别
- filter算子
- 关于执行效率:比如将一批数据中年龄大于40的人的工资加500,那么先过滤掉年龄小于40的,再工资加500,也就是先过滤,再计算
- flatmap算子(map算子和flatmap算子的区别)
- groupByKey算子
- reduceByKey算子
结果: - union算子
结果: - join算子
结果: - union和join的区别
- mapValues算子
值扩大10倍
结果: - partitionBy算子
第一步:创建自定义分区类
第二步:使用自定义分区
2.Action
说明:
实例:
- count算子:统计RDD中有多少个元素
- collect算子
- reduce算子
结果:
结果:
- lookup算子
结果: - take算子
结果: - sortBy算子
结果: - saveAsTextFile算子
- saveAsSequenceFile算子
六.Spark发布
说明:
流程:
- 项目编译、打包
- 发布
实例1:
实例2: - web访问