Storm入门案例

1.Storm单机模式

Storm提供了单机开发模式,即使没有Storm集群也可以在本地java环境中进行开发测试。

2.入门案例

  a.需求

        利用Storm实现实时的单词统计

  b.分析需求

Storm入门案例

Storm入门案例

c.创建java项目,导入Storm相关开发包

Storm入门案例

d.开发SentenceSpout

想要开发一个Spout需要写一个类实现 IComponent接口 和 ISpout接口,实现相应方法。

直接实现这两个接口需要实现的方法比较多,可以选择继承BaseRichSpout默认实现类,此类已经实现过如上两个接口,并将大部分方法做了默认空实现,只需继承者开发最重要的三个方法即可。而其它方法只需在需要时覆盖父类的默认空实现即可。

Storm入门案例

    1

    2

    3

    4

    5

    6

    7

    8

    9

   10

   11

   12

   13

   14

   15

   16

   17

   18

   19

   20

   21

   22

   23

   24

   25

   26

   27

   28

   29

   30

   31

   32

   33

   34

   35

   36

   37

   38

   39

   40

   41

   42

   43

   44

   45

   46

   47

   48

   49

   50

   51

   52

   53

   54

   55

   56

   57

   58

   59

   60

   61

   62

   63

   64

   65

   66

   67

package cn.tedu.storm;

 

import java.util.Map;

 

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

 

public class SentenceSpout extends BaseRichSpout{

        

        private String [] senteces = {

                "my name is park",

                "i am so shuai",

                "do you like me",

                "are you sure you do not like me",

                "ok i am sure"

        };

        

        private SpoutOutputCollector collector = null;

        

        /**

         * 初始化的方法,当当前spout被初始化时调用

         *

         * conf:配置对象,包含配置信息,配置信息合并自集群配置和当前拓扑的配置

         * context:上下文对象,代表当前spout运行的环境,可以通过该对象获取 任务 组件 拓扑相关的信息

         * collector:用来发送tuple的对象,可以任意位置发送tuple,这个方法线程安全,通常保存在类的内部方便使用

         */

        @Override

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

                this.collector = collector;

        }

 

        /**

         * storm会不停的调用此方法,来要求spout发送一个tuple。

         * 如果有tuple则通过collector发送,如果没有也不要阻塞这个方法,直接返回即可。

         * storm的底层在用一个死循环不停的调用 nextTuple ack fail方法

         * 如果真的没有tuple要发送,在返回之前最好睡上一毫秒,以便于不至于浪费过多cpu。

         */

        int index = 0;

        @Override

        public void nextTuple() {

                if(index<senteces.length){

                        collector.emit(new Values(senteces[index]));

                        index++;

                }else{

                        try {

                                Thread.sleep(1);

                        } catch (InterruptedException e) {

                                e.printStackTrace();

                        }

                        return;

                }

        }

 

        /**

         * 声明当前组件发送的tuple的结构

         * declarer:通过此对象声明 当前组件发送的tuple的结构

         */

        @Override

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

                declarer.declare(new Fields("sentence"));

        }

        

}

 

e.开发SplitSentenceBolt

想要开发一个Bolt需要写一个类实现 IComponent接口 和 IBolt接口,实现相应方法。

直接实现这两个接口需要实现的方法比较多,可以选择继承BaseRichBolt默认实现类,此类已经实现过如上两个接口,并将大部分方法做了默认空实现,只需继承者开发最重要的三个方法即可。而其它方法只需在需要时覆盖父类的默认空实现即可。

Storm入门案例

    1

    2

    3

    4

    5

    6

    7

    8

    9

   10

   11

   12

   13

   14

   15

   16

   17

   18

   19

   20

   21

   22

   23

   24

   25

   26

   27

   28

   29

   30

   31

   32

   33

   34

   35

   36

   37

   38

   39

   40

   41

   42

   43

   44

   45

   46

   47

   48

   49

   50

package cn.tedu.storm;

 

import java.util.Map;

 

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

 

public class SplitSentenceBolt extends BaseRichBolt{

        private OutputCollector collector = null;

        

        /**

         * 初始化的方法,当当前Bolt被初始化时调用

         *

         * stormConf:配置对象,包含配置信息,配置信息合并自集群配置和当前拓扑的配置

         * context:上下文对象,代表当前bolt运行的环境,可以通过该对象获取 任务 组件 拓扑相关的信息

         * collector:用来发送tuple的对象,可以任意位置发送tuple,这个方法线程安全,通常保存在类的内部方便使用

         */

        @Override

        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

