执行Spark程序
1.执行第一个spark程序(standalone)
/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://hadoop102:7077 --executor-memory 1G --total-executor-cores 2 /opt/module//spark-2.1.1-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.1.jar 100
参数说明:
—master spark://hadoop102:7077指定Master的地址
--executor-memory 1G 指定每个executor可用内存为1G
--total-executor-cores 2 指定每个executor使用的cup核数为2个
2.执行第一个spark程序(yarn)
/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client /opt/module/spark-2.1.1-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.1.jar 100
3.Spark应用提交
一旦打包好,就可以使用bin/spark-submit脚本启动应用了. 这个脚本负责设置spark使用的classpath和依赖,支持不同类型的集群管理器和发布模式:
./bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
一些常用选项:
• --class: 你的应用的启动类 (如 org.apache.spark.examples.SparkPi)
• --master: 集群的master URL (如 spark://23.195.26.187:7077)
• --deploy-mode: 是否发布你的驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)*
• --conf: 任意的Spark配置属性, 格式key=value. 如果值包含空格,可以加引号“key=value”. 缺省的Spark配置
• application-jar: 打包好的应用jar,包含依赖. 这个URL在集群中全局可见。 比如hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的path都包含同样的jar.
• application-arguments: 传给main()方法的参数
Master URL 可以是以下格式
例如:bin/spark-submit --class com.luomk.sparkcore_worldcount --master spark://hadoop102:7077 /opt/module/spark-2.1.1-bin-hadoop2.7/localjar/sparkcore_worldcount-1.0-SNAPSHOT.jar
4.启动Spark Shell
spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。
启动Spark shell:/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-shell --master spark://hadoop102:7077 --executor-memory 2g --total-executor-cores 2
注意:如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。
4.1 在Spark shell中编写WordCount程序
4.1.1 首先启动hdfs
4.1.2 将Spark目录下的RELEASE文件上传一个文件到hdfs://master01:9000/RELEASE
4.1.3 在Spark shell中用scala语言编写spark程序
4.1.4 使用hdfs命令查看结果:hdfs dfs -cat hdfs://master01:9000/out/p*
说明:
• sc是SparkContext对象,该对象时提交spark程序的入口
• textFile(hdfs://master01:9000/RELEASE)是hdfs中读取数据
• flatMap(_.split(" "))先map在压平
• map((_,1))将单词和1构成元组
• reduceByKey(_+_)按照key进行reduce,并将value累加
• saveAsTextFile("hdfs:// master01:9000/out")将结果写入到hdfs中
5.在IDEA中编写WordCount程序
spark shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中编制程序,然后打成jar包,然后提交到集群,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。
注意:在此只是核心的WorldCount示例和提交打包执行的命令
5.1 sparkcore_worldcount 代码示例
object sparkcore_worldcount {
def main(args: Array[String]): Unit = {
//新建sparkconf对象
// 在本地运行
val conf = new SparkConf().setMaster("local[*]").setAppName("sparkcore_worldcount")
//打jar包上集群运行
// val conf = new SparkConf().setAppName("sparkcore_worldcount")
//创建sparkcontext
val sc = new SparkContext(conf)
//读取数据
val textfile = sc.textFile("./WorldCount")
//按照空格进行切分
val worlds = textfile.flatMap(_.split(" "))
//转换为k v 结构
val k2v = worlds.map((_,1))
//将形同的key进行合并
val result = k2v.reduceByKey(_+_)
//输出结果
result.collect().foreach(println _)
//关闭连接
sc.stop()
}
}
5.2 打包提交执行
bin/spark-submit --class com.luomk.sparkcore_worldcount --master spark://hadoop102:7077 /opt/module/spark-2.1.1-bin-hadoop2.7/localjar/sparkcore_worldcount-1.0-SNAPSHOT.jar
6.常见提交spark程序执行示例(以WordCount为例)
--在Hdfs中统计WorldCount
sc.textFile("hdfs://hadoop103:9000/WorldCount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://hadoop103:9000/WorldCount_Out")
--在本地统计WorldCount
sc.textFile("/opt/module/datas/WorldCount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/opt/module/datas/WorldCount_Out")
--在控制台统计WorldCount
sc.textFile("/opt/module/datas/WorldCount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
--在服务器中打开spark命令行窗口
bin/spark-shell --master spark://hadoop102:7077,spark://hadoop103:7707
--在集群中运行jar包(注意:如果在集群中运行,需要将WorldCount源文件分发到集群的每一台机器)
bin/spark-submit --class com.luomk.sparkcore_worldcount --master spark://hadoop102:7077 /opt/module/spark-2.1.1-bin-hadoop2.7/localjar/sparkcore_worldcount-1.0-SNAPSHOT.jar