Storm入门案例
1.Storm单机模式
Storm提供了单机开发模式,即使没有Storm集群也可以在本地java环境中进行开发测试。
2.入门案例
a.需求
利用Storm实现实时的单词统计
b.分析需求
c.创建java项目,导入Storm相关开发包
d.开发SentenceSpout
想要开发一个Spout需要写一个类实现 IComponent接口 和 ISpout接口,实现相应方法。
直接实现这两个接口需要实现的方法比较多,可以选择继承BaseRichSpout默认实现类,此类已经实现过如上两个接口,并将大部分方法做了默认空实现,只需继承者开发最重要的三个方法即可。而其它方法只需在需要时覆盖父类的默认空实现即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
package cn.tedu.storm;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values;
public class SentenceSpout extends BaseRichSpout{
private String [] senteces = { "my name is park", "i am so shuai", "do you like me", "are you sure you do not like me", "ok i am sure" };
private SpoutOutputCollector collector = null;
/** * 初始化的方法,当当前spout被初始化时调用 * * conf:配置对象,包含配置信息,配置信息合并自集群配置和当前拓扑的配置 * context:上下文对象,代表当前spout运行的环境,可以通过该对象获取 任务 组件 拓扑相关的信息 * collector:用来发送tuple的对象,可以任意位置发送tuple,这个方法线程安全,通常保存在类的内部方便使用 */ @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; }
/** * storm会不停的调用此方法,来要求spout发送一个tuple。 * 如果有tuple则通过collector发送,如果没有也不要阻塞这个方法,直接返回即可。 * storm的底层在用一个死循环不停的调用 nextTuple ack fail方法 * 如果真的没有tuple要发送,在返回之前最好睡上一毫秒,以便于不至于浪费过多cpu。 */ int index = 0; @Override public void nextTuple() { if(index<senteces.length){ collector.emit(new Values(senteces[index])); index++; }else{ try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return; } }
/** * 声明当前组件发送的tuple的结构 * declarer:通过此对象声明 当前组件发送的tuple的结构 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); }
} |
e.开发SplitSentenceBolt
想要开发一个Bolt需要写一个类实现 IComponent接口 和 IBolt接口,实现相应方法。
直接实现这两个接口需要实现的方法比较多,可以选择继承BaseRichBolt默认实现类,此类已经实现过如上两个接口,并将大部分方法做了默认空实现,只需继承者开发最重要的三个方法即可。而其它方法只需在需要时覆盖父类的默认空实现即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package cn.tedu.storm;
import java.util.Map;
import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;
public class SplitSentenceBolt extends BaseRichBolt{ private OutputCollector collector = null;
/** * 初始化的方法,当当前Bolt被初始化时调用 * * stormConf:配置对象,包含配置信息,配置信息合并自集群配置和当前拓扑的配置 * context:上下文对象,代表当前bolt运行的环境,可以通过该对象获取 任务 组件 拓扑相关的信息 * collector:用来发送tuple的对象,可以任意位置发送tuple,这个方法线程安全,通常保存在类的内部方便使用 */ @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; }
/** * 针对输入的每一个tuple都触发这个方法,对tuple进行处理。 * input:当前要处理的tuple,可以通过tuple获取到组件 流 任务相关信息。 */ @Override public void execute(Tuple input) { String sentence = input.getStringByField("sentence"); String words [] = sentence.split(" "); for(String word : words){ collector.emit(new Values(word)); } }
/** * 声明当前组件发送的tuple的结构 * declarer:通过此对象声明 当前组件发送的tuple的结构 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }
}
|
f.WordCountBolt
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
package cn.tedu.storm;
import java.util.HashMap; import java.util.Map;
import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;
public class WordCountBolt extends BaseRichBolt { private OutputCollector collector = null;
@Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; }
private Map<String,Integer> map = new HashMap<>(); @Override public void execute(Tuple input) { String word = input.getStringByField("word"); map.put(word, map.containsKey(word) ? map.get(word)+1 : 1); collector.emit(new Values(word,map.get(word))); }
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","count")); }
} |
g.ReportBolt
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
package cn.tedu.storm;
import java.util.HashMap; import java.util.Map;
import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple;
public class ReportBolt extends BaseRichBolt { private OutputCollector collector = null;
@Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; }
private Map<String,Integer> map = new HashMap<>(); @Override public void execute(Tuple input) { String word = input.getStringByField("word"); int count = input.getIntegerByField("count"); map.put(word, count); System.out.println("--word:["+word+"]--count:["+count+"]--"); }
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override public void cleanup() { System.out.println("==================="); for(Map.Entry<String, Integer> entry : map.entrySet()){ String word = entry.getKey(); int count = entry.getValue(); System.out.println("--["+word+"~"+count+"]--"); } System.out.println("==================="); } } |
h.WcTopology
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
package cn.tedu.storm;
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields;
public class WcTopology { public static void main(String[] args) throws Exception { //1.创建组件 SentenceSpout spout = new SentenceSpout(); SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt(); WordCountBolt wordCountBolt = new WordCountBolt(); ReportBolt reportBolt = new ReportBolt();
//2.创建拓扑构建者 TopologyBuilder builder = new TopologyBuilder();
//3.声明拓扑结构 builder.setSpout("Sentence_Spout", spout); builder.setBolt("Split_Sentence_Bolt", splitSentenceBolt) .shuffleGrouping("Sentence_Spout"); builder.setBolt("Word_Count_Bolt", wordCountBolt) .fieldsGrouping("Split_Sentence_Bolt", new Fields("word")); builder.setBolt("Report_Bolt", reportBolt) .globalGrouping("Word_Count_Bolt");
//4.创建拓扑 StormTopology topology = builder.createTopology();
//5.提交拓扑到集群中运行 //Config conf = new Config(); //StormSubmitter.submitTopology("Wc_Topology", conf, topology);
//5.提交拓扑到集群中运行 - 本地模拟 LocalCluster cluster = new LocalCluster(); Config conf = new Config(); cluster.submitTopology("Wc_Topology", conf, topology);
Thread.sleep(3000); cluster.killTopology("Wc_Topology"); cluster.shutdown(); } } |