指数一大熊猫据帧到Elasticsearch没有elasticsearch-PY

问题描述:

我想指数一堆大大熊猫dataframes(约400万行和50列)到Elasticsearch。指数一大熊猫据帧到Elasticsearch没有elasticsearch-PY

在寻找如何做到这一点的例子,大多数人都会用elasticsearch-py's bulk helper method,通过它的一个实例of the Elasticsearch class它处理的连接以及其上创建with pandas' dataframe.to_dict(orient='records') method词典列表。元数据可以预先作为新列插入到数据帧中,例如, df['_index'] = 'my_index'

但是,我有理由不使用elasticsearch-py库,并希望直接与Elasticsearch bulk API交谈,例如,通过requests或其他方便的HTTP库。此外,df.to_dict()是大dataframes,可惜很慢,一个数据帧转换为类型的字典列表,然后通过elasticsearch-PY序列化JSON听起来像是不必要的开销时,有类似dataframe.to_json()的速度非常快,即使在大dataframes。

什么会得到一个数据框大熊猫成大宗原料药所需要的格式的方便,快捷的方法呢?我认为,在正确的方向迈出的一步如下使用dataframe.to_json()

import pandas as pd 
df = pd.DataFrame.from_records([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}, {'a': 5, 'b': 6}]) 
df 
    a b 
0 1 2 
1 3 4 
2 5 6 
df.to_json(orient='records', lines=True) 
'{"a":1,"b":2}\n{"a":3,"b":4}\n{"a":5,"b":6}' 

现在这是一个新行分隔的JSON字符串,但是,它仍然缺乏的元数据。什么是表演方式让它在那里?

编辑: 为了完整性,元数据JSON文件将看起来像:

{"index": {"_index": "my_index", "_type": "my_type"}} 

因此,在端部通过本体API预期整个JSON看起来像 这(与另外的最后行之后换行符):

{"index": {"_index": "my_index", "_type": "my_type"}} 
{"a":1,"b":2} 
{"index": {"_index": "my_index", "_type": "my_type"}} 
{"a":3,"b":4} 
{"index": {"_index": "my_index", "_type": "my_type"}} 
{"a":5,"b":6} 
+0

您可以发布一个预期的元数据为您的样品DF? – MaxU

+0

当然,请看我的编辑。 – Dirk

+0

我不明白格式(结构) - 它不是一个有效的JSON。您是否可以尝试使用其批量API将此小型“JSON”加载到ElasticSearch中进行一些小测试? – MaxU

同时我发现了多种可能性如何做到这一点至少合理的速度:

import json 
import pandas as pd 
import requests 

# df is a dataframe or dataframe chunk coming from your reading logic 
df['_id'] = df['column_1'] + '_' + df['column_2'] # or whatever makes your _id 
df_as_json = df.to_json(orient='records', lines=True) 

final_json_string = '' 
for json_document in df_as_json.split('\n'): 
    jdict = json.loads(json_document) 
    metadata = json.dumps({'index': {'_id': jdict['_id']}}) 
    jdict.pop('_id') 
    final_json_string += metadata + '\n' + json.dumps(jdict) + '\n' 

headers = {'Content-type': 'application/json', 'Accept': 'text/plain'} 
r = requests.post('http://elasticsearch.host:9200/my_index/my_type/_bulk', data=final_json_string, headers=headers, timeout=60) 

代替使用熊猫to_json()方法,其中一个也可以使用to_dict()如下。这是在我的测试速度较慢,但​​没有多少:

dicts = df.to_dict(orient='records') 
final_json_string = '' 
for document in dicts: 
    metadata = {"index": {"_id": document["_id"]}} 
    document.pop('_id') 
    final_json_string += json.dumps(metadata) + '\n' + json.dumps(document) + '\n' 

当大数据集运行此,我们可以通过安装它,然后import ujson as jsonimport rapidjson as jsonujsonrapidjson更换Python的默认json库保存了两三分钟, 分别。

通过用并行执行替换顺序执行的步骤可以实现更大的加速,以便在请求等待Elasticsearch处理所有文档并返回响应时,读取和转换不会停止。这可以通过线程,多处理,Asyncio,任务队列完成......但这不在这个问题的范围之内。

如果你碰巧发现的方法以更快的速度做到JSON的转换,让我知道。

+0

只要看看这段代码,就可以序列化成json,然后再次反序列化循环。我想你可以通过使用'df.iterrows'来获得简单的加速,然后在行本身上只调用'to_json' – szxk

此功能插入大熊猫据帧到弹性搜索(由大块大块)

def insertDataframeIntoElastic(dataFrame,index='index', typ = 'test', server = 'http://localhost:9200', 
          chunk_size = 2000): 
    headers = {'content-type': 'application/x-ndjson', 'Accept-Charset': 'UTF-8'} 
    records = dataFrame.to_dict(orient='records') 
    actions = ["""{ "index" : { "_index" : "%s", "_type" : "%s"} }\n""" % (index, typ) +json.dumps(records[j]) 
        for j in range(len(records))] 
    i=0 
    while i<len(actions): 
     serverAPI = server + '/_bulk' 
     data='\n'.join(actions[i:min([i+chunk_size,len(actions)])]) 
     data = data + '\n' 
     r = requests.post(serverAPI, data = data, headers=headers) 
     print r.content 
     i = i+chunk_size