Spark程序需要很长时间才能完成执行
Spark应该以极快的速度完成数据处理。但我想我没有使用我的程序的正确功能来让Spark以这种方式工作。Spark程序需要很长时间才能完成执行
这是我的计划是如何的样子:
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyparsing import re
import time
start_time = time.time()
sc = SparkContext("local","test")
sqlContext = SQLContext(sc)
def func1(lines):
for line in lines:
qtype = re.search("qtype=(\S+)",line)
try:
url = re.search(" url=(\S+)",line)
url=url.group(1)
except:
url="null"
time = re.search("(^\S+)",line)
.... #extracting field names
row = Row(time = time.group(1),ttms = ttms.group(1),chi = chi.group(1),pssc = pssc.group(1),cqhm = cqhm.group(1),rtype = rtype.group(1),rdetails = rdetails.group(1),rurl = rurl.group(1),qtype = qtype.group(1),phn = phn.group(1),fqdn = fqdn,url = url)
return row
file = sc.textFile("C:\\Logs\\filename.log").cache()
result = file.map(lambda x: x.split("\n"))
lines = result.map(func1).collect()
df = sqlContext.createDataFrame(lines)
df.registerTempTable("df")
df.show()
for line in df :
query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect()
query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect()
query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect()
query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect()
df1 = sqlContext.createDataFrame(query1)
df2 = sqlContext.createDataFrame(query2)
df3 = sqlContext.createDataFrame(query3)
df4 = sqlContext.createDataFrame(query4)
df1.toPandas().to_csv("C:\\Sample logs\\sqlcsv1.csv")
df2.toPandas().to_csv("C:\\Sample logs\\sqlcsv2.csv")
df3.toPandas().to_csv("C:\\Sample logs\\sqlcsv3.csv")
df4.toPandas().to_csv("C:\\Sample logs\\sqlcsv4.csv")
print(time.time() - start_time)
这个程序需要将近200秒,执行这是一个很长的时间。我无法弄清楚原因。 (我的日志文件包含大约34k条日志行)。我试图用正则表达式的火花filter
,但我得到的错误rdd is not iterable
。所以我需要知道如何优化我的程序以使其运行速度更快。 另外,我得到stage x contains task of very large size
的警告。试过广播lines
,但发生了错误。
一些原因,这个火花的代码运行速度会变慢与纯Python代码:使用一台机器
sc = SparkContext("local","test")
蟒蛇火花集群上运行时,可能要执行比纯Python更好
1)。 在“本地”模式下,spark对纯python没有好处。使用“缓存”时,它不使用
file = sc.textFile("C:\\Logs\\filename.log").cache()
.cache()
2)应该使用仅当相同对象被调用多次。 “文件”只调用一次 - 即不需要使用缓存
3)“收集()”
lines = result.map(func1).collect()
for line in df :
query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect()
query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect()
query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect()
query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect()
的一般规则 - 避免使用“收集()”,除非是真的需要它。使用
4) “toPandas()”
df1.toPandas().to_csv("C:\\Sample logs\\sqlcsv1.csv")
df2.toPandas().to_csv("C:\\Sample logs\\sqlcsv2.csv")
df3.toPandas().to_csv("C:\\Sample logs\\sqlcsv3.csv")
df4.toPandas().to_csv("C:\\Sample logs\\sqlcsv4.csv")
“toPandas()” 实施开始的 “收集()” 执行(见#3)
由于火花2.0,你可以直接写数据帧到CSV:
http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
5)我不知道,我明白下面的代码:
for line in df :
query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect()
query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect()
query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect()
query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect()
你有什么努力实现“为DF行:”?
如果数据框包含100,000行,您计划执行这个“for-loop”100,000次吗?
似乎变量query1,query2,query3,query4将仅保存for循环的最后一次执行的结果(因为每次从“df”中读取新的“行”时,它们的值似乎都被覆盖“) - 是否有意?
6)可以直接从RDD
创建数据帧例如使用
sqlContext.createDataFrame
http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html
createDataFrame(**数据,模式=无,samplingRatio =无,verifySchema =真) **创建一个数据帧从RDD,列表或pandas.DataFrame 。
或
RDD.toDF()
http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html
toDF(*的cols)
返回一个新的类:数据帧,与新指定的列名 参数:列 - 新的列名列表(串)
>>> df.toDF('f1', 'f2').collect()
[Row(f1=2, f2=u'Alice'), Row(f1=5, f2=u'Bob')]
这是一个单箱或多节点你遇到的群集?你能否澄清代码中代表查询时间的地方?一些快速观察可能会有所帮助:1)'线'你需要运行一个收集? 2)你想在你的uber-dataframe中为每行创建一个DataFrame吗? –
您可以尝试解释一下,请问您正在尝试做什么?对我来说不是很清楚。数据的大小又是多少? DF中有多少条目? – eliasah
@DennyLee:这是我正在使用的一个盒子。我刚刚使用python的'time'函数来评估整个程序执行的时间。 1)我需要为'lines'使用collect,以便它被转换为列表并且可以从那里创建df。 2)我没有为每一行创建一个df,我只是从超级用户那里查询每一行。 – kaks