获取Spark数据框列中最大值的最佳方法
我想弄清楚获取Spark数据框列中最大值的最佳方法。获取Spark数据框列中最大值的最佳方法
请看下面的例子:
df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
它创建:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
我的目标是要找到A列中的最大值(通过检查,这是3.0)。使用PySpark,这里是我能想到的四种方法:
# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
# Method 3: Use groupby()
df.groupby().max('A').collect()[0].asDict()['max(A)']
# Method 4: Convert to RDD
df.select("A").rdd.max()[0]
上述每一个给出了正确的答案,但在没有火花分析工具的我不能告诉这是最好的。
来自直觉或经验主义的任何想法,对于上述哪些方法在Spark运行时或资源使用方面最有效,或者是否存在比上述方法更直接的方法?
>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor| timestamp| uid| x| y|
+-----+--------------------+--------+----------+-----------+
| 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
| 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
| 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
| 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|
>row1 = df1.agg({"x": "max"}).collect()[0]
>print row1
Row(max(x)=110.33613)
>print row1["max(x)"]
110.33613
答案与method3几乎相同。但似乎“asDict()”中的方法3,可以去除
有人可以解释为什么collect()[0]是需要的吗? – jibiel
@jibiel'collect()'返回一个列表(在本例中是一个单一的项目),所以你需要访问列表中的第一个(唯一的)项目 –
如果一些奇迹如何使用Scala的做,在这里你去(使用星火2.0 +):
scala> df.createOrReplaceTempView("TEMP_DF")
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF").
collect()(0).getInt(0)
scala> print(myMax)
117
的最大值一个数据帧的特定列可以通过使用来实现 -
your_max_value = df.agg({"your-column": "max"}).collect()[0][0]
备注:火花旨在对大数据工作 - 分布式计算。示例DataFrame的大小非常小,因此实际示例的顺序可以相对于小示例进行更改。
最慢:Method_1,因为.describe( “A”)来计算最小值,最大值,平均值,STDDEV和计数(5个计算在整个列)
介质:方法4,因为,.rdd(DF到RDD转换)减慢了过程。因为逻辑非常相似,所以Spark的催化剂优化器遵循非常相似的逻辑,并且操作次数最少(获取特定列的最大值,收集单值数据帧);而且, (.asDict()增加了一个小的额外时间比较3,2〜5)以毫秒为单位指定群的一个边缘节点上
import pandas as pd
import time
time_dict = {}
dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
#-- For bigger/realistic dataframe just uncomment the following 3 lines
#lst = list(np.random.normal(0.0, 100.0, 100000))
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst})
#dfff = self.sqlContext.createDataFrame(pdf)
tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
tac1 = int(round(time.time() * 1000))
time_dict['m1']= tac1 - tic1
print (max_val)
tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.registerTempTable("df_table")
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
tac2 = int(round(time.time() * 1000))
time_dict['m2']= tac2 - tic2
print (max_val)
tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)']
tac3 = int(round(time.time() * 1000))
time_dict['m3']= tac3 - tic3
print (max_val)
tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict['m4']= tac4 - tic4
print (max_val)
tic5 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict['m5']= tac5 - tic5
print (max_val)
print time_dict
结果(毫秒):
小DF(毫秒): 'm1':7096,'m2':205,'m3':165,'m4':211,'m5':180}
更大DF(ms):{'m1':10260,'m2 ':452,'m3':465,'m4':916,'m5':373}
方法2和3相同,使用相同的物理和优化逻辑计划。方法4将rdd中的max应用于reduce。它可能比直接在DataFrame上运行要慢。方法1或多或少相当于2和3. – zero323
@ zero323那么'df.select(max(“A”))。collect()[0] .asDict()['max(A)']'?看起来相当于方法2,而更紧凑,也更直观,方法3。 – desertnaut
- 最慢的是方法4,因为您对DF进行了整个列的RDD转换,然后提取最大值; –