从火花写入rdd弹性搜索失败
问题描述:
我试图在版本2.4.0
上的弹性云上针对Elastic Search上的弹性搜索编写一对rdd。 我正在使用elasticsearch-spark_2.10-2.4.0
插件来写入ES。 这里是我用写信给ES代码:从火花写入rdd弹性搜索失败
def predict_imgs(r):
import json
out_d = {}
out_d["pid"] = r["pid"]
out_d["other_stuff"] = r["other_stuff"]
return (r["pid"], json.dumps(out_d))
res2 = res1.map(predict_imgs)
es_write_conf = {
"es.nodes" : image_es,
#"es.port" : "9243",
"es.resource" : "index/type",
"es.nodes.wan.only":"True",
"es.write.operation":"upsert",
"es.mapping.id":"product_id",
"es.nodes.discovery" : "false",
"es.net.http.auth.user": "username",
"es.net.http.auth.pass": "pass",
"es.input.json": "true",
"es.http.timeout":"1m",
"es.scroll.size":"10",
"es.batch.size.bytes":"1mb",
"es.http.retries":"1",
"es.batch.size.entries":"5",
"es.batch.write.refresh":"False",
"es.batch.write.retry.count":"1",
"es.batch.write.retry.wait":"10s"}
res2.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
我得到的错误如下:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 744 in stage 26.0 failed 4 times, most recent failure: Lost task 744.3 in stage 26.0 (TID 2841, 10.181.252.29): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
有趣的部分是当我做了就第一个几本作品在RDD2的元素,然后建立一个新RDD了出来,并把它写入ES,它完美的作品:
x = sc.parallelize([res2.take(1)])
x.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
我使用的弹性云(弹性搜索的云服务)和Databricks(云Apache Spark的冒险) 难道ES不能跟上Spark写入ES的过程吗? 我将弹性云的大小从2GB RAM增加到8GB RAM。
有没有推荐配置的es_write_conf
我上面用过?任何其他confs
,你可以想到的? 更新到ES 5.0有帮助吗?
任何帮助表示赞赏。这几天一直在挣扎着。谢谢。
答
它看起来像pyspark计算的问题,而不是必需的elasticsearch保存过程。确保您的RDDS都OK方式:
- 表演
count()
上RDD1集(以 “物化” 的结果) - 上RDD2执行
count()
如果计数正常,尝试用缓存的结果保存到ES前:
res2.cache()
res2.count() # to fill the cache
res2.saveAsNewAPIHadoopFile(...
它的问题仍然存在,尝试看看死者遗嘱执行人标准错误和标准输出(你可以找到他们处于S执行人标签parkUI)。
我也注意到在es_write_conf
的非常小的批量大小,尝试增加到500或1000以获得更好的性能。