Storm实时数据处理

Storm实时数据处理
参考:http://ifeve.com/getting-started-with-stom-index/
一,storm概念 
        Storm是一个分布式的,可靠的,容错的数据流处理系统。它会把工作任务委托给不同类型的组件,每个组件负责处理一项简单特定的任务。Storm集群的输入流由一个被称作spout的组件管理,spout把数据传递给bolt, bolt要么把数据保存到某种存储器,要么把数据传递给其它的bolt。你可以想象一下,一个Storm集群就是在一连串的bolt之间转换spout传过来的数据。
       Storm 分布式计算结构称为 topology(拓扑),由 stream(数据流), spout(数据流的生成者), bolt(运算)组成。
Storm 的核心数据结构是 tuple。 tuple是 包 含 了 一 个 或 者 多 个 键 值 对 的 列 表,Stream 是 由 无 限 制 的 tuple 组 成 的 序 列。 
spout 代表了一个 Storm topology 的主要数据入口,充当采集器的角色,连接到数据源,将数据转化为一个个 tuple,并将 tuple 作为数据流进行发射。
        Storm框架主要由7部分组成:
        Topology:一个实时应用的计算任务被打包作为Topology发布,这同Hadoop的MapReduce任务相似。 
        Spout:Storm中的消息源,用于为Topology生产消息(数据),一般是从外部数据源(如Message Queue、RDBMS、NoSQL、                            Realtime Log)不间断地读取数据并发送给Topology消息(tuple元组)。 
        Bolt:Storm中的消息处理者,用于为Topology进行消息的处理Bolt可以执行过滤,聚合, 查询数据库等操作,而且可以一级一级的进                    行处理。 
        Stream:产生的数据(tuple元组)。 
        Stream grouping:在Bolt任务中定义的Stream进行区分。 
        Task:每个Spout或者Bolt在集群执行许多任务。 
        Worker:Topology跨一个或多个Worker节点的进程执行。
bolt 可以理解为计算程序中的运算或者函数,将一个或者多个数据流作为输入,对数据实施运算后,选择性地输出一个或者多个数据流。 bolt 可以订阅多个由 spout 或者其他bolt 发射的数据流,这样就可以建立复杂的数据流转换网络,它和mapreduce不同,拓扑一旦启动就会一直实时的分析处理,除非人为的将它的进程杀掉。
Storm实时数据处理

本例子单词计数 topology 的数据流大概是这样:
Storm实时数据处理

一,storm拓扑实现方法:
1,声明Storm中的消息源Spout需要实现IRichSpout接口中的方法
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
       1)第一个参数:配置对象,在定义topology对象是创建
       2)第二个参数:TopologyContext对象,包含所有拓扑数据
       3)第三个参数:SpoutOutputCollector对象,它能让我们发布交给bolts处理的数据
     IRichSpout接口中的open方法把待处理的数据交给指定的处理器bolt进行处理,这个方法里创建了一个FileReader对象,用来读取文件
      public void nextTuple(),通过此方法来获取下一个待处理元数据交给bolt处理器进行处理。
2,声明处理者组件Bolt需要实现backtype.storm.topology.IRichBolt接口中的方法
     public void execute(Tuple input)调用这个方法对元数据组进行逻辑处理(过滤,汇总)
      collector.ack(input);
      最后,每次都调用collector对象的ack()方法确认已成功处理了一个元组,确保数据处理的准确。
 3,创建主类:你可以在主类中创建拓扑和一个本地集群对象,以便于在本地测试和调试
      1)创建拓扑对象(它决定Storm如何安排各节点,以及它们交换数据的方式。)
           TopologyBuilder builder = new TopologyBuilder();
 
           builder.setSpout("word-reader", new WordReader());
 
           builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
 
           builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");
      在spout和bolts之间通过shuffleGrouping方法连接。这种分组方式决定了Storm会以随机分配方式从源节点向目标节点发送消息。
      2)创建一个包含拓扑配置的Config对象(它会在运行时与集群配置合并,并通过prepare方法发送给所有节点)
           Config conf = new Config();
 
           conf.put("wordsFile", args[0]);
 
           conf.setDebug(true);
      3)用一个LocalCluster对象运行这个拓扑,调用createTopology和submitTopology,运行拓扑,休眠两秒钟(拓扑在另外的线程运                  行),然后关闭集群。
            LocalCluster cluster = new LocalCluster();
 
            cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
 
            Thread.sleep(2000);
 
            cluster.shutdown();
