SparkSQL(6)——Spark SQL JDBC
Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame。
通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。
SparkSQL从MySQL中加载数据
package com.fgm.sparksql
import java.util.Properties
import org.apache.spark.sql.SparkSession
/**
*通过sparksql读取mysql表中的数据
*
* @Auther: fgm
*/
object DataFromMysql {
def main(args: Array[String]): Unit = {
//创建对象
val spark = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
//通过sparkSession对象加载mysql中的数据
val url="jdbc:mysql://localhost:3306/spark"
//定义表名
val table="test"
//properties
val properties=new Properties()
properties.setProperty("user","root")
properties.setProperty("password","123")
val jdbc = spark.read.jdbc(url,table,properties)
jdbc.printSchema()
jdbc.show()
jdbc.createTempView("test")
spark.sql("select * from test").show()
spark.stop()
}
}
SparkSQL向MySQL中写入数据
package com.fgm.sparksql
import java.util.Properties
import org.apache.spark.sql.SparkSession
/**
*通过sparksql把结果数据写入到mysql表
* @Auther: fgm
*/
case class User(val id:Int,val name:String,val age:Int)
object DataToMysql {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("DataToMysql").master("local[2]").getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("WARN")
//读取数据文件
val RDD1 = sc.textFile("D:\\tmp\\user.txt").map(_.split(" "))
//将RDD与样例类关联
val userRDD = RDD1.map(x=>User(x(0).toInt,x(1),x(2).toInt))
//构建DataFrame
import spark.implicits._
val df = userRDD.toDF()
df.printSchema()
df.show()
df.createTempView("user")
val result = spark.sql("select * from user where age >30")
//定义表名
val table="user"
//将结果写入到mysql
//定义数据库url
val url="jdbc:mysql://localhost:3306/spark"
//properties
val properties=new Properties()
properties.setProperty("user","root")
properties.setProperty("password","123")
result.write.mode("append").jdbc(url,table,properties)
//再将数据库中的数据读取出来,检查是否写入成功,也可以进行其他相关操作
//val jdbc=spark.read.jdbc(url,table,properties)
//jdbc.show()
spark.stop()
}
}
D:\tmp\user.txt
1 zhangsan 20
2 lisi 29
3 wangwu 33
4 zhaoliu 30
5 hahaha 44
未注释读取的代码时,数据如下:
并且查看数据库发现,新建的user表中已经有了数据。
注意:以上代码,都可以打成jar包之后在集群中运行。参数(如:文件url,以及table等,)可以通过args(0)等方式传入,不要写死在代码里。