Spark设置任务个数
今天使用spark读取hive的数据,然后保存到es,数据总共有,数据量1g左右,代码如下所示
package datasource import org.apache.spark.{SparkConf, SparkContext} import org.elasticsearch.spark.sql.EsSparkSQL object Data2ES { def main(args: Array[String]): Unit = { val conf = new SparkConf() val sc = new SparkContext(conf) val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) val options=Map( ("es.nodes", "192.168.111.75"), ("es.port", "9200"), ("es.index.auto.create", "true"), ("es.write.operation", "index") ) val order_index=sqlContext.sql("select * from order_index") EsSparkSQL.saveToEs(order_index,"order_index_3/order_3",options) sc.stop() } } |
通过spark-submit提交任务,设置4个executor,每个executor有4个核,每个executor的内存为4g,命令如下
spark-submit --master spark://server3:7077 --executor-memory 4g --executor-cores 4 --total-executor-cores 16 --name qwm --class datasource.Data2ES /opt/jar/zjsm.jar
保存在HDFS上的数据分成了8个块,因此作业分成了8个小作业,任务运行了两个多小时。为提高任务的运行效率,打算将任务分成80个小任务。通过查询知道可以通过设置spark.default.parallelism参数或者spark.sql.shuffle.partitions参数(这个主要是针对spark sql设置的),因此将提交作业的命令改成下面的命令,但是发现任务数还是没有改变,仍然是8个任务
spark-submit --master spark://server3:7077 --executor-memory 4g --executor-cores 4 --total-executor-cores 16 --conf spark.sql.shuffle.partitions=80 --name qwm --class datasource.Data2ES /opt/jar/zjsm.jar
后来在https://*.com/questions/38249624/how-to-change-partition-size-in-spark-sql这里看到另外一种设置任务数的方法
试了这种方式,仍然没有见效。仔细想了一下,应该可以通过改变分区数来控制任务个数,因此修改了代码,改变了分区数,使用这种方式确实改变了任务的个数
package datasource
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql.EsSparkSQL
object Data2ES {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val options=Map(
("es.nodes", "192.168.111.76"),
("es.port", "9200"),
("es.index.auto.create", "true"),
("es.write.operation", "index")
)
val order_index=sqlContext.sql("select * from order_index")
val rdd=order_index.rdd.repartition(80)
val schema=order_index.schema
val df=sqlContext.createDataFrame(rdd,schema)
EsSparkSQL.saveToEs(df,"order_index_3/order_3",options)
sc.stop()
}
}
|