Reading a distributed tab-separated CSV
Problem description:
Inspired by this question, I wrote some code to store an RDD (read from a Parquet file) whose schema is (photo_id, data) pairs, as tab-separated lines, with the data pickled and base64-encoded, like this:
import base64
import cPickle

def do_pipeline(itr):
    ...
    item_id = x.photo_id

def toTabCSVLine(data):
    return '\t'.join(str(d) for d in data)

serialize_vec_b64pkl = lambda x: (x[0], base64.b64encode(cPickle.dumps(x[1])))

def format(data):
    return toTabCSVLine(serialize_vec_b64pkl(data))

dataset = sqlContext.read.parquet('mydir')
lines = dataset.map(format)
lines.saveAsTextFile('outdir')
So, now the question: how can I read that dataset back and print, for example, its deserialized data?
I am using Python 2.6.6.
My attempt is below; just to confirm that everything can be done, I wrote this code:
deserialize_vec_b64pkl = lambda x: (x[0], cPickle.loads(base64.b64decode(x[1])))
base64_dataset = sc.textFile('outdir')
collected_base64_dataset = base64_dataset.collect()
print(deserialize_vec_b64pkl(collected_base64_dataset[0].split('\t')))
That calls collect(), which is fine for testing, but in a real-world scenario it would struggle...
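For what it's worth, the serialize/deserialize pair can be sanity-checked locally, without Spark or collect() at all. A minimal round-trip sketch (Python 3 syntax, with pickle standing in for cPickle, and a hypothetical sample pair):

```python
import base64
import pickle

def serialize(pair):
    # (key, value) -> (key, base64 text of the pickled value);
    # .decode("ascii") keeps the result a plain text string on Python 3
    return (pair[0], base64.b64encode(pickle.dumps(pair[1])).decode("ascii"))

def deserialize(pair):
    # inverse: base64-decode the text, then unpickle the value
    return (pair[0], pickle.loads(base64.b64decode(pair[1])))

original = ("photo_1", [0.1, 0.2, 0.3])
line = "\t".join(str(d) for d in serialize(original))  # the saved text line
restored = deserialize(line.split("\t"))
assert restored == original
print(restored)  # ('photo_1', [0.1, 0.2, 0.3])
```

This only verifies the encoding logic itself; the distributed read is a separate concern.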
Edit:
When I tried zero323's suggestion:
foo = (base64_dataset.map(str.split).map(deserialize_vec_b64pkl)).collect()
I got this error, which boils down to:
PythonRDD[2] at RDD at PythonRDD.scala:43
16/08/04 18:32:30 WARN TaskSetManager: Lost task 4.0 in stage 0.0 (TID 4, gsta31695.tan.ygrid.yahoo.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
UnpicklingError: NEWOBJ class argument has NULL tp_new
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/08/04 18:32:30 ERROR TaskSetManager: Task 12 in stage 0.0 failed 4 times; aborting job
16/08/04 18:32:31 WARN TaskSetManager: Lost task 14.3 in stage 0.0 (TID 38, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally)
16/08/04 18:32:31 WARN TaskSetManager: Lost task 13.3 in stage 0.0 (TID 39, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally)
16/08/04 18:32:31 WARN TaskSetManager: Lost task 16.3 in stage 0.0 (TID 42, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/homes/gsamaras/code/read_and_print.py in <module>()
17 print(base64_dataset.map(str.split).map(deserialize_vec_b64pkl))
18
---> 19 foo = (base64_dataset.map(str.split).map(deserialize_vec_b64pkl)).collect()
20 print(foo)
/home/gs/spark/current/python/lib/pyspark.zip/pyspark/rdd.py in collect(self)
769 """
770 with SCCallSiteSync(self.context) as css:
--> 771 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
772 return list(_load_from_socket(port, self._jrdd_deserializer))
773
/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814
815 for temp_arg in temp_args:
/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
Answer:
Let's try a simple example. For convenience I'll use the handy toolz library, but it is not really required here.
import sys
import base64

if sys.version_info < (3, ):
    import cPickle as pickle
else:
    import pickle

from toolz.functoolz import compose

rdd = sc.parallelize([(1, {"foo": "bar"}), (2, {"bar": "foo"})])
Now, your code is not exactly portable right now. In Python 2 base64.b64encode returns str, while in Python 3 it returns bytes. Let's illustrate that:
- Python 2

  type(base64.b64encode(pickle.dumps({"foo": "bar"})))
  ## str

- Python 3

  type(base64.b64encode(pickle.dumps({"foo": "bar"})))
  ## bytes
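The same check can be written once so it runs on whichever interpreter you have (a small sketch, not tied to Spark):

```python
import base64
import pickle
import sys

encoded = base64.b64encode(pickle.dumps({"foo": "bar"}))

# b64encode returns the native byte-string type of the interpreter:
# 'str' on Python 2, 'bytes' on Python 3.
expected = "str" if sys.version_info < (3,) else "bytes"
assert type(encoded).__name__ == expected
print(type(encoded).__name__)
```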
So let's add decoding to the pipeline:
# Equivalent to
# def pickle_and_b64(x):
#     return base64.b64encode(pickle.dumps(x)).decode("ascii")

pickle_and_b64 = compose(
    lambda x: x.decode("ascii"),
    base64.b64encode,
    pickle.dumps
)
Note that this doesn't assume any particular shape of the data. Because of that, we'll use mapValues to serialize only the values:
serialized = rdd.mapValues(pickle_and_b64)
serialized.first()
## (1, u'KGRwMApTJ2ZvbycKcDEKUydiYXInCnAyCnMu')
Now we can follow it with simple formatting and save:
from tempfile import mkdtemp
import os
outdir = os.path.join(mkdtemp(), "foo")
serialized.map(lambda x: "{0}\t{1}".format(*x)).saveAsTextFile(outdir)
To read the file back, we reverse the process:
# Equivalent to
# def b64_and_unpickle(x):
#     return pickle.loads(base64.b64decode(x))

b64_and_unpickle = compose(
    pickle.loads,
    base64.b64decode
)
decoded = (sc.textFile(outdir)
.map(lambda x: x.split("\t")) # In Python 3 we could simply use str.split
.mapValues(b64_and_unpickle))
decoded.first()
## (u'1', {'foo': 'bar'})
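One caveat visible in the output above: after the text round trip the keys come back as strings (u'1' instead of 1), since saveAsTextFile/textFile deal purely in text. If the original keys were ints, they need to be converted back explicitly. A minimal sketch of such a post-processing step (plain Python, usable inside a Spark map; the helper name is made up here):

```python
import ast

def restore_key(kv):
    # The text round trip turned the key into a string; parse it back.
    # ast.literal_eval handles ints, floats, tuples, etc.; plain int()
    # would do if keys are known to always be integers.
    key, value = kv
    return (ast.literal_eval(key), value)

# In Spark this would be applied as: decoded.map(restore_key)
print(restore_key(("1", {"foo": "bar"})))  # (1, {'foo': 'bar'})
```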
Why 'base64_dataset.map(str.split).map(deserialize_vec_b64pkl)'? – zero323
@zero323 I didn't know we could use 'str.split', but I'm still new to this, so please bear with me; I'm fairly sure that once someone explains it I'll be able to follow along afterwards.. So what you're suggesting should be an RDD.. To make sure everything is OK, how can I view its first element? I tried the 'collect()' you mentioned, but that resulted in an error ('Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.'). Maybe it would help if I understood the data layout of the RDD.. – gsamaras
@zero323 I am using Python 2, so covering that will suffice; I mean, from there I can get to Python 3 if needed! – gsamaras