《Storm实时数据处理》一2.5 索引与持久化日志数据

本节书摘来华章计算机《Storm实时数据处理》一书中的第2章 ,第2.5节,(澳)Quinton Anderson 著 卢誉声 译更多章节内容可以访问云栖社区“华章计算机”公众号查看。

2.5 索引与持久化日志数据

我们需要在某些特定时期将日志数据存储起来以便后期利用,而且还要保证这些日志数据能够被检索。为了实现这个目标,本例中将会集成名为Elastic Search的开源产品,它是一个通用并集成RESTful API的集群搜索引擎(http://www.elasticsearch.org/)。

2.5.1 实战

Step01 创建一个继承自BaseRichBolt的IndexerBolt类,并声明org.elasticsearch.client.Client 私有成员变量。你需要在prepare方法中初始化它,代码如下:
《Storm实时数据处理》一2.5 索引与持久化日志数据

Step02 然后在Bolt的execute方法中建立LogEntry对象的索引:
ENTRY);
《Storm实时数据处理》一2.5 索引与持久化日志数据

Step03 创建这个Bolt的单元测试可能没那么容易,因此值得在这里对此多补充几句。在测试源码目录中的storm.cookbook.log包里新建一个 JUnit 4单元测试,然后添加StoringMatcher私有内部类,代码如下:

《Storm实时数据处理》一2.5 索引与持久化日志数据

Step04 然后实现真正的测试逻辑,代码如下:

《Storm实时数据处理》一2.5 索引与持久化日志数据

2.5.2 解析

Elastic Search提供了完整的Java客户端API(它本身也是用Java实现的),因此与它集成轻而易举。Bolt的prepare方法会在本地模式或集群模式下创建一个集群节点。集群模式会将根据名称获得的集群和在当前节点上创建的本地存储节点连接起来,这样就可以避免在使用不同传输方式进行写操作时发生的双跃点延迟问题。
Elastic Search本身是一个大型复杂系统,为了更好地理解操作和配置方面的问题,建议你先读一读它所提供的文档。
当Storm处于调试模式时,Elastic Search节点将连同多个在相同JVM中被执行的节点(如果需要的话)运行嵌入式集群。这显然对单元测试来说大有裨益。所有这些操作都会在Bolt的prepare方法中实现。
《Storm实时数据处理》一2.5 索引与持久化日志数据

当收到Tuple时,Bolt会获取其中的LogEntry对象,并将其转化成对应的JSON格式内容,然后发送给Elastic Search。
《Storm实时数据处理》一2.5 索引与持久化日志数据

接着从Elastic Search集群的响应(response)中获取日志ID,并连同LogEntry一起发送给下游的Bolt。在这个例子中,我们只将这个值用于单元测试。不管怎么说,我们都可以很容易地通过添加下游的Bolt来持久化这个值,进而将该值用于日志统计信息,这么做能为开发用户界面带来极大便利。
《Storm实时数据处理》一2.5 索引与持久化日志数据

对这个Bolt进行单元测试十分需要讲究技巧。这是因为对于一般的单元测试来说,我们在执行单元测试之前就已预知输出结果。但在这个示例中,只有在接收到Elastic Search集群的响应后,我们才能知道ID的具体值。这使得我们很难提前指定预期输出结果,更别提验证搜索引擎中的日志了。所以为了实现这个目标,我们使用了一个JMock自定义匹配器。在matches方法中实现了该自定义匹配器的主要逻辑。
《Storm实时数据处理》一2.5 索引与持久化日志数据

该方法能够确保返回Values的实例,并将其保存起来。我们会在后续的计算操作用到这个值。有了这个实例,就可以指定以下期望集合:
《Storm实时数据处理》一2.5 索引与持久化日志数据

并获取记录ID,根据嵌入式Elastic Search集群来验证它。
《Storm实时数据处理》一2.5 索引与持久化日志数据

若想要实现在集群中搜索日志文件的功能,可以从kibna.org下载并安装Kibana—一款优秀的日志搜索前端引擎。本例通过来自logstash的JSON日志结构来维护信息,而Kibana可作为Elastic Search上logstash的前端,所以它能与本例所使用的系统进行无缝集成。除此之外,Kibana还使用了Twitter Bootstrap 框架,因此你可以非常简单地将其集成到分析面板里。