storm_简单的例子
写一个简单的topology,spout处理各单词,第一组bolt给各单词后面添加!!!,然后汇聚到同一个bolt并记录所有的单词及个数。
新建一个java项目,HelloStorm。新建一个lib文件夹。将storm0.9.5的lib全拷进去。并全Add to build path。
新建三个包:com.gary.topology、com.gary.spout、com.gary.bolt
- Spout
public class WordSpout extends BaseRichSpout{
SpoutOutputCollector _collector;
String[] _words = new String[]{"one", "two", "three", "four", "five"};
@Override
public void nextTuple() {
Utils.sleep(100);
final Random rand = new Random();
String word = _words[rand.nextInt(_words.length)];
_collector.emit(new Values(word));
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
2.Bolt
public class ExclamationBolt extends BaseRichBolt{
OutputCollector _collector;
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
_collector.emit(tuple, new Values(word + "!!!"));
_collector.ack(tuple);
}
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("ExclamationWord"));
}
}
public class MergeBolt extends BaseRichBolt{
OutputCollector _collector;
Map<String, Integer> _MergeWords = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("ExclamationWord");
if (!_MergeWords.containsKey(word)){
_MergeWords.put(word, 1);
}else{
_MergeWords.put(word, _MergeWords.get(word)+1);
}
_collector.ack(tuple);
System.out.println("--------------------");
Iterator iter = _MergeWords.keySet().iterator();
while (iter.hasNext()){
String keyword = (String) iter.next();
Integer nCount = (Integer) _MergeWords.get(keyword);
System.out.println(keyword + " : " + nCount);
}
}
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("MergeWord"));
}
}
3.Topology
public class WordToplogy {
public static void main(String[] args){
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new WordSpout());
builder.setBolt("ExclamationBolt", new ExclamationBolt()).shuffleGrouping("word");
builder.setBolt("MergeBolt", new MergeBolt()).allGrouping("ExclamationBolt");
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("word");
cluster.shutdown();
}
}
此处两个bolt,第一个使用的是shuffleGrouping分组,即随机分组。第二个bolt使用的是汇聚,即都进入同一个bolt。从而达到统计的目的。
也可以使用按字段分组,将统计的功能放到按字段分组的bolt中,再汇聚到一个bolt。那样的话bolt的写法不一样,后面其他例子再这么写。
在WordToplogy类中右键运行。10秒后运行结束,后台最后一次输出为:
--------------------
two!!! : 18
four!!! : 11
one!!! : 21
five!!! : 10
three!!! : 19
可见,五个word单词随机发出,经过第一个bolt处理后多了!!!,最后汇聚到了同一个bolt并输出。