项目搭建  (统计相同单词的个数)


//Spout实例 负责从文件按行读取文本,并把文本行提供给第一个bolt
public class WordReader implements IRichSpout {


private static final long serialVersionUID = 1L;
private boolean completed = false;
private static FileReader fileReader = null;
private SpoutOutputCollector collector;
private TopologyContext context;
     
/*
* ack(Acknowledgement)确认字符
* 在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。

*/
@Override
public void ack(Object msgId) {
      System.out.println("OK:"+msgId);
}


@Override
public void activate() {
// TODO Auto-generated method stub


}


@Override
public void close() {
// TODO Auto-generated method stub


}


@Override
public void deactivate() {
// TODO Auto-generated method stub


}

/*
* 返回确认失败字符
* 在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收有误。

*/
@Override
public void fail(Object msgId) {
System.out.println("FAIL:"+msgId);


}


/*
* 这个方法做的惟一做的事情就是分发文件中的文本行
* 这个方法会不断的被调用,直到整个文件都读完了,我们将等待并返回

*/
@Override
public void nextTuple() {
String str = "";
/*
* 没读取文件中的一行文本就停顿1秒,用于减轻处理器的处理压力
*/
if(completed){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
        BufferedReader reader = new BufferedReader(fileReader);
        try {
while((str = reader.readLine()) != null){
  /**
                 * 按行发布一个新值
                 */
this.collector.emit(new Values(str),str);
}
} catch (IOException e) {
throw new RuntimeException("Error reading tuple",e);
}finally {
completed = true;
}
}
  /**
     * 我们将创建一个文件并维持一个collector对象
     */
@SuppressWarnings("static-access")
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
   try {
               this.context = context;
               this.fileReader = new FileReader(conf.get("wordsFile").toString());
           } catch (FileNotFoundException e) {
               throw new RuntimeException("Error reading file ["+conf.get("words")+"]");
           }
           this.collector = collector;
}
/*
* 声明bolt的出参
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields("word"));


}


@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}


//将文本行会全部转化成小写,并切分它,从中得到所有单词。
public class WordNormalizer implements IRichBolt {


private static final long serialVersionUID = 1L;
private OutputCollector collector;


@Override
public void cleanup() {
// TODO Auto-generated method stub


}
  /**
     * *bolt*从单词文件接收到文本行,并标准化它。
     * 文本行会全部转化成小写,并切分它,从中得到所有单词。
    */
@Override
public void execute(Tuple input) {
  //获取文本中的一行字符串
      String sentence  =  input.getString(0);
      //以空格分割
      String[] words = sentence.split(" ");
      for(String word : words){
          word = word.trim();
          if(!word.isEmpty()){
              word=word.toLowerCase();
              //发布这个单词
              List a = new ArrayList();
              a.add(input);
              collector.emit(a,new Values(word));
          }
      }
      //对元组做出应答
      collector.ack(input);
}


@Override
public void prepare(Map arg0, TopologyContext context, OutputCollector collector) {
this.collector=collector;


}
/**
     * 这个*bolt*只会发布“word”域
     */
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));


}


@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}


}


//统计相同单词的个数
public class WordCounter implements IRichBolt {


private static final long serialVersionUID = 1L;
private OutputCollector collector;
Integer id;
String name;
    Map<String,Integer> counters;

    /**
     * 这个spout结束时(集群关闭的时候),我们会显示单词数量
     */
@Override
public void cleanup() {
   System.out.println("-- 单词数 【"+name+"-"+id+"】 --");
       for(Map.Entry<String,Integer> entry : counters.entrySet()){
           System.out.println(entry.getKey()+": "+entry.getValue());
       }
}
/**
     *  为每个单词计数
     */
@Override
public void execute(Tuple input) {
  String str=input.getString(0);
        /**
         * 如果单词尚不存在于map,我们就创建一个,如果已在,我们就为它加1
         */
        if(!counters.containsKey(str)){
        counters.put(str,1);
        }else{
            Integer c = counters.get(str) + 1;
            counters.put(str,c);
        }
        //对元组作为应答
        collector.ack(input);
}


@Override
public void prepare(Map arg0, TopologyContext context, OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();


}


@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
// TODO Auto-generated method stub


}


@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}


}


//启动storm本地集群,调用创建好的spouts和bolts
public class TopologyMain {

//启动storm本地集群
public static void main(String[] args) {
//定义拓扑
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));


    //配置
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(false);


    //运行拓扑
         conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
        try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
        cluster.shutdown();
}
}


运行结果: