Pyspark sc.textFile()不能完全加载文件

问题描述:

我从Cloudera quickstart docker容器上的Python Spark(v 1.6.0)开始。 我把一个静态 .txt文件(500 MB)放在hdfs下的/user/root/access_log.txt文件中。Pyspark sc.textFile()不能完全加载文件

在pyspark我试着将文件加载用TE以下行的Python代码:

lines = sc.textFile("hdfs://quickstart.cloudera/user/root/access_log.txt") 

这让我没有错误。但我发现文件没有完全加载。 还..

lines.max() 

给出不是文件的正确最后一个元素,而实际上HDFS拥有正确的文件大小。

这是内存问题吗?我的码头设置设置为3840 MB。 我不知道如何解决这个问题。我期待着你的答案。

编辑:

我算在我的数据集的元素与:

lines.count() 

和我吃惊的是正确的!这应该表示我的文件已正确加载。但问题仍然是为什么.max()声明不返回正确的元素。

这与不同的任务有关吗?

编辑2: 从.txt文件

10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/filmpics/0000/2229/GOEMON-NUKI-000163.jpg HTTP/1.1" 200 184976 
10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/filmmediablock/360/GOEMON-NUKI-000163.jpg HTTP/1.1" 200 60117 
10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/filmmediablock/360/Chacha.jpg HTTP/1.1" 200 109379 
10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/filmmediablock/360/GOEMON-NUKI-000159.jpg HTTP/1.1" 200 161657 
+0

迈克嗨,该文件是静态的?常规日志文件通常不断更新最新事件。这可能是阅读与日志不匹配的原因。 –

+0

好问题,但文件是静态的:) –

几个例子线一般max不应该返回的(......)最后一个元素。在某些情况下,如果日志文件使用格式,强制执行字典顺序,并且对内容感到幸运,否则它不会发生。由于您的数据前缀为IP地址,并且使用不友好(例如ISO 8601)时间戳格式,因此获取最后一个元素不是您所期望的。找到最后一个元素

的方法之一是包括指数:

from operator import itemgetter 

(rdd 
    .zipWithIndex()    # Add line number to get (line, no) 
    .max(key=itemgetter(1))[0]) # Compare elements using index 

有点不同的办法是找到每个分区和最后一个元素,然后从这些最后一个。

from functools import reduce 

rdd.mapPartitions(lambda part: reduce(lambda _, x: [x], part, [])).collect()[-1] 

或者分区的数量很大:

(rdd 
    .mapPartitionsWithIndex(
     lambda i, part: reduce(lambda _, x: [(i, x)], part, [])) 
    .max()[1]) # Take max using tuple ordering