十四.Spark SQL总结之spark日志文件数据格式的转换
第一步.数据源
找到spark的日志数据源,在/root/spark/spark-2.0.2-bin-hadoop2.7/logs目录下:
通过对文件的读取,统计其中数据的条数:
val masterLog = sc.textFile("file:///root/spark/spark-2.0.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop000.out")
val workLog = sc.textFile("file:///root/spark/spark-2.0.2-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-hadoop000.out")
val allLog = sc.textFile("file:///root/spark/spark-2.0.2-bin-hadoop2.7/logs/*out*")
结果显示:
第二步.将读取的数据源转换成DataFrame
方式一(通过schema然后再转DataFrame)
import org.apache.spark.sql.Row
val masterRDD = masterLog.map(x => Row(x))
import org.apache.spark.sql.types._
val schemaString = "line"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = true))
val schema = StructType(fields)
val masterDF = spark.createDataFrame(masterRDD,schema)
masterDF.show
因为显示的数据部分隐藏了,如果不想隐藏需要使用下面的命令进行显示:
masterDF.show(false)
显示的结果:
方式二(通过sql的方式获取数据)
scala> masterDF.createOrReplaceTempView("master_logs")
scala> spark.sql("select * from master_logs limit 10").show(false)
显示结果:
第三步.parquet格式的文件转换成DataFrame类型的数据
方式一.以读取文件的形式
val userDF = spark.read.format("parquet").load("file:///root/spark/spark-2.0.2-bin-hadoop2.7/examples/src/main/resources/users.parquet")
userDF.show
方式二.以sql的形式读取parquet类型的文件
val userDF = spark.sql("select * from parquet.`file:///root/spark/spark-2.0.2-bin-hadoop2.7/examples/src/main/resources/users.parquet`")
userDF.show
第四步.拓展
- 读取HDFS中的数据:
val hdfsDF = sc.textFile("hdfs://path/file路径")
2)读取正常的文件:
val dataDF = spark.read.format("txt").load("文件路径")
以上就是关于读取外部数据源的各种形式…
每天的疲劳换来的是美好的未来,笑着面对每一天。