为什么要有Spark?

上面这张图是Hadoop的MapReduce编程模型的计算概要流程图。
每一次Map完了都把数据放到HDFS,Reduce阶段时在在从HDFS拉取,这个效率太慢了,而且如果有10个MapReduce的任务都是连续性呢?
第一个MapReduce的程序计算完,第二个MapReduce程序是依赖第一个,第三个是依赖第二个和第一个的部分数据,以此类推呢?(迭代式运算)
在现今的互联网中,个人电脑的存储空间也到了TB级别,而在企业中,数据越来越多,数据千奇百怪,单单ETL的过程中就要做大量的运算,另外在机器学习方面(或者可以理解为数据学习)中,每一次进来新的数据都要做大量的猜想式运算,这样的性能效率,能够忍受吗?
在Hadoop的文章中有提到,MapReduce只是一种思想,其中还用了炒菜的例子来说明。
Hadoop的MapReduce编程模型,对迭代式运算的支持非常弱,那么大家都懂的,总会有大神想着去改进它,以满足企业需要。
小结:中间结果的输出,基于Hadoop的MapReduce的计算引擎通常会将中间结果输出到磁盘(边计算边输出结果),进行存储和容错,出于任务管道承接的考虑,当一些查询翻译到MapReduce任务时,往往会产生多个Stage,而这些串联的Stage又依赖底层文件存储(HDFS)来存储每一个Stage的输出结果。
Spark出来时,业界伴随着一句话就是,Spark将会取代Hadoop。
首先我们先提出两个问题:(答案位于RDD篇最尾部 - https://blog.****.net/Su_Levi_Wei/article/details/86697896)
Spark真的是用来取代Hadoop吗?
Spark和Hadoop的关系是什么?
什么是Spark?
Spark是使用Scala编写的一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州伯克利分校AMP实验室,2010年开源,2013年成为Apache孵化项目,14年成为Apache顶级项目。
Spark是一个非常快的通用引擎,Spark非常容易使用,支持Scala、Java、Python、R等语言,并且Spark支持SQL查询、流式计算,连复杂分析都支持了。
Spark是多步计算,内存存储,内存放不下会放到磁盘,所以Spark是内存+磁盘,且任务与任务之间是可以基于内存做交换的,而Hadoop是一个任务完成了在去通知主,主在去开始下一个。
Hadoop是两步计算,磁盘存储。
Spark(一个编程模型内完成,只需要提交一次任务):
提交任务 -> Job1 -> 内存(读、写) -> Job2
Hadoop:
提交任务 -> Job1 -> 写 -> Job2 -> 读
特点
快:Spark实现了高效的DAG执行引擎,可以通过基于内存来高效的处理数据流。

易用:Spark支持Java、Python、和ScalaAPI,还支持超过80种高级算法,使得用户可以快速构建不同的应用,而且还支持交互式的Python和Scala的Spark Shell,可以说是非常方便的在这些Spark Shell中,使用Spark集群来验证问题的方法。

通用:Spark提供了统一的解决方案,Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX),而且这些不同的处理可以在一个应用中无缝使用,可减少开发和维护的人力成本。

兼容性:Spark可以非常方便的和其他开源产品进行融合,比如Spark可以使用Hadoop的YARN和Apache Mesos作为资源管理和调度器,并且还可以处理所有Hadoop支持的数据,包含HDFS、HBase和Cassandra等,这意味着不用迁移也可以使用Spark的强大处理能力,Sprk也可以不依赖第三方的资源管理和调度器,实现了Standalone作为内置资源管理和调度框架,降低了Spark的使用门槛。

