大数据开发之spark介绍及wordcount程序开发
1.1什么是Spark(官网:http://spark.apache.org)
Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。Spark得到了众多大数据公司的支持,这些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、
腾讯、京东、携程、优酷土豆。当前百度的Spark已应用于凤巢、大搜索、直达号、百度大数据等业务;阿里利用GraphX构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯Spark集群达到8000台的规模,是当前已知的世界上最大的Spark集群。
1.2为什么要学Spark
Spark是一个开源的类似于Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Spark中的Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。
Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。
1.3 Spark特点
1.3.1 快
与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。
1.3.2 易用
Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。
1.3.4 通用
Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。
1.3.5 兼容性
Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。
-
2 Spark角色介绍
Spark是基于内存计算的大数据并行计算框架。因为其基于内存计算,比Hadoop中MapReduce计算框架具有更高的实时性,同时保证了高效容错性和可伸缩性。从2009年诞生于AMPLab到现在已经成为Apache顶级开源项目,并成功应用于商业集群中,学习Spark就需要了解其架构。
Spark架构图如下:
Spark架构使用了分布式计算中master-slave模型,master是集群中含有master进程的节点,slave是集群中含有worker进程的节点。
- Driver Program :运⾏main函数并且新建SparkContext的程序。
- Application:基于Spark的应用程序,包含了driver程序和集群上的executor。
- Cluster Manager:指的是在集群上获取资源的外部服务。目前有三种类型
(1)Standalone: spark原生的资源管理,由Master负责资源的分配
(2)Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架
(3)Hadoop Yarn: 主要是指Yarn中的ResourceManager
- Worker Node: 集群中任何可以运行Application代码的节点,在Standalone模式中指的是通过slaves文件配置的Worker节点,在Spark on Yarn模式下就是NodeManager节点
- Executor:是在一个worker node上为某应用启动的⼀个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用都有各自独立的executor。
- Task :被送到某个executor上的工作单元。
使用idea工具scala语言开发wordcount程序
配置Maven的pom.xml
<properties> <scala.version>2.11.8</scala.version> <spark.version>2.1.3</spark.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> |
编写spark程序
package cn.test.spark import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object WordCount { def main(args: Array[String]): Unit = { //设置spark的配置文件信息 val sparkConf: SparkConf = new SparkConf().setAppName("WordCount") //构建sparkcontext上下文对象,它是程序的入口,所有计算的源头 val sc: SparkContext = new SparkContext(sparkConf) //读取文件 val file: RDD[String] = sc.textFile(args(0)) //对文件中每一行单词进行压平切分 val words: RDD[String] = file.flatMap(_.split("")) //对每一个单词计数为1 转化为(单词,1) val wordAndOne: RDD[(String, Int)] = words.map(x=>(x,1)) //相同的单词进行汇总 前一个下划线表示累加数据,后一个下划线表示新数据 val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_) //保存数据到HDFS result.saveAsTextFile(args(1)) sc.stop() } } |
maven打包出jar包
启动spark集群 后将jar包上传到集群
启动spark
/export/servers/spark/sbin/start-all.sh
使用spark-submit命令提交Spark应用(注意参数的顺序)
spark-submit \
--class cn.test.spark.WordCount \
--master spark://node1:7077 \
--executor-memory 1g \
--total-executor-cores 2 \
/root/spark-1.0-SNAPSHOT.jar \
/words.txt \
/spark_out
这里通过spark-submit提交任务到集群上。用的是spark的Standalone模式
Standalone模式是Spark内部默认实现的一种集群管理模式,这种模式是通过集群中的Master来统一管理资源。
- 查看Spark的web管理界面
地址: 192.168.200.160:8080
- 查看HDFS上的结果文件
hdfs dfs -cat /spark_out/part*
(hello,4)
(me,2)
(you,3)
(her,1)