                this.collector = collector;

        }

 

        /**

         * 针对输入的每一个tuple都触发这个方法,对tuple进行处理。

         * input:当前要处理的tuple,可以通过tuple获取到组件 流 任务相关信息。

         */

        @Override

        public void execute(Tuple input) {

                String sentence = input.getStringByField("sentence");

                String words [] = sentence.split(" ");

                for(String word : words){

                        collector.emit(new Values(word));

                }

        }

 

        /**

         * 声明当前组件发送的tuple的结构

         * declarer:通过此对象声明 当前组件发送的tuple的结构

         */

        @Override

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

                declarer.declare(new Fields("word"));

        }

        

}

 

 

f.WordCountBolt

    1

    2

    3

    4

    5

    6

    7

    8

    9

   10

   11

   12

   13

   14

   15

   16

   17

   18

   19

   20

   21

   22

   23

   24

   25

   26

   27

   28

   29

   30

   31

   32

   33

   34

   35

package cn.tedu.storm;

 

import java.util.HashMap;

import java.util.Map;

 

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

 

public class WordCountBolt extends BaseRichBolt {

        private OutputCollector collector = null;

        

        @Override

        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

                this.collector = collector;

        }

 

        private Map<String,Integer> map = new HashMap<>();

        @Override

        public void execute(Tuple input) {

                String word = input.getStringByField("word");

                map.put(word, map.containsKey(word) ? map.get(word)+1 : 1);

                collector.emit(new Values(word,map.get(word)));

        }

 

        @Override

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

                declarer.declare(new Fields("word","count"));

        }

 

}

g.ReportBolt

    1

    2

    3

    4

    5

    6

    7

    8

    9

   10

   11

   12

   13

   14

   15

   16

   17

   18

   19

   20

   21

   22

   23

   24

   25

   26

   27

   28

   29

   30

   31

   32

   33

   34

   35

   36

   37

   38

   39

   40

   41

   42

   43

   44

package cn.tedu.storm;

 

import java.util.HashMap;

import java.util.Map;

 

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.tuple.Tuple;

 

public class ReportBolt extends BaseRichBolt {

        private OutputCollector collector = null;

        

        @Override

        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

                this.collector = collector;

        }

        

        private Map<String,Integer> map = new HashMap<>();

        @Override

        public void execute(Tuple input) {

                String word = input.getStringByField("word");

                int count = input.getIntegerByField("count");

                map.put(word, count);

                System.out.println("--word:["+word+"]--count:["+count+"]--");

        }

 

        @Override

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

                

        }

        

        @Override

        public void cleanup() {

                System.out.println("===================");

                for(Map.Entry<String, Integer> entry : map.entrySet()){

                        String word = entry.getKey();

                        int count = entry.getValue();

                        System.out.println("--["+word+"~"+count+"]--");

                }

                System.out.println("===================");

        }

}

 

h.WcTopology

    1

    2

    3

    4

    5

    6

    7

    8

    9

   10

   11

   12

   13

   14

   15

   16

   17

   18

   19

   20

   21

   22

   23

   24

   25

   26

   27

   28

   29

   30

   31

   32

   33

   34

   35

   36

   37

   38

   39

   40

   41

   42

   43

   44

   45

package cn.tedu.storm;

 

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.generated.StormTopology;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.tuple.Fields;

 

public class WcTopology {

        public static void main(String[] args) throws Exception {

                //1.创建组件

                SentenceSpout spout = new SentenceSpout();

                SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt();

                WordCountBolt wordCountBolt = new WordCountBolt();

                ReportBolt reportBolt = new ReportBolt();

                

                //2.创建拓扑构建者

                TopologyBuilder builder = new TopologyBuilder();

                

                //3.声明拓扑结构

                builder.setSpout("Sentence_Spout", spout);

                builder.setBolt("Split_Sentence_Bolt", splitSentenceBolt)

                        .shuffleGrouping("Sentence_Spout");

                builder.setBolt("Word_Count_Bolt", wordCountBolt)

                        .fieldsGrouping("Split_Sentence_Bolt", new Fields("word"));

                builder.setBolt("Report_Bolt", reportBolt)

                        .globalGrouping("Word_Count_Bolt");

                

                //4.创建拓扑

                StormTopology topology = builder.createTopology();

                

                //5.提交拓扑到集群中运行

                //Config conf = new Config();

                //StormSubmitter.submitTopology("Wc_Topology", conf, topology);

        

                //5.提交拓扑到集群中运行 - 本地模拟

                LocalCluster cluster = new LocalCluster();

                Config conf = new Config();

                cluster.submitTopology("Wc_Topology", conf, topology);

        

                Thread.sleep(3000);

                cluster.killTopology("Wc_Topology");

                cluster.shutdown();

        }

}