Learning PySpark DataFrames: A DataFrame Application Example (4)

1. Preparing the source datasets

Download: https://github.com/drabastomek/learningPySpark

The raw data consists of two files:

airport-codes-na.txt

departuredelays.csv

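First, specify the file paths of the datasets and load them through a SparkSession, so that the airports and flight-performance source datasets can be queried.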

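from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dataFrameApply").getOrCreate()

# Set the file paths
flightPerfFilePath = "../testData/flight-data/departuredelays.csv"
airportsFilePath = "../testData/flight-data/airport-codes-na.txt"

# Load the airports dataset (tab-separated, with a header row)
airports = spark.read.csv(airportsFilePath, header=True, inferSchema=True, sep='\t')
# createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0
airports.createOrReplaceTempView("airports")

# Load the departure-delay dataset (no inferSchema, so every column is read as a string)
flightPerf = spark.read.csv(flightPerfFilePath, header=True)
flightPerf.createOrReplaceTempView("FlightPerformance")

# Cache the departure-delay dataset
flightPerf.cache()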
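2. Joining flight performance and airports

Query the total flight delay (the sum of the delay column) for Washington State, grouped by city and origin code.

# Total flight delay by city and origin code
# (Washington State)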
spark.sql("""select a.city,f.origin,sum(f.delay) as Delays
    from FlightPerformance f 
    join airports a 
    on a.IATA = f.origin 
    where a.State = 'WA' 
    group by a.City,f.origin 
    order by sum(f.delay) desc""").show()
+-------+------+--------+
|   city|origin|  Delays|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+