Using Java Code to Dynamically Submit Spark Jobs to a Cluster
Scenario
Executing a method in our Java code triggers the submission of a Spark job to a Spark cluster running on YARN.
Getting Started
The Spark Job
The job uses Spark to read a table from Hive; the table is an HBase table managed through Hive. It counts the total number of records in that table.
The code is as follows:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Table_count {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test_hive_count").setMaster("yarn-client")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val sqlContext = sparkSession.sqlContext

    // Count the records in the Hive-managed HBase table
    val sql = "select count(1) from slri_test.hiveslri_hbaselmriskapp"
    val result = sqlContext.sql(sql)

    // Write the count into MySQL via JDBC
    result.write.format("jdbc")
      .option("url", "jdbc:mysql://hadoop01:3306/test")
      .option("dbtable", "result")
      .option("user", "root")
      .option("password", "Hadoop0!")
      .mode(SaveMode.Overwrite)
      .save()

    sparkSession.stop()
  }
}
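A note on the write: with SaveMode.Overwrite, Spark's JDBC writer drops and recreates the result table on each run, so MySQL always holds only the latest count.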
Heads-up:
When a job submitted this way reads a Hive table, use sqlContext as the query entry point after creating the SparkSession; querying through sparkSession directly does not work here and fails with an error that the Hive table cannot be found.
Submitting via the Java API
The job is submitted with the SparkLauncher API that Spark provides.
The code is as follows:
package com.sinosoft.sparksubmit;

import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;

/**
 * Created by xiaoliu on 2018-5-3.
 */
public class SubmitMain {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "hdfs");

        // Environment for the spark-submit child process
        HashMap<String, String> map = new HashMap<String, String>();
        map.put("HADOOP_CONF_DIR", "/etc/hadoop/conf");
        map.put("YARN_CONF_DIR", "/etc/hadoop/conf");
        map.put("SPARK_CONF_DIR", "/etc/spark2/conf");
        map.put("SPARK_HOME", "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2");
        map.put("JAVA_HOME", "/usr/java/jdk1.8.0_144");

        try {
            SparkLauncher spark = new SparkLauncher(map)
                    .setDeployMode("client")
                    .setAppResource("hdfs:///user/jars/spark_module-1.0-SNAPSHOT.jar")
                    .setMainClass("com.sinosoft.Table_count")
                    .setMaster("yarn-client")
                    .setConf(SparkLauncher.DRIVER_MEMORY, "1g")
                    // Join both library paths with ":"; calling setConf twice with the
                    // same key would make the second value overwrite the first
                    .setConf(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH,
                            "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/jars/"
                                    + ":/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/")
                    .setVerbose(true);

            // Launch the Spark job as a child process
            Process process = spark.launch();

            // Drain stdout and stderr on separate threads so the child
            // process cannot block on a full pipe buffer
            InputStreamReaderRunnable inputStreamReaderRunnable =
                    new InputStreamReaderRunnable(process.getInputStream(), "input");
            Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
            inputThread.start();

            InputStreamReaderRunnable errorStreamReaderRunnable =
                    new InputStreamReaderRunnable(process.getErrorStream(), "error");
            Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
            errorThread.start();

            System.out.println("Waiting for finish...");
            int exitCode = process.waitFor();
            System.out.println("Finished! Exit code:" + exitCode);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Heads-up:
The Hadoop, YARN, and Spark configuration directories set above must actually exist; otherwise a directory-not-found exception is thrown.
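To fail fast with a clearer message, you can validate those directories before building the SparkLauncher. This is a minimal sketch, not part of the original code (the helper name checkDirs is ours):

import java.io.File;
import java.util.HashMap;
import java.util.Map;

public class LauncherEnvCheck {

    // Throws if any required configuration directory is missing
    static void checkDirs(Map<String, String> env) {
        for (Map.Entry<String, String> e : env.entrySet()) {
            if (!new File(e.getValue()).isDirectory()) {
                throw new IllegalStateException(
                        e.getKey() + " points to a missing directory: " + e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<String, String>();
        env.put("HADOOP_CONF_DIR", "/etc/hadoop/conf");
        env.put("YARN_CONF_DIR", "/etc/hadoop/conf");
        env.put("SPARK_CONF_DIR", "/etc/spark2/conf");
        checkDirs(env); // call this before new SparkLauncher(env)
        System.out.println("All configuration directories exist.");
    }
}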
SubmitMain also uses a custom class to stream the child process's log output:
package com.sinosoft.sparksubmit;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

/**
 * Created by xiaoliu on 2018-5-5.
 */
public class InputStreamReaderRunnable implements Runnable {
    private BufferedReader reader;
    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    // Echo every line from the child process's stream until it closes
    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
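As an alternative to draining the streams by hand, SparkLauncher (since Spark 1.6) also offers startApplication(), which returns a SparkAppHandle for tracking the job's state. A minimal sketch under the same launcher settings as SubmitMain; it is not the approach used in this test:

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class SubmitWithHandle {
    public static void main(String[] args) throws Exception {
        SparkAppHandle handle = new SparkLauncher()
                .setAppResource("hdfs:///user/jars/spark_module-1.0-SNAPSHOT.jar")
                .setMainClass("com.sinosoft.Table_count")
                .setMaster("yarn-client")
                .startApplication(new SparkAppHandle.Listener() {
                    // Called on every state transition (CONNECTED, RUNNING, FINISHED, ...)
                    @Override
                    public void stateChanged(SparkAppHandle h) {
                        System.out.println("State: " + h.getState());
                    }

                    // Called when application info (e.g. the YARN app id) changes
                    @Override
                    public void infoChanged(SparkAppHandle h) {
                        System.out.println("App id: " + h.getAppId());
                    }
                });

        // Block until the application reaches a terminal state
        while (!handle.getState().isFinal()) {
            Thread.sleep(1000);
        }
        System.out.println("Final state: " + handle.getState());
    }
}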
Running the Test
1. Package the two parts above into separate jar files.
2. Upload the Spark job's jar to the corresponding directory on HDFS.
3. Pick one node in the cluster to act as the client that submits the job; prepare a jars directory on it and copy the jars needed for the Spark submission into that directory.
4. Run the class that submits the jar, using the java command:
java -Djava.ext.dirs=/opt/jars/job -cp /opt/jars/job/web-1.0-SNAPSHOT.jar com.sinosoft.sparksubmit.SubmitMain
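Note that -Djava.ext.dirs relies on the extension mechanism, which was removed in JDK 9; on a newer JDK an equivalent invocation would put the whole directory on the classpath with a wildcard instead: java -cp "/opt/jars/job/*" com.sinosoft.sparksubmit.SubmitMain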
5. The run result:
The completion marker at the end of the output is the one we print from the code ourselves.
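On a successful run, the tail of the console output should therefore look roughly like the following (these lines come from the print statements in SubmitMain; exit code 0 indicates success):

Waiting for finish...
Finished! Exit code:0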