Elasticsearch之批量处理。
- 检索多个文档
像Elasticsearch一样,检索多个文档依旧非常快。合并多个请求可以避免每个请求单独的网络开销。如果你需要从Elasticsearch中检索多个文档,相对于一个一个的检索,更快的方式是在一个请求中使用multi-get或者mget API。
mget API参数是一个docs数组,数组的每个节点定义一个文档_index、_type、_id元数据。如果你只想检索一个或几个确定的字段,也可以定义一个_source参数:
GET /_mget
{
"docs" : [
{ "_index" : "website",
"_type" : "blog",
"_id" : 2
},
{
"_index" : "website",
"_type" : "pageviews",
"_id" : 1,
"_source" : "views"
}
]
}
响应体也包含了doct数组,每个文档还包含一个响应,他们按照请求定义的顺序排列。每个这样的响应与单独使用get request响应体相同:
如果你想检索的文档在同一个_index中(甚至在同一个_type中),你就可以在URL中定义一个默认的/_index或者/_index/_type。
你依旧可以在单独的请求中使用这些值:
GET /website/blog/_mget
{
"docs" : [
{ "_id" : 2},
{ "_type" : "pageviews" , "_id" : 1}
]
}
事实上,如果所有文档具有相同_index和_type,你可以通过简单的ids数组来代替完整的docs数组:
GET /website/blog/_mget
{
"ids" : [ "2" , "1"]
}
注意到我们请求的第二个文档并不存在。我们定义了类型为blog,但是ID为1的文档类型为pageviews。这个不存在的文档会在响应体中被告知。
<1> 这个文档不存在
事实上,第二个文档不存在并不影响第一个文档的检索。每个文档的检索和报告都是独立的。
注意:尽管前面提到有一个文档没有被找到,但HTTP请求状态码还是200.事实上,就算所有文档都找不到,请求也还是返回200,原因是mget请求本身成功了。如果想知道每个文档是否都成功了,你需要检查found标志。
- 更省时的批量操作
就像mget允许我们一次性检索多个文档一样,bulk API允许我们使用单一请求来实现多个文档的create、index、update或delete。这对索引类似于日志活动这样的数据流非常有用,他们可以以成百上千的数据为一个批次按序进行索引。
bulk请求体如下,它有一点不通寻常:
{ action : { metadata}} \n
{ request body } \n
{ action : { metadata}} \n
{ request body } \n
...这种格式类似于用“\n” 符号连接起来的一行一行的JSON文档流(stream)。两个重要的点需要注意:
1、每行必须以“\n”符号结尾,包括最后一行。这些都是作为每行有效的分离而做的标记。
2、每一行的数据不能包含未被转义的换行符,他们会干扰分析——这意味着JSON不能被美化打印。
action/metadata 这一行定义了文档行为(what action)发生在哪个文档(which document)之上。
行为(action)必须是以下几种:
在索引、创建、更新或删除时必须制定文档的_index、_type、_id这些元数据(metadata)。
例如删除请求看起来像这样:
{"delete" : { "_index" : "website" , "_type" : "blog" , "_id" : "123" }}
请求体(request body)由文档的_source组成——文档所包含的一些字段以及其值。它被index和create操作所必须,这是有道理的:你必须提供文档来创建索引。
这些还被update操作所必须,而且请求体的组成应该与update API(doc,upsert,script等等)一致。删除操作不需要请求体(request body)。
{ "create" : { "_index" : "website" , "_type" : "blog" , "_id" : "123" } }
{ "title" : "My first blog post" }
如果定义_id,ID将会被自动创建:
{ "index" : { "_index" : "website" , "_type" : "blog" } }
{ "title" : "My second blog post" }
为了将这些放在一起,bulk请求表单是这样的:
POST /_bulk
{ "delete" : { "_index" : "website" , "_type" : "blog" , "_id" : "123" } } <1>
{ "create" : { "_index" : "website" , "_type" : "blog" , "_id" : "123" } }
{ "title" : "My first blog post" }
{ "index" : { "_index" : "website" , "_type" : "blog" } }
{ "title" : "My second blog post" }
{ "update" : { "_index" : "website" , "_type" : "blog", "_id" : "123" , "_retry_on_conflict" : 3 } }
{ "doc" : { "title" : "My updated blog post" } } <2>
<1> 注意delete行为(action)没有请求体,它紧接着另一个行为(action)
<2> 记得最后一个换行符
Elasticsearch响应包含一个items数组,它罗列了每一个请求的结果,结果的顺序与我们请求的顺序相同:
<1> 所有子请求都成功完成。
每个子请求都被独立的执行,所以一个子请求的错误并不影响其他请求。如果任何一个请求失败,顶层的error标记将被设置为true,然后错误的细节将在相应的请求中被报告:
POST /_bulk
{ "create" : { "_index" : "website" , "_type" : "blog" , "_id" : "123" } }
{ "title" : "Cannot create - it already exists" }
{ "index" : { "_index" : "website" , "_type" : "blog" , "_id" : "123" } }
{ "title" : "But we can update it"}
响应中我们将看到create文档123失败了,因为文档已经存在,但是后来的在123上执行的index请求成功了:
<1> 一个或多个请求失败。
<2> 这个请求的HTTP状态码被报告为 409 CONFLICT
<3> 错误消息说明了什么请求错误。
<4> 第二个请求成功了,状态码是 200 OK。
这些说明 bulk 请求不是原子操作——他们不能实现事务。每个请求操作是分开的,所以每个请求的成功与否不干扰其他操作。
- 不要重复
你可能在同一个index下的同一个type里批量索引日志数据。为每个文档指定相同的元数据是多余的。就像mget API,bulk请求也可以在URL中使用/_index或/_index/_type:
POST /website/_bulk
{ "index" : { "_type" : "log" } }
{ "event" : "User logged in" }
你依旧可以覆盖元数据行的_index和_type,在没有覆盖时它会使用URL中的值作为默认值:
POST /website/log/_bulk
{ "index" : {} }
{ "event" : " User logged in" }
{ "index" : { "_type" : "blog" } }
{ "title" : "Overriding the default type" }
- 多大才算太大?
整个批量请求需要被加载到接受我们请求节点的内存里,所以请求越大,给其他请求可用的内存就越小。有一个最佳的bulk请求大小。超过这个大小,性能不再提升而且可能降低。
最佳大小,当然并不是一个固定的数字。它完全取决于你的硬件、你文档的大小和复杂度以及索引和搜索的负载。幸运的是,这个最佳点(sweetspot)还是容易找到的:
试着批量索引标准的文档,随着大小的增长,当性能开始降低,说明你每个批次的大小太大了。开始的数量可以在1000~5000个文档之间,如果你的文档非常大,可以使用较小的批次。
通常着眼于你请求批次的物理大小是非常有用的。一千个1kb的文档和一千个1MB的文档大不相同。一个好的批次最好保持在5~15MB大小件。
- 为什么是奇怪的格式?
为什么bulk API需要带换行符的奇怪个时,而不是像 mget API一样使用JSON数组?
为了回答这个问题,我们需要简单的介绍一下背景:
批量中每个引用的文档属于不同的主分片,每个分片可能被分布于集群中的某个节点上。这意味着批量中的每个操作(action)需要被转发到对应的分片和节点上。
如果每个单独的请求被包装到JSON数组中,那意味着我们需要:
- 解析JSON为数组(包括文档数据,可能非常大)
- 检查每个请求决定应该到哪个分片上
- 为每个分片创建一个请求的数组
- 序列化这些数组为内部传输格式
- 发送请求到每个分片
这可行,但需要大量的RAM来承载本质上相同的数据,还要创建更多的数据结构使得JVM花更多的时间执行垃圾回收。
取而代之的,Elasticsearch则是网络缓冲区中一行一行的直接读取数据。它使用换行符识别和解析action/metadata行,以决定哪些分片来处理这个请求。
这些行请求直接转发到对应的分片上。这些没有冗余复制,没有多余的数据结构。整个请求过程使用最小的内存在进行。