Spark组成
Spark目前已经发展成为一个生态系统了,是一个包含多个子项目的集合,其中包括SparkSQL、SparkStreaming、GraphX、MLib、SparkR等子项目,Spark是基于内存计算的大数据并行计算框架。
除了广泛使用的MapReduce计算模型,而且高效的支持更加计算模式,包含交互式查询和处理,Spark适合于各种各样原先需要多种不同的分布式平台的场景,包含批处理、迭代算法、交互式查询、流处理,通过在一个统一的框架下支持这些不同的计算,Spark使得开发人员可以简单而抵消的把各种处理流程整合在一起。
而这样的组合,在实际的数据分析过程中很有意义。
不止如此,Spark的这种特性还大大的减轻了原先需要对各种平台分别管理的负担。
统一的软件栈,各个组件关系密切并且可以相互调用,这种设计有几个好处:
1、软件栈中的所有的程序库和高级组件都可以从下层的改进中获益。
2、运行整个软件栈的代价变小了,不用运行5 ~ 10套独立的软件系统了,一个企业只需要运行一套系统即可,相对的系统的部署、维护、测试、支持等大大缩减。
3、能够构建出无缝整合不同处理模型的应用。

Spark Core
实现了Spark的基本功能,包含任务调度、内存管理、错误恢复与存储系统交互等模块,Spark Core中还包含了对弹性分布式数据集(Resilient Distributed Dataset:RDD)的API定义。
Spark SQL
是Spark用来操作结构化数据的程序包,通过Spark SQL,可以使用SQL或Apache Hive版本的SQL(HQL)来查询数据,Spark SQL还支持多种数据源,如Hive表、Parquet以及JSON等。
Spark Streaming
是Spark提供对实时数据进行流式计算的组件,提供了用来操作数据流的API,并与Spark Core中的RDD API高度对应。
Spark MLlib
提供常用的机器学习(ML)功能的程序库,包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的支持功能。
集群管理器
Spark设计为可以高效的在一个计算节点到数千个计算节点之间伸缩性计算,为了实现这样的要求,同时获得最大灵活性,Spark支持在各种集群管理器上运行,包括Hadoop YARN、Apache Mesos,Spark自身也有携带一个调度器,叫作独立调度器(standalone)。
Spark的应用
数据处理应用
Spark开发生产环境中的数据处理应用的软件开发者,通过对接Spark的API实现对处理的处理和转换等任务。(大规模的数据处理、运算)
数据科学任务
主要是数据分析领域,数据科学家要负责分析数据并建模,具备SQL、统计、预测建模(机器学习)等方面的经验,以及一定的使用Python、Matlab、R语言等其他语言共同编程的能力。
Spark集群

安装就不说……一堆的文章。
备注:Master和Worker存在的意义就是为了支撑Standalone的运行,如果换成YARN或Mesos,那么Master和Worker就不存在了。
集群测试应用
命令:./bin/spark-submit --master spark://hadoop100:7077,hadoop101:7077 --class org.apache.spark.examples.SparkPi --executor-memory 1G --total-executor-cores 3 examples/jars/spark-examples_2.11-2.1.1.jar 2000
在运行过程中,使用jsp命令查看集群全部机器的进程。
命令参数说明:
./bin/spark-submit
--class <main-class> 应用启动类
--master <master-url> Master的Url,如spark://hadoop100:7077、Mesos、
Yarn-client、Yarn-cluster
--deploy-mode <deploy-mode> 是否发布驱动到Worker节点(cluster),或者
作为一个本地客户端(client)(default:client)
*,client模式或cluster模式。
--conf <key>=<value> 任意的Spark配置属性,格式是k=v,如果值包含空格,
可以加引号”k=v”
……other options
--executor-memory 1G 每个executor可用内存为1G
--total-executor-cores 3 每个executor使用的cpu核数为3个
<application> [application-arguments] 打包好的应用jar,包含依赖,这个URL
在集群全局可见,比如hdfs://共享存储
系统,如果是file://path,那么所有的
Worker节点都需要有这个文件,
application-arguments是传给
main(args)的函数。
SparkSubmit参数说明
参数
|
描述
|
local
|
本地以一个Worker线程运行(非并发的情况)
|
local[K]
|
本地以K Worker线程(理想情况下,K设置为机器的CPU核数)
|
local[*]
|
本地以本机同样核数线程运行
|
spark://Host:Port
|
连接到指定的Spark Standalone Cluster Master,端口是Master集群配置的端口,缺省值为7077
|
mesos:// Host:Port
|
连接到指定的Mesos集群,Port缺省值为5050,如果Mesos使用Zookeeper,格式为mesos://zk://……
|
yarn-client
|
以client模式连接到YARN Cluster,集群的位置基于HADOOP_CONF_DIR变量找到(/etc/profile)
|
yarn-cluster
|
以cluster模式连接到YARN Cluster,集群的位置基于HADOOP_CONF_DIR变量找到(/etc/profile)
|
程序运行过程 (1)

