Storm
Storm概述
实时计算可以实时获取数据进行运算,得到计算结果,在很多实时性要求比较高的场景下有大量的应用.例如:微博热门话题榜单、电商网站实时推荐、地图路况信息。
实时计算和离线计算有较大的不同,实时计算强调实时性,数据不断流入,实时运算后结果实时反馈,实时计算没有数据积累的过程,有开始没有结束,如果不人为停止会一直运行下去。
Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流。
Storm支持水平扩展,具有高容错性,保证每个消息都会得到处理,而且处理速度很快(在一个小集群中,每个结点每秒可以处理数以百万计的消息)。
Storm的部署和运维都很便捷,而且更为重要的是可以使用多种编程语言来开发应用。
Storm核心组件
storm结构称为topology(拓扑),由stream(数据流),spout(喷嘴-数据流的生成者),bolt(阀门-数据流运算者)组成(参考图:Storm组成结构)。
1. Spout
Spout负责连接数据源,接收数据,转换为tuple向后发送,Spout只负责转化数据,不负责数据处理。Spout和业务是解耦的,一方面使Spout的逻辑简单清晰,另一方面因为没有耦合可以非常方便的实现Spout的复用。
2. Bolt
Bolt负责接收数据,执行运算,运算过后可以继续向后发送tuple,给其他零个或多个Bolt。
其中的运算包括 数据运算 数据的连接 数据写出等等功能
3. Topology
这样利用Spout和Bolt就可以组件起复杂的数据处理流网络,实现复杂的分布式实时运算。
这个由Spout和Bolt组成的复杂的数据流处理网络称之为一个topology。
4. Storm可靠性保证概述
Storm是分布式实时处理系统,需要在运算时保证数据处理的可靠。
分布式数据处理的可靠性,具有三个级别:
1) 至多一次
可能会丢,但不会多
2) 至少一次
可能会多,但不会丢
3) 恰好一次
不对丢,也不会多
Storm提供了完整的三个级别的可靠性保障。
至多一次
Storm不做任何可靠性相关的配置时,默认就是此可靠性级别。
也即,可能因网络不稳定 程序有异常等的情况下,造成数据丢失,会丢,但不会多。
至少一次
需要在数据发生丢失时,检测到数据丢失,重发数据。
Storm提供了ack-fail机制来实现重发,从而实现至少一次的语义。
a. 至少一次语义在Spout中的实现
i. 缓存发送过的数据
spout需要缓存发送过的数据,以便于在后续数据处理失败时,可以找到缓存的数据进行重发。
发送过的数据需要一直缓存,直到该数据在storm中完成了全部处理,才可以删除。
ii. 实现ack()fail()方法
spout需要实现ack()fail()方法
当某条数据处理成功时,storm会自动调用spout中的ack()方法,开发者可以在这个方法中执行数据处理成功时的逻辑,例如,删除缓存的该数据。
当某条数据处理失败时,storm会自动调用spout中的fail()方法,开发者可以在这个方法中执行数据处理失败时的逻辑,主要是重发该数据。
b. 至少一次语义在Bolt中的实现
i. 在发送子tuple时锚定父tuple
所谓锚定,就是在bolt中发送子tuple时,将子tuple派生自父tuple的关系信息维系在collector中,以便于后续子孙tuple出错时,可以根据关系信息,找到最初的祖先tuple进行重发。
ii. 在处理tuple成功或失败后告知storm
所谓的告知,就是在bolt处理tuple成功或失败时,通过调用collector.ack()和collector.fail()告知collector,collector会收集这个信息,当整个拓扑对tuple都成功处理时,collector调用spout的ack方法,而任何一个处理失败,告知到collector后,collector调用spout的fail方法,进行重发。
• 恰好一次
所有的数据要按照批发送
批带编号,且编号是递增的
Bolt分为Processor阶段 和 Commit阶段的Bolt,Processor阶段的Bolt 对批的处理随意并发,保证效率,而Commit阶段的Bolt 要求 批严格按照顺序处理,保证一起且一次的语义。
只需要在Commit阶段的Bolt中记录最后一次处理的批的编号,后续的批过来时,和保存的批的编号进行比较,如果小于等于保存的编号,则说明是重发的数据,抛弃,如果大于则是正常数据,进行处理,并更新保存的编号,以此,实现了恰好一次的语义。
Storm也提供了恰好一次的语义支持,为了实现这样的语义,Storm提供了一套新的api - TransactionTopology来实现。
a. TransactionSpout
需要写一个类继承BaseTransactionalSpout
在其中要求我们返回coordinater和emitter两个对象
略。
b. TransactionBolt
需要写一个类继承BaseBatchBolt
默认Bolt为Porcessor阶段的Bolt,可以通过实现ICommitter接口 或 通过setComitBolt方法将bolt声明为Commit阶段bolt