PySpark DataFrame Learning: A DataFrame Application Example (4)
1. Prepare the source datasets
Download: https://github.com/drabastomek/learningPySpark
The two source files used below are:
airport-codes-na.txt (tab-separated North American airport codes)
departuredelays.csv (flight departure delays)
First, specify the file paths of the two datasets and load them with a SparkSession, so that we can process the airport and flight-performance source data:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dataFrameApply").getOrCreate()

# Set the file paths
flightPerfFilePath = "../testData/flight-data/departuredelays.csv"
airportsFilePath = "../testData/flight-data/airport-codes-na.txt"

# Load the airports dataset (tab-separated, with a header row)
airports = spark.read.csv(airportsFilePath, header=True, inferSchema=True, sep='\t')
airports.createOrReplaceTempView("airports")  # replaces the deprecated registerTempTable

# Load the departure-delays dataset
flightPerf = spark.read.csv(flightPerfFilePath, header=True)
flightPerf.createOrReplaceTempView("FlightPerformance")

# Cache the departure-delays dataset, since it is queried repeatedly
flightPerf.cache()
```

2. Join flight performance with airports
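To make the `header=True` / `sep='\t'` options concrete without a Spark cluster, here is a minimal plain-Python sketch of how such a tab-separated file with a header row is parsed. The sample rows are hypothetical stand-ins for the first lines of airport-codes-na.txt (its real columns, City/State/Country/IATA, are the ones the SQL below refers to):

```python
import csv
import io

# Hypothetical sample mimicking airport-codes-na.txt: tab-separated,
# first row is the header (hence sep='\t' and header=True in spark.read.csv).
sample = (
    "City\tState\tCountry\tIATA\n"
    "Seattle\tWA\tUSA\tSEA\n"
    "Spokane\tWA\tUSA\tGEG\n"
)

# csv.DictReader with delimiter='\t' plays the role of header=True:
# the first row becomes the column names for every subsequent row.
rows = list(csv.DictReader(io.StringIO(sample), delimiter="\t"))

print(rows[0]["IATA"])  # SEA
print(rows[1]["City"])  # Spokane
```

Spark does the same header-to-schema mapping at scale, and `inferSchema=True` additionally samples the data to assign column types instead of leaving everything as strings.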
Query the total flight delays by city and origin airport code for Washington State:
```python
# Total flight delays by city and origin code (Washington State)
spark.sql("""
    SELECT a.city, f.origin, SUM(f.delay) AS Delays
    FROM FlightPerformance f
    JOIN airports a
      ON a.IATA = f.origin
    WHERE a.State = 'WA'
    GROUP BY a.City, f.origin
    ORDER BY SUM(f.delay) DESC
""").show()
```

Output:

```
+-------+------+--------+
|   city|origin|  Delays|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+
```
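The query above is a join, a filter, a group-by, and a sum. To see that plan independent of Spark, here is a minimal plain-Python sketch of the same logic over a few hypothetical in-memory rows (the values are made up for illustration, not taken from the real datasets):

```python
from collections import defaultdict

# Hypothetical rows standing in for the two registered tables.
airports = [
    {"City": "Seattle", "State": "WA", "IATA": "SEA"},
    {"City": "Spokane", "State": "WA", "IATA": "GEG"},
    {"City": "Boston",  "State": "MA", "IATA": "BOS"},
]
flights = [
    {"origin": "SEA", "delay": 10},
    {"origin": "SEA", "delay": 5},
    {"origin": "GEG", "delay": 7},
    {"origin": "BOS", "delay": 3},
]

# JOIN airports ON a.IATA = f.origin, WHERE a.State = 'WA',
# GROUP BY (City, origin), SUM(delay) -- the same plan as the SQL above.
iata_to_airport = {a["IATA"]: a for a in airports}
delays = defaultdict(int)
for f in flights:
    a = iata_to_airport.get(f["origin"])
    if a and a["State"] == "WA":
        delays[(a["City"], f["origin"])] += f["delay"]

# ORDER BY SUM(delay) DESC
for (city, origin), total in sorted(delays.items(), key=lambda kv: -kv[1]):
    print(city, origin, total)  # Seattle SEA 15, then Spokane GEG 7
```

Spark executes the same shape of plan, but distributes the join and the aggregation across partitions, which is why caching `flightPerf` pays off when several such queries reuse it.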