Elasticsearch之批量处理。

  • 检索多个文档

        像Elasticsearch一样,检索多个文档依旧非常快。合并多个请求可以避免每个请求单独的网络开销。如果你需要从Elasticsearch中检索多个文档,相对于一个一个的检索,更快的方式是在一个请求中使用multi-get或者mget API。

        mget API参数是一个docs数组,数组的每个节点定义一个文档_index、_type、_id元数据。如果你只想检索一个或几个确定的字段,也可以定义一个_source参数:

        GET /_mget

        {

           "docs" : [

               {   "_index" : "website",

                   "_type" : "blog",

                   "_id" : 2

               },

               {

                  "_index" : "website",

                  "_type" : "pageviews",

                  "_id" : 1,

                  "_source" : "views"

               }

            ]

        }

         响应体也包含了doct数组,每个文档还包含一个响应,他们按照请求定义的顺序排列。每个这样的响应与单独使用get request响应体相同:

         Elasticsearch之批量处理。

         如果你想检索的文档在同一个_index中(甚至在同一个_type中),你就可以在URL中定义一个默认的/_index或者/_index/_type。

         你依旧可以在单独的请求中使用这些值:

         GET /website/blog/_mget

         {

            "docs" : [

               { "_id" : 2},

               { "_type" : "pageviews" , "_id" : 1}

            ]

         }

         事实上,如果所有文档具有相同_index和_type,你可以通过简单的ids数组来代替完整的docs数组:

         GET /website/blog/_mget

         {

             "ids" : [ "2" , "1"]

         }

         注意到我们请求的第二个文档并不存在。我们定义了类型为blog,但是ID为1的文档类型为pageviews。这个不存在的文档会在响应体中被告知。

         Elasticsearch之批量处理。

         <1> 这个文档不存在

        事实上,第二个文档不存在并不影响第一个文档的检索。每个文档的检索和报告都是独立的。

        注意:尽管前面提到有一个文档没有被找到,但HTTP请求状态码还是200.事实上,就算所有文档都找不到,请求也还是返回200,原因是mget请求本身成功了。如果想知道每个文档是否都成功了,你需要检查found标志。


  • 更省时的批量操作

        就像mget允许我们一次性检索多个文档一样,bulk API允许我们使用单一请求来实现多个文档的create、index、update或delete。这对索引类似于日志活动这样的数据流非常有用,他们可以以成百上千的数据为一个批次按序进行索引。

        bulk请求体如下,它有一点不通寻常:

        { action : { metadata}} \n

        { request body           } \n

        { action : { metadata}} \n

        { request body           } \n

         ...

        这种格式类似于用“\n”   符号连接起来的一行一行的JSON文档流(stream)。两个重要的点需要注意:

        1、每行必须以“\n”符号结尾,包括最后一行。这些都是作为每行有效的分离而做的标记。

        2、每一行的数据不能包含未被转义的换行符,他们会干扰分析——这意味着JSON不能被美化打印。

        action/metadata 这一行定义了文档行为(what action)发生在哪个文档(which document)之上。

        行为(action)必须是以下几种:

        Elasticsearch之批量处理。

         在索引、创建、更新或删除时必须制定文档的_index、_type、_id这些元数据(metadata)。

         例如删除请求看起来像这样:

         {"delete" : { "_index" : "website" , "_type" : "blog" , "_id" : "123" }}

         请求体(request body)由文档的_source组成——文档所包含的一些字段以及其值。它被index和create操作所必须,这是有道理的:你必须提供文档来创建索引。

         这些还被update操作所必须,而且请求体的组成应该与update API(doc,upsert,script等等)一致。删除操作不需要请求体(request body)。

         { "create" : { "_index" : "website" , "_type" : "blog" , "_id" : "123" } }

         { "title" : "My first blog post" }

         如果定义_id,ID将会被自动创建:

         { "index" : { "_index" : "website" , "_type" : "blog" } }

         { "title" : "My second blog post" }

         为了将这些放在一起,bulk请求表单是这样的:

         POST /_bulk

         { "delete" : { "_index" : "website" , "_type" : "blog" , "_id" : "123" } }                             <1>

         { "create" : { "_index" : "website" , "_type" : "blog" , "_id" : "123" } }

         { "title" : "My first blog post" }

         { "index" : { "_index" : "website" , "_type" : "blog" } }

         { "title" : "My second blog post" }

         { "update" : { "_index" : "website" , "_type" : "blog", "_id" : "123" , "_retry_on_conflict" : 3 } }

         { "doc" : { "title" : "My updated blog post" } }                                                                     <2>

         <1> 注意delete行为(action)没有请求体,它紧接着另一个行为(action)

         <2> 记得最后一个换行符

         Elasticsearch响应包含一个items数组,它罗列了每一个请求的结果,结果的顺序与我们请求的顺序相同:

         Elasticsearch之批量处理。

          <1> 所有子请求都成功完成。

         每个子请求都被独立的执行,所以一个子请求的错误并不影响其他请求。如果任何一个请求失败,顶层的error标记将被设置为true,然后错误的细节将在相应的请求中被报告:

         POST /_bulk

         { "create" : { "_index" : "website" , "_type" : "blog" , "_id" : "123" } }

         { "title" : "Cannot create - it already exists" }

         { "index" : { "_index" : "website" , "_type" : "blog" , "_id" : "123" } }

         { "title" : "But we can update it"}

        响应中我们将看到create文档123失败了,因为文档已经存在,但是后来的在123上执行的index请求成功了:

         Elasticsearch之批量处理。

         <1>  一个或多个请求失败。

         <2> 这个请求的HTTP状态码被报告为 409 CONFLICT

         <3> 错误消息说明了什么请求错误。

         <4> 第二个请求成功了,状态码是 200 OK。

         这些说明 bulk 请求不是原子操作——他们不能实现事务。每个请求操作是分开的,所以每个请求的成功与否不干扰其他操作。


  • 不要重复

         你可能在同一个index下的同一个type里批量索引日志数据。为每个文档指定相同的元数据是多余的。就像mget API,bulk请求也可以在URL中使用/_index或/_index/_type:

         POST /website/_bulk

         { "index" : {  "_type" : "log"  } }

         { "event" : "User logged in" }

        你依旧可以覆盖元数据行的_index和_type,在没有覆盖时它会使用URL中的值作为默认值:

        POST /website/log/_bulk

        { "index" : {} }

        { "event" : " User logged in" }

        { "index" : { "_type" : "blog" } }

        { "title" : "Overriding the default type" }


  • 多大才算太大?

        整个批量请求需要被加载到接受我们请求节点的内存里,所以请求越大,给其他请求可用的内存就越小。有一个最佳的bulk请求大小。超过这个大小,性能不再提升而且可能降低。

        最佳大小,当然并不是一个固定的数字。它完全取决于你的硬件、你文档的大小和复杂度以及索引和搜索的负载。幸运的是,这个最佳点(sweetspot)还是容易找到的:

        试着批量索引标准的文档,随着大小的增长,当性能开始降低,说明你每个批次的大小太大了。开始的数量可以在1000~5000个文档之间,如果你的文档非常大,可以使用较小的批次。

        通常着眼于你请求批次的物理大小是非常有用的。一千个1kb的文档和一千个1MB的文档大不相同。一个好的批次最好保持在5~15MB大小件。


  • 为什么是奇怪的格式?

        为什么bulk API需要带换行符的奇怪个时,而不是像 mget API一样使用JSON数组?

        为了回答这个问题,我们需要简单的介绍一下背景:

        批量中每个引用的文档属于不同的主分片,每个分片可能被分布于集群中的某个节点上。这意味着批量中的每个操作(action)需要被转发到对应的分片和节点上。

        如果每个单独的请求被包装到JSON数组中,那意味着我们需要:

  • 解析JSON为数组(包括文档数据,可能非常大)
  • 检查每个请求决定应该到哪个分片上
  • 为每个分片创建一个请求的数组
  • 序列化这些数组为内部传输格式
  • 发送请求到每个分片

        这可行,但需要大量的RAM来承载本质上相同的数据,还要创建更多的数据结构使得JVM花更多的时间执行垃圾回收。

        取而代之的,Elasticsearch则是网络缓冲区中一行一行的直接读取数据。它使用换行符识别和解析action/metadata行,以决定哪些分片来处理这个请求。

        这些行请求直接转发到对应的分片上。这些没有冗余复制,没有多余的数据结构。整个请求过程使用最小的内存在进行。