Spark程序需要很长时间才能完成执行

问题描述:

Spark应该以极快的速度完成数据处理。但我想我没有使用我的程序的正确功能来让S​​park以这种方式工作。Spark程序需要很长时间才能完成执行

这是我的计划是如何的样子:

from pyspark import SparkContext 
from pyspark import SQLContext 
from pyspark.sql.types import * 
from pyspark.sql import Row 
from pyparsing import re 
import time 

start_time = time.time() 
sc = SparkContext("local","test") 
sqlContext = SQLContext(sc) 

def func1(lines): 
    for line in lines: 
     qtype = re.search("qtype=(\S+)",line) 
     try: 
      url = re.search(" url=(\S+)",line) 
      url=url.group(1) 
     except: 
      url="null" 

     time = re.search("(^\S+)",line) 
     .... #extracting field names 
     row = Row(time = time.group(1),ttms = ttms.group(1),chi = chi.group(1),pssc = pssc.group(1),cqhm = cqhm.group(1),rtype = rtype.group(1),rdetails = rdetails.group(1),rurl = rurl.group(1),qtype = qtype.group(1),phn = phn.group(1),fqdn = fqdn,url = url) 

    return row 

file = sc.textFile("C:\\Logs\\filename.log").cache() 
result = file.map(lambda x: x.split("\n")) 
lines = result.map(func1).collect() 
df = sqlContext.createDataFrame(lines) 
df.registerTempTable("df") 
df.show() 

for line in df : 
    query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect() 
    query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect() 
    query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect() 
    query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect() 

df1 = sqlContext.createDataFrame(query1) 
df2 = sqlContext.createDataFrame(query2) 
df3 = sqlContext.createDataFrame(query3) 
df4 = sqlContext.createDataFrame(query4) 
df1.toPandas().to_csv("C:\\Sample logs\\sqlcsv1.csv") 
df2.toPandas().to_csv("C:\\Sample logs\\sqlcsv2.csv") 
df3.toPandas().to_csv("C:\\Sample logs\\sqlcsv3.csv") 
df4.toPandas().to_csv("C:\\Sample logs\\sqlcsv4.csv") 
print(time.time() - start_time) 

这个程序需要将近200秒,执行这是一个很长的时间。我无法弄清楚原因。 (我的日志文件包含大约34k条日志行)。我试图用正则表达式的火花filter,但我得到的错误rdd is not iterable。所以我需要知道如何优化我的程序以使其运行速度更快。 另外,我得到stage x contains task of very large size的警告。试过广播lines,但发生了错误。

+0

这是一个单箱或多节点你遇到的群集?你能否澄清代码中代表查询时间的地方?一些快速观察可能会有所帮助:1)'线'你需要运行一个收集? 2)你想在你的uber-dataframe中为每行创建一个DataFrame吗? –

+1

您可以尝试解释一下,请问您正在尝试做什么?对我来说不是很清楚。数据的大小又是多少? DF中有多少条目? – eliasah

+0

@DennyLee:这是我正在使用的一个盒子。我刚刚使用python的'time'函数来评估整个程序执行的时间。 1)我需要为'lines'使用collect,以便它被转换为列表并且可以从那里创建df。 2)我没有为每一行创建一个df,我只是从超级用户那里查询每一行。 – kaks

一些原因,这个火花的代码运行速度会变慢与纯Python代码:使用一台机器

sc = SparkContext("local","test") 

蟒蛇火花集群上运行时,可能要执行比纯Python更好

1)。 在“本地”模式下,spark对纯python没有好处。使用“缓存”时,它不使用

file = sc.textFile("C:\\Logs\\filename.log").cache() 

.cache()

2)应该使用仅当相同对象被调用多次。 “文件”只调用一次 - 即不需要使用缓存

3)“收集()”

lines = result.map(func1).collect() 

for line in df : 
    query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect() 
    query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect() 
    query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect() 
    query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect() 

的一般规则 - 避免使用“收集()”,除非是真的需要它。使用

4) “toPandas()”

df1.toPandas().to_csv("C:\\Sample logs\\sqlcsv1.csv") 
df2.toPandas().to_csv("C:\\Sample logs\\sqlcsv2.csv") 
df3.toPandas().to_csv("C:\\Sample logs\\sqlcsv3.csv") 
df4.toPandas().to_csv("C:\\Sample logs\\sqlcsv4.csv") 

“toPandas()” 实施开始的 “收集()” 执行(见#3)

由于火花2.0,你可以直接写数据帧到CSV:

http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) 

5)我不知道,我明白下面的代码:

for line in df : 
    query1 = sqlContext.sql("SELECT qtype,rtype,COUNT(qtype,rtype) from df GROUP BY qtype,rtype").collect() 
    query2 = sqlContext.sql("SELECT qtype, COUNT(qtype) from df GROUP BY qtype").collect() 
    query3 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df GROUP BY rtype").collect() 
    query4 = sqlContext.sql("SELECT rtype, COUNT(rtype) from df WHERE qtype = \"HTTP\" GROUP BY rtype").collect() 

你有什么努力实现“为DF行:”?

如果数据框包含100,000行,您计划执行这个“for-loop”100,000次吗?

似乎变量query1,query2,query3,query4将仅保存for循环的最后一次执行的结果(因为每次从“df”中读取新的“行”时,它们的值似乎都被覆盖“) - 是否有意?

6)可以直接从RDD

创建数据帧例如使用

sqlContext.createDataFrame 

http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html

createDataFrame(**数据,模式=无,samplingRatio =无,verifySchema =真) **创建一个数据帧从RDD,列表或pandas.DataFrame 。

RDD.toDF() 

http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html

toDF(*的cols)

返回一个新的类:数据帧,与新指定的列名 参数:列 - 新的列名列表(串)

>>> df.toDF('f1', 'f2').collect() 
[Row(f1=2, f2=u'Alice'), Row(f1=5, f2=u'Bob')] 
+0

2)和5)非常有效。谢谢!我像5)中所说的那样删除了for循环。 我使用'collect'的原因是RDD将被转换为列表。只有这样我才能创建一个df。否则会引发错误。有另一种方法吗? 我认为如果使用熊猫,转换为CSV更容易。 现在我只使用本地机器。稍后我会开始使用集群。 – kaks

+0

@kaks - 我已经更新了我的答案,关于df.write.csv()函数。 – Yaron

+0

@kaks - 添加了关于直接从RDD创建Dataframe的部分 – Yaron