spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1)

今天我们要做的就是简单编写一个统计单词出现数量的项目!!!

目录

1.搭建Spark开发环境

1.1 完成wordcount示例

2.Spark架构理解

3.Spark工作原理

4.DAG、Stage、宽窄依赖 


 

1.搭建Spark开发环境

先安装scala,我这里是直接安装运行已经集成好的eclipse软件,包括jdk的配置

jdk你可以去官网下载,1.8以上的,最好是我这个版本的(如果出错就可以使用下图版本):

spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1)

 还需要去配置jdk环境:(不懂去百度一下)spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1)

eclipse软件资源:

链接:https://pan.baidu.com/s/1D6Bp1KRmH-ZZM4s_0Esx7A 
提取码:dujm 
复制这段内容后打开百度网盘手机App,操作更方便哦

 

1.1 完成wordcount示例

打开eclipse软件,新建scala项目,可以直接在窗口的右上角创建

spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1)

 新建好后,窗体左边就会出现下图项目文件列表,但是这里需要改一个版本:右击出现选项--->点击Properties选项

spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1) spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1)

然后就是导入需要用到的jar包

spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1)

jar的网盘资源:(全选jar导入)

链接:https://pan.baidu.com/s/1jZHbd5e2r01CiuoJtx3WwQ 
提取码:liom 
复制这段内容后打开百度网盘手机App,操作更方便哦

最后就是编写代码了,代码如下:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object numberWord {
def main(args: Array[String]):Unit = {
  System.setProperty("hadoop.home.dir", "E:\\U\\互联网大数据\\0509\\hadoop-2.6.5");   
//这里是你的hadoop的安装包的路径
    //spark处理---并发迭代大数据  
    //RDD.map word->(w,1)
    //rdd.reduce->w->sum 1
     //1.sc
    var conf = new SparkConf()
    conf.setMaster("local[*]").setAppName("")
    var sc = new SparkContext(conf)
    sc.setLogLevel("WARN");
    //2.sc.textFile ->RDD
    var path = "src/data/txt"    //这里是你的数据路径
    var filedata_rdd = sc.textFile(path,2)
    var words_rdd=filedata_rdd
    .flatMap(_.split("\\W+"))
    .map(x=>(x,1))
    .reduceByKey(_+_)
    .map(_.swap)
    .sortByKey(false)
//    words_rdd.foreach(println)
    println(filedata_rdd.count())
    words_rdd.foreach{line=>
      println("word="+line._1+" ,num="+line._2)
    }
    println("end....")
  }    
}

我就直接在项目里面新建一个文件,来存放测试数据

spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1)

 数据内容为:

spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1)

运行结果为:

spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1)

2.Spark架构理解

并行化是将工作负载分在不同线程或不同节点上执行的子任务.

Spark的工作负载的划分由RDD分区决定。

(1).任务调度

spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1)

架构示意图: 

spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1)

3.Spark工作原理

(1).Spark工作流程

spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1) 

编写程序提交到Master上,

Master是由四大部分组成(RDD Graph,Scheduler,Block Tracker以及Shuffle Tracker)

启动RDD Graph就是DAG,它会提交给Task Scheduler任务调度器等待调度执行

具体执行时,Task Scheduler会把任务提交到Worker节点上

Block Tracker用于记录计算数据在Worker节点上的块信息

Shuffle Blocker用于记录RDD在计算过程中遇到Shuffle过程时会进行物化,Shuffle Tracker用于记录这些物化的RDD的存放信息

Spark有如下优势:

Spark提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求
官方资料介绍Spark可以将Hadoop集群中的应用在内存中的运行速度提升100倍,甚至能够将应用在磁盘上的运行速度提升10倍
 

4.DAG、Stage、宽窄依赖 

(1).DAG:有向无环图:有方向,无闭环,代表着数据的流向,这个DAG的边界则是Action方法的执行;原始的RDD通过一系列的转换就形成

(2).Stage概念

Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAG Scheduler(调度),DAG Scheduler会把DAG划分相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每个stage包含一个或多个task任务。然后将这些task以taskSet的形式提交给TaskScheduler运行stage是由一组并行的task组成。

stage划分

一个Job会被拆分为多组Task,每组任务被称为一个Stage就像Map Stage, Reduce Stage。Stage的划分,简单的说是以shuffle和result这两种类型来划分。

spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。

DAG Scheduler:负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,且将DAG划分为不同的Stage,每个Stage可并发执行一组task。

TaskScheduler:DAGScheduler将划分完成的Task提交到TaskScheduler,TaskScheduler通过Cluster Manager在集群中的某个Worker的Executor上启动任务,实现类TaskSchedulerImpl。

(3).RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)以及宽依赖(wide dependency).

窄依赖

父RDD和子RDD partition之间的关系是一对一的。或者父RDD一个partition只对应一个子RDD的partition情况下的父RDD和子RDD partition关系是多对一的。不会有shuffle的产生。父RDD一个分区去到子RDD的一个分区

宽依赖

父RDD与子RDD partition之间的关系是一对多。会有shuffle的产生。父RDD的一个分区的数据去到子RDD的不同分区里面。

spark的统计单词实例运行、架构、原理、DAG、stage、宽窄依赖(1)

 

 

图片来自博客:https://blog.csdn.net/qq_16681169/article/details/82432841

介绍来自博客:https://www.cnblogs.com/wnbahmbb/p/6271375.html

https://www.cnblogs.com/LHWorldBlog/p/8415541.html