IDEA入门程序
pom.xml
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
<hadoop.version>2.7.3</hadoop.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<!-- 导入scala的依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 导入spark的依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- 指定hadoop-client API的版本 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<!-- 编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- 编译java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
|
ScalaWordCount
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object ScalaWordCount {
def main(args: Array[String]): Unit = {
// 创建Spark配置,必须设置AppName,
// 可以通过http://hadoop100:8080/,每一个应用都有一个AppName
val conf = new SparkConf().setAppName("ScalaWordCount")
//创建Spark执行入口,由此对象创建Spark的抽象(RDD)
val sc = new SparkContext(conf)
//指定以后从哪里读取数据,创建RDD(分布式数据集),
// 得到一个RDD,在Spark操作的都是RDD,RDD只是抽象概念,是Spark内部的封装抽象
val lines: RDD[String] = sc.textFile(args(0))
//读取
//不推荐这种方式,读取困难
//lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false)
//切分压平,得到一个新的RDD,RDD只是抽象的,就是Spark的一个小封装而已
val words:RDD[String] = lines.flatMap(_.split(" "))
//将得到的每一个单词和1组合
val wordAndOne:RDD[(String,Int)] = words.map((_,1))
//按照key进行聚合,又是一个RDD,此时可以感受到,Spark一切皆RDD
val reduced:RDD[(String,Int)] = wordAndOne.reduceByKey(_+_)
//排序
val sorted:RDD[(String,Int)] = reduced.sortBy(_._2,false)
//获取到结果,将结果保存成文件(HDFS、本地)
sorted.saveAsTextFile(args(1))
//释放资源
sc.stop()
//到集群中用bin/spark-submit运行
}
}
|
JavaWordCount
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class JavaWordCount {
public static void main(String[] args) {
//创建配置
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
//创建Spark入口
JavaSparkContext javaSparkContext = new JavaSparkContext();
//指定读取数据的位置
JavaRDD<String> lines = javaSparkContext.textFile(args[0]);
//切分压平
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(" ")).iterator();
}
});
//将单词和1组合在一起
//第一个参数是输入
//第二个参数和第三个参数是返回,返回 对
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<>(word, 1);
}
});
//聚合
JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
//调换顺序,因为只能按照Key进行排序,除非自己写个排序规则
//参数1:输入
//参数2和参数3:输出
JavaPairRDD<Integer, String> swaped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.swap();
}
});
//排序
JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
//调整顺序
JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> integerStringTuple2) throws Exception {
return integerStringTuple2.swap();
}
});
//将数据保存到HDFS
result.saveAsTextFile(args[1]);
//释放资源
javaSparkContext.stop();
}
}
|
JavaLambdaWordCount
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
public class JavaLambdaWordCount {
public static void main(String[] args) {
//Spark的配置
SparkConf conf = new SparkConf().setAppName("JavaLambdaWordCount");
//创建Spark的应用入口
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
//指定读取数据的位置
JavaRDD<String> lines = javaSparkContext.textFile(args[0]);
//切分压平
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
//将单词和1组合
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(word -> new Tuple2<>(word, 1));
//聚合
JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((m, n) -> m + n);
//调整顺序,因为排序根据Key进行排序
JavaPairRDD<Integer, String> swaped = reduced.mapToPair(tp -> tp.swap());
//排序
JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
//调整顺序
JavaPairRDD<String, Integer> result = sorted.mapToPair(tp -> tp.swap());
//结果保存
result.saveAsTextFile(args[1]);
//释放资源
javaSparkContext.stop();
}
}
|
以上三个案例的spark-submit运行命令
/bin/spark-submit\
--class com.atguigu.spark.WordCount\
--master spark://master01:7077\
--executor-memory 1G \
--total-executor-cores 2 \
wordcount-jar-with-dependencies.jar\
hdfs://master01:9000/RELEASE\
dfs://master01:9000/out
查看结果:hdfs dfs -cat hdfs://master01:9000/out/*
IDEA本地调试WordCount程序
object ScalaWordCount2 {
def main(args: Array[String]): Unit = {
//每次上传到服务器调试太麻烦了
//本地调试
val conf = new SparkConf()
.setAppName("ScalaWordCount")
//local 本地运行 - 一个结果文件
//local[*] 按照cpu的核数开启对应的线程去执行(可以为*个Executor在运行),多个结果文件
//local[2] 开启2个线程去跑(可以为2个Executor在运行),2个结果文件
.setMaster("local[2]")
//创建Spark执行入口 - RDD
val sc = new SparkContext(conf)
/*sc.makeRDD(List(1,2,3))
sc.parallelize()*/
//指定读取数据位置,得到RDD
val lines:RDD[String] = sc.textFile("hdfs://hadoop100:8020/wc")
//切分压平,又是得到RDD
val words:RDD[String] = lines.flatMap(_.split(" "))
//将单词和1组合,又得到RDD
val wordAndOne:RDD[(String,Int)] = words.map((_,1))
//按照Key进行聚合,又得到了一个RDD
val reduced:RDD[(String,Int)] = wordAndOne.reduceByKey(_+_)
//排序,又是一个RDD
val sorted:RDD[(String,Int)] = reduced.sortBy(_._2,false)
//保存结果
sorted.saveAsTextFile("hdfs://hadoop100:8020/wcR5")
//释放资源
sc.stop()
//可以发现开多少个线程,相当于多少个Executor,那么运行是在Worker的Executor中运行,
// 每个Executor运行的逻辑是一样的,但是分配的数据是不一样的,
// 假设2条数据,分配2个线程,去执行,各自执行一条数据,
// 得到的结果也是2个文件,因为一个Executor输出一个文件
}
}
|
程序运行过程 (2)

