执行Spark程序

1.执行第一个spark程序(standalone)  

/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://hadoop102:7077 --executor-memory 1G --total-executor-cores 2 /opt/module//spark-2.1.1-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.1.jar 100

    参数说明:

    —master spark://hadoop102:7077指定Master的地址

    --executor-memory 1G 指定每个executor可用内存为1G

    --total-executor-cores 2 指定每个executor使用的cup核数为2个

 

2.执行第一个spark程序(yarn)  

/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client /opt/module/spark-2.1.1-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.1.jar  100

执行Spark程序

 

3.Spark应用提交

    一旦打包好,就可以使用bin/spark-submit脚本启动应用了. 这个脚本负责设置spark使用的classpath和依赖,支持不同类型的集群管理器和发布模式:

./bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

    一些常用选项:

    • --class: 你的应用的启动类 (如 org.apache.spark.examples.SparkPi)

    • --master: 集群的master URL (如 spark://23.195.26.187:7077)

    • --deploy-mode: 是否发布你的驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)*

    • --conf: 任意的Spark配置属性, 格式key=value. 如果值包含空格,可以加引号“key=value”. 缺省的Spark配置

    • application-jar: 打包好的应用jar,包含依赖. 这个URL在集群中全局可见。 比如hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的path都包含同样的jar.

    • application-arguments: 传给main()方法的参数

    Master URL 可以是以下格式

执行Spark程序

    例如:bin/spark-submit --class com.luomk.sparkcore_worldcount --master spark://hadoop102:7077 /opt/module/spark-2.1.1-bin-hadoop2.7/localjar/sparkcore_worldcount-1.0-SNAPSHOT.jar

 

4.启动Spark Shell

    spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。

    启动Spark shell:/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-shell --master spark://hadoop102:7077 --executor-memory 2g --total-executor-cores 2

    注意:如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。

4.1 在Spark shell中编写WordCount程序

执行Spark程序

4.1.1 首先启动hdfs

执行Spark程序

4.1.2 将Spark目录下的RELEASE文件上传一个文件到hdfs://master01:9000/RELEASE

执行Spark程序

4.1.3 在Spark shell中用scala语言编写spark程序

执行Spark程序

4.1.4 使用hdfs命令查看结果:hdfs dfs -cat hdfs://master01:9000/out/p*

执行Spark程序

说明:

    • sc是SparkContext对象,该对象时提交spark程序的入口

    • textFile(hdfs://master01:9000/RELEASE)是hdfs中读取数据

    • flatMap(_.split(" "))先map在压平

    • map((_,1))将单词和1构成元组

    • reduceByKey(_+_)按照key进行reduce,并将value累加

    • saveAsTextFile("hdfs:// master01:9000/out")将结果写入到hdfs中

 

5.在IDEA中编写WordCount程序

    spark shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中编制程序,然后打成jar包,然后提交到集群,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。

    注意:在此只是核心的WorldCount示例和提交打包执行的命令

5.1 sparkcore_worldcount 代码示例

object sparkcore_worldcount {
  def main(args: Array[String]): Unit = {
    //新建sparkconf对象
    // 在本地运行
     val conf = new SparkConf().setMaster("local[*]").setAppName("sparkcore_worldcount")
    //打jar包上集群运行
   // val conf = new SparkConf().setAppName("sparkcore_worldcount")
    //创建sparkcontext
    val sc = new SparkContext(conf)
    //读取数据
    val textfile = sc.textFile("./WorldCount")
    //按照空格进行切分
    val worlds = textfile.flatMap(_.split(" "))
    //转换为k v 结构
    val k2v = worlds.map((_,1))
    //将形同的key进行合并
    val result = k2v.reduceByKey(_+_)
    //输出结果
    result.collect().foreach(println _)
    //关闭连接
    sc.stop()
  }
}

5.2 打包提交执行

bin/spark-submit --class com.luomk.sparkcore_worldcount --master spark://hadoop102:7077 /opt/module/spark-2.1.1-bin-hadoop2.7/localjar/sparkcore_worldcount-1.0-SNAPSHOT.jar

 

6.常见提交spark程序执行示例(以WordCount为例)

--在Hdfs中统计WorldCount
sc.textFile("hdfs://hadoop103:9000/WorldCount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://hadoop103:9000/WorldCount_Out")

--在本地统计WorldCount
sc.textFile("/opt/module/datas/WorldCount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/opt/module/datas/WorldCount_Out")

--在控制台统计WorldCount
sc.textFile("/opt/module/datas/WorldCount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

--在服务器中打开spark命令行窗口
bin/spark-shell --master spark://hadoop102:7077,spark://hadoop103:7707

--在集群中运行jar包(注意:如果在集群中运行,需要将WorldCount源文件分发到集群的每一台机器)
bin/spark-submit --class com.luomk.sparkcore_worldcount --master spark://hadoop102:7077 /opt/module/spark-2.1.1-bin-hadoop2.7/localjar/sparkcore_worldcount-1.0-SNAPSHOT.jar