SparkJDBC并行查询RDBMS数据库的参数方法
当通过spark读取mysql时,如果数据量比较大,为了加快速度,通常会起多个task并行拉取mysql数据。
api:
def jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): DataFrame
Spark JDBC连接关系型数据库方式如下:
// 读出数据
val jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:mysql://dbserver:port/db")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 写入数据
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:mysql://dbserver:port/db")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 在写入时指定创建表列数据类型
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:mysql://dbserver:port/db", "schema.tablename", connectionProperties)
Spark通过JDBC读取关系型数据库,默认查全表,只有一个任务去执行查询操作,大量数据情况下,效率是很慢的,也经常会卡死。 这时,可以通过构造多个任务并行连接RDBMS提升效率, 如下:
Spark SQL JDBC属性介绍
增加SparkSQL参数来构造多个任务(并行)来提升效率:
参数名 | 含义 |
---|---|
url |
要连接的JDBC URL。可以在URL中指定特定于源的连接属性。例如,jdbc:postgresql://localhost/test?user=fred&password=secret 或者 jdbc:datadirect:greenplum// hostname:[ port ] [; property = value [; ...]] |
dbtable | 应该读取的JDBC表。请注意,可以使用在SQL查询的FROM子句中有效的任何内容。例如,您也可以在括号中使用子查询,而不是完整的表:
|
driver | 用于连接到此URL的JDBC驱动程序的类名 |
partitionColumn,lowerBound,upperBound | 如果指定了其中任何一个选项,则必须全部指定这些选项。它们描述了在从多个工作者并行读取时如何对表进行分区。partitionColumn必须是相关表中的数字列。请注意,lowerBound和upperBound仅用于决定分区步幅,而不是用于过滤表中的行。因此,表中的所有行都将被分区并返回。此选项为(选填) |
numPartitions | 表读取和写入中可用于并行的最大分区数。这还确定了最大并发JDBC连接数。如果要写入的分区数超过此限制,我们通过在写入之前调用coalesce(numPartitions)将其减少到此限制。此选项为(选填) |
更详细参数:JDBC到其他数据库
Ps:lowerBound和upperBound仅用于决定划分分区时的步长,而不是用于按照这两个值对数据进行过滤。 因此,无论这两个值如何设置,表中的所有行都将被读取。
同时需要注意的是,尽量不要创建太多分区,否则很容易将mysql搞挂。
关于具体的分区示例代码,参考如下(本部分代码参考spark源码org.apache.spark.sql.execution.datasources.jdbc中columnPartition方法 )。代码如下:
1.单分区模式
只需要传入JDBC URL、表名及对应的账号密码Properties即可。但是计算此DF的分区数后发现,这种不负责任的写法,并发数是1
Scala> jdbcDF.rdd.partitions.size=1
操作大数据集时,spark对MySQL的查询语句等同于可怕的:select * from table; ,而单个分区会把数据都集中在一个executor,当遇到较大数据集时,都会产生不合理的资源占用:MySQL可能hang住,spark可能会OOM,所以不推荐生产环境使用;
参考:Spark JDBC系列--取数的四种方式(https://www.jianshu.com/p/c18a8197e6bf)
Spark 并行查询RDBMS
通过增加 Task 数量来提升访问关系型数据(这里gp为例)的效率,大致有以下两种方法:
一、指定 numPartitions,partitionColumn,lowerBound 参数
val spark = SparkSession
.builder()
.config("spark.sql.warehouse.dir", warehouseLocation)
.appName("load data from gp test")
.getOrCreate()
// 开始时间
val startTime = System.currentTimeMillis()
val gpRDF = spark.read
.format("jdbc")
.option("driver", "com.pivotal.jdbc.GreenplumDriver")
.option("url", "jdbc:pivotal:greenplum://ip:port;DatabaseName=testdb")
.option("partitionColumn", "person_id")
.option("lowerBound", lowerBound)
.option("upperBound", upperBound)
.option("numPartitions", numPartitions)
.option("dbtable", "public.t_test")
.option("user", "gpadmin")
.option("password", "gpadmin")
.load()
二、"dbtable"参数构造子查询
不用 numPartitions,partitionColumn, lowerBound, upperBound,可以通过 dbtable 构造子查询,并行执行多个查询得到多个结果 RDD,最后通过 reduce 合并成一个 RDD。
Ps:针对表数据量很大且资源不足的情况很有用,分段对全量数据进行计算再汇总
val stride = Math.ceil(dataNums / numPartitions).toInt
val spark = SparkSession
.builder()
.config("spark.sql.warehouse.dir", warehouseLocation)
.appName("load data from gp")
.getOrCreate()
// 创建 numPartitions 个 task
val registerDF = Range(0, numPartitions)
.map(index => {
spark
.read
.format("jdbc")
.option("driver", "com.pivotal.jdbc.GreenplumDriver")
.option("url", "jdbc:pivotal:greenplum://ip:port;DatabaseName=testdb")
.option("dbtable", s"(SELECT * FROM public.t_test WHERE person_id > ${stride * index} AND person_id <= ${stride * (index + 1)}) AS t_tmp_${index}")
.option("user", "gpadmin")
.option("password", "gpadmin")
.load()
})
.reduce((rdd1, rdd2) => rdd1.union(rdd2))
总结
对于上述的两种方式:
第一种有时会造成数据分布不均匀,有些 task 数据量很大,有些 task 数据量几乎为 0,这是因为 Spark 是根据指定的分区列 partitionColumn 来进行分区,如果指定的 partitionColumn 不是连续的数(分布不均匀),那么每个 task 中的数据量就会分配不均匀;
第二种自定义 sql,相对可控,当然自定义也就意味着代码要稍微复杂。