每个Spark应用都由一个驱动程序(Driver)来发起对集群上的并行操作,驱动器程序包包含应用的main函数,并且定义了集群上的分布式数据集,还对这些分布式数据集应用提供了相关的操作。
驱动器程序通过一个SparkContext对象来访问Spark,这个对象代表了对计算集群的一个连接。
驱动器程序一般要管理多个执行器(Executor)节点。
YARN
|
Spark-Standalone
|
描述
|
ResourceManager
|
Master
|
管理子节点、资源调度、接收任务请求
|
NodeManager
|
Worker
|
管理当前节点,并管理子进程
|
YarnChild
|
Executor
|
运行真正的要计算的业务逻辑
|
Client
|
Spark-Submit
|
(相当于YARN的Client+AppMaster)提交App,管理该任务的Executor,并将Task提交到集群(提交到Executor)
|
ApplicationMaster
|
|
|
Spark Shell
spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令下用Scala编写Spark程序。
启动spark-shell:
bin/spark-shell \
--master spark://hadoop100:7077 \
--executor-memory 2g \
--total-executor-cores 2
备注:如果启动spark-shell时没有指定master地址,也能正常启动spark-shell和执行spark-shell里面的程序,其实是启动了spark的cluster模式,如果spark是单节点,并且没有指定slave文件,这个时候如果打开spark-shell默认的是local模式。
Local模式是master和worker在同一进程内。
Cluster模式是master和worker在不同进程内。
spark-shell是默认已经将SparkContext类初始化为对象sc,用户代码如果需要用到,则直接应用sc即可。
http://hadoop100:4040/jobs/ 这个是Spark程序运行时的Web UI,可查看Stage等信息