spark学习(一):集群的搭建
目录
Spark集群的配置:
准备三台有hdfs集群的节点
hadoop101 master和worker
hadoop102 worker
hadoop103 worker
1.下载
官网下载:http://spark.apache.org/downloads.html
所有版本:https://archive.apache.org/dist/spark/
2.上传
上传压缩文件到指定的目录如:
/opt/software
3.解压
将上传的文件解压到指定文件夹/opt/module:
tar -zxvf spark-2.2.0-bin-hadoop2.7.gz /opt/module/
4.修改配置文件
进入安装的spark/conf目录下
将spark-env.sh.template 和slaves.template复制一份
cp slaves.template slavescp spark-env.sh.template spark-env.sh
进入slaves,配置worker节点,我们将三台节点都作为worker使用
进入spark-env.sh
将Hadoop101节点作为Master
Spark提交任务的端口,默认就是7077
给每个worker的核数,也就是worker可以并行运行两个Task
设置每个worker可使用的内存
将配置好的spark发送到另外两台节点上,回到/opt/module目录,下面两种方式都可以
[[email protected] module]# scp -r spark-2.2.0-bin-hadoop2.7/ hadoop102:/opt/module[[email protected] module]# scp -r spark-2.2.0-bin-hadoop2.7/ hadoop102:$
环境变量配置
将spark添加到环境变量,添加以下内容到 /etc/profile
注意最后 source /etc/profile 刷新配置
5.启动spark集群
在master节点进入spark的sbin目录下
[[email protected] sbin]# pwd/opt/module/spark-2.2.0-bin-hadoop2.7/sbin
[[email protected] sbin]# ./start-all.sh
出现Master和worker进程即可
访问hadoop101:8080
我们知道8080是tomcat的默认端口号(可以不更改),如有必要,我们可以更改Master WEBUI访问的端口号
三种方式都是在master节点设置
第一种:进入conf/spark-env.sh,在之前的配置后加一句
export SAPRK_MASTER_WEBUI=8081
第二种:进入sbin/start-master.sh,修改下面的端口号即可
第三种:导入临时环境变量
export 查看当前节点的环境变量
export SPARK_MASTER_WEBUI_PORT=8081
export -n SPARK_MASTER_WEBUI_PORT //去除导入的临时环境变量
6.Spark的四种部署模式
Local模式
不指定master,或者指定--master local/local[n]/local[*],在监控界面看不到application
local 只使用一个cores
local[n] 使用n个cores
local[*] 使用全部的cores
Standalone
Spark本身提供的集群模式 --master spark://host:port
Yarn
统一的调度平台 --master yarn
Mesos
类似与yarn的资源调度平台 --master mesos://host:port
7.如何去提交spark任务
提交任务的两个命令
spark-submit 程序执行完成之后,application就会退出
spark-shell 会一直占有一个application,手动退出,ctrl + c
spark-shell
是一个交互式的命令行,主要用于测试
spark-shell脚本,实际上调用的是spark-submit脚本:
spark-shell //以本地模式进行测试,在监控界面看不到application
spark-shell --master spark://hadoop101:7077 //以Standalone模式测试,在监控界面一直占有一个application
spark-submit
最常用于spark任务的提交。需要有jar包
spark-submit 选项 jar包 参数列表
8.Spark的第一个程序
SparkPi的源码,在我们安装的spark/examples/jars/spark-examples_2.11-2.2.0.jar,就是该程序的jar包
求圆周率: SparkPi
local模式运行程序,不能在spark 监控界面中进行查看。
spark-submit --class org.apache.spark.examples.SparkPi /opt/spark-2.2.0/examples/jars/spark-examples_2.11-2.2.0.jar 100
指定master,任务提交到standalone集群运行:
spark-submit --master spark://Hadoop101:7077 --class org.apache.spark.examples.SparkPi /opt/spark-2.2.0/examples/jars/spark-examples_2.11-2.2.0.jar 100
spark-submit 查看参数
--master 指定程序以何种方式运行
--class 运行程序的 包名+类名
Jar包的路径
后面的100,就是传入该程序的参数
在源码中我们得知,我们传入参数100,然后取100000*100和Int的最大值中的较小数,这个就是随机生成的坐标数m,简单理解就是向圆上打点
9.Spark编程(WordCount)
利用spark-shell完成WordCount案例
scala> val rdd1 = sc.textFile("hdfs://hadoop101:9000/WordCount/bigdata.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://hadoop101:9000/WordCount/bigdata.txt MapPartitionsRDD[3] at textFile at <console>:24
scala> val rdd2 = rdd1.flatMap(_.split(" "))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:26
scala> rdd2.collect
res0: Array[String] = Array(hello, spark, hello, scala, tom, hello, spark, scala, spark, hello, spark, hello)
scala> val rdd3 = rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:28
scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:30
scala> rdd4.collect
res0: Array[(String, Int)] = Array((scala,2), (tom,1), (hello,5), (spark,4))
IDEA编写spark版本的wordCount
查看官方网站,我们需要导入下面两个包
def main(args: Array[String]): Unit = {
if (args.length != 2) {
println("Usage :com.bj.spark33.day01.ScalaWordCount <input> <output>")
sys.exit(1)
}
// 参数接收
val Array(input, output) = args
val conf: SparkConf = new SparkConf()
// 创建SparkContext
val sc: SparkContext = new SparkContext(conf)
// 理论可以一行搞定,实际不推荐
// sc.textFile("").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("")
// 读取数据
val file: RDD[String] = sc.textFile(input)
// 切分并压平
val words: RDD[String] = file.flatMap(_.split(" "))
// 组装
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
// 分组聚合
val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
// 排序 降序
// 第一种 - 第二种 : 第二个参数
val finalRes: RDD[(String, Int)] = result.sortBy(_._2, false)
// 直接存储到hdfs中
finalRes.saveAsTextFile(output)
// 释放资源
sc.stop()
把程序打成jar包,提交到集群中运行:
spark-submit [--master xx] --class 程序运行的主类 xxx.jar input output
spark-submit --master spark://hadoop101:7077 --class com.spark.day01.ScalaWordCount /root/spark33-1.0-SNAPSHOT.jar hdfs://hadoop101:9000/WordCount/input hdfs://hadoop101:9000/wordcount/output2
以local模式运行
如果是local模式运行spark程序:
master地址,appname必须在Conf上设置。
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCountSpark {
def main(args: Array[String]): Unit = {
if (args.length != 2) {
println("USAGE:day01.WordCount <input> <output>")
sys.exit(1)
}
val Array(input,output) = args
//1创建sparkContext实例
val conf=new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
val sc: SparkContext = new SparkContext(conf)
//1.读取文件
val rdd1: RDD[String] = sc.textFile(input)
//2.切分文件,并扁平化
val rdd2: RDD[String] = rdd1.flatMap(_.split(" "))
//3.组装成(word,1)
val rdd3: RDD[(String, Int)] = rdd2.map((_,1))
//4.分组聚合
val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((_+_))
//5.输出
rdd4.foreach(println)
rdd4.saveAsTextFile(output)
//关闭,释放资源
sc.stop()
}
}