Storm实时数据处理
Storm实时数据处理
参考:http://ifeve.com/getting-started-with-stom-index/
一,storm概念
Storm是一个分布式的,可靠的,容错的数据流处理系统。它会把工作任务委托给不同类型的组件,每个组件负责处理一项简单特定的任务。Storm集群的输入流由一个被称作spout的组件管理,spout把数据传递给bolt, bolt要么把数据保存到某种存储器,要么把数据传递给其它的bolt。你可以想象一下,一个Storm集群就是在一连串的bolt之间转换spout传过来的数据。
Storm 分布式计算结构称为 topology(拓扑),由 stream(数据流), spout(数据流的生成者), bolt(运算)组成。
Storm 的核心数据结构是 tuple。 tuple是 包 含 了 一 个 或 者 多 个 键 值 对 的 列 表,Stream 是 由 无 限 制 的 tuple 组 成 的 序 列。
spout 代表了一个 Storm topology 的主要数据入口,充当采集器的角色,连接到数据源,将数据转化为一个个 tuple,并将 tuple 作为数据流进行发射。
Storm框架主要由7部分组成:
Topology:一个实时应用的计算任务被打包作为Topology发布,这同Hadoop的MapReduce任务相似。
Spout:Storm中的消息源,用于为Topology生产消息(数据),一般是从外部数据源(如Message Queue、RDBMS、NoSQL、 Realtime Log)不间断地读取数据并发送给Topology消息(tuple元组)。
Bolt:Storm中的消息处理者,用于为Topology进行消息的处理Bolt可以执行过滤,聚合, 查询数据库等操作,而且可以一级一级的进 行处理。
Stream:产生的数据(tuple元组)。
Stream grouping:在Bolt任务中定义的Stream进行区分。
Task:每个Spout或者Bolt在集群执行许多任务。
Worker:Topology跨一个或多个Worker节点的进程执行。
bolt 可以理解为计算程序中的运算或者函数,将一个或者多个数据流作为输入,对数据实施运算后,选择性地输出一个或者多个数据流。 bolt 可以订阅多个由 spout 或者其他bolt 发射的数据流,这样就可以建立复杂的数据流转换网络,它和mapreduce不同,拓扑一旦启动就会一直实时的分析处理,除非人为的将它的进程杀掉。
本例子单词计数 topology 的数据流大概是这样:
一,storm拓扑实现方法:
1,声明Storm中的消息源Spout需要实现IRichSpout接口中的方法
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
1)第一个参数:配置对象,在定义topology对象是创建
2)第二个参数:TopologyContext对象,包含所有拓扑数据
3)第三个参数:SpoutOutputCollector对象,它能让我们发布交给bolts处理的数据
IRichSpout接口中的open方法把待处理的数据交给指定的处理器bolt进行处理,这个方法里创建了一个FileReader对象,用来读取文件
public void nextTuple(),通过此方法来获取下一个待处理元数据交给bolt处理器进行处理。
2,声明处理者组件Bolt需要实现backtype.storm.topology.IRichBolt接口中的方法
public void execute(Tuple input)调用这个方法对元数据组进行逻辑处理(过滤,汇总)
collector.ack(input);
最后,每次都调用collector对象的ack()方法确认已成功处理了一个元组,确保数据处理的准确。
3,创建主类:你可以在主类中创建拓扑和一个本地集群对象,以便于在本地测试和调试
1)创建拓扑对象(它决定Storm如何安排各节点,以及它们交换数据的方式。)
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");
在spout和bolts之间通过shuffleGrouping方法连接。这种分组方式决定了Storm会以随机分配方式从源节点向目标节点发送消息。
2)创建一个包含拓扑配置的Config对象(它会在运行时与集群配置合并,并通过prepare方法发送给所有节点)
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(true);
3)用一个LocalCluster对象运行这个拓扑,调用createTopology和submitTopology,运行拓扑,休眠两秒钟(拓扑在另外的线程运 行),然后关闭集群。
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();
项目搭建 (统计相同单词的个数)
//Spout实例 负责从文件按行读取文本,并把文本行提供给第一个bolt
public class WordReader implements IRichSpout {
private static final long serialVersionUID = 1L;
private boolean completed = false;
private static FileReader fileReader = null;
private SpoutOutputCollector collector;
private TopologyContext context;
/*
* ack(Acknowledgement)确认字符
* 在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。
*
*/
@Override
public void ack(Object msgId) {
System.out.println("OK:"+msgId);
}
@Override
public void activate() {
// TODO Auto-generated method stub
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public void deactivate() {
// TODO Auto-generated method stub
}
/*
* 返回确认失败字符
* 在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收有误。
*
*/
@Override
public void fail(Object msgId) {
System.out.println("FAIL:"+msgId);
}
/*
* 这个方法做的惟一做的事情就是分发文件中的文本行
* 这个方法会不断的被调用,直到整个文件都读完了,我们将等待并返回
*
*/
@Override
public void nextTuple() {
String str = "";
/*
* 没读取文件中的一行文本就停顿1秒,用于减轻处理器的处理压力
*/
if(completed){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
BufferedReader reader = new BufferedReader(fileReader);
try {
while((str = reader.readLine()) != null){
/**
* 按行发布一个新值
*/
this.collector.emit(new Values(str),str);
}
} catch (IOException e) {
throw new RuntimeException("Error reading tuple",e);
}finally {
completed = true;
}
}
/**
* 我们将创建一个文件并维持一个collector对象
*/
@SuppressWarnings("static-access")
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
try {
this.context = context;
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file ["+conf.get("words")+"]");
}
this.collector = collector;
}
/*
* 声明bolt的出参
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
//将文本行会全部转化成小写,并切分它,从中得到所有单词。
public class WordNormalizer implements IRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;
@Override
public void cleanup() {
// TODO Auto-generated method stub
}
/**
* *bolt*从单词文件接收到文本行,并标准化它。
* 文本行会全部转化成小写,并切分它,从中得到所有单词。
*/
@Override
public void execute(Tuple input) {
//获取文本中的一行字符串
String sentence = input.getString(0);
//以空格分割
String[] words = sentence.split(" ");
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word=word.toLowerCase();
//发布这个单词
List a = new ArrayList();
a.add(input);
collector.emit(a,new Values(word));
}
}
//对元组做出应答
collector.ack(input);
}
@Override
public void prepare(Map arg0, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}
/**
* 这个*bolt*只会发布“word”域
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
//统计相同单词的个数
public class WordCounter implements IRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;
Integer id;
String name;
Map<String,Integer> counters;
/**
* 这个spout结束时(集群关闭的时候),我们会显示单词数量
*/
@Override
public void cleanup() {
System.out.println("-- 单词数 【"+name+"-"+id+"】 --");
for(Map.Entry<String,Integer> entry : counters.entrySet()){
System.out.println(entry.getKey()+": "+entry.getValue());
}
}
/**
* 为每个单词计数
*/
@Override
public void execute(Tuple input) {
String str=input.getString(0);
/**
* 如果单词尚不存在于map,我们就创建一个,如果已在,我们就为它加1
*/
if(!counters.containsKey(str)){
counters.put(str,1);
}else{
Integer c = counters.get(str) + 1;
counters.put(str,c);
}
//对元组作为应答
collector.ack(input);
}
@Override
public void prepare(Map arg0, TopologyContext context, OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
//启动storm本地集群,调用创建好的spouts和bolts
public class TopologyMain {
//启动storm本地集群
public static void main(String[] args) {
//定义拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));
//配置
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
//运行拓扑
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cluster.shutdown();
}
}
运行结果:
参考:http://ifeve.com/getting-started-with-stom-index/
一,storm概念
Storm是一个分布式的,可靠的,容错的数据流处理系统。它会把工作任务委托给不同类型的组件,每个组件负责处理一项简单特定的任务。Storm集群的输入流由一个被称作spout的组件管理,spout把数据传递给bolt, bolt要么把数据保存到某种存储器,要么把数据传递给其它的bolt。你可以想象一下,一个Storm集群就是在一连串的bolt之间转换spout传过来的数据。
Storm 分布式计算结构称为 topology(拓扑),由 stream(数据流), spout(数据流的生成者), bolt(运算)组成。
Storm 的核心数据结构是 tuple。 tuple是 包 含 了 一 个 或 者 多 个 键 值 对 的 列 表,Stream 是 由 无 限 制 的 tuple 组 成 的 序 列。
spout 代表了一个 Storm topology 的主要数据入口,充当采集器的角色,连接到数据源,将数据转化为一个个 tuple,并将 tuple 作为数据流进行发射。
Storm框架主要由7部分组成:
Topology:一个实时应用的计算任务被打包作为Topology发布,这同Hadoop的MapReduce任务相似。
Spout:Storm中的消息源,用于为Topology生产消息(数据),一般是从外部数据源(如Message Queue、RDBMS、NoSQL、 Realtime Log)不间断地读取数据并发送给Topology消息(tuple元组)。
Bolt:Storm中的消息处理者,用于为Topology进行消息的处理Bolt可以执行过滤,聚合, 查询数据库等操作,而且可以一级一级的进 行处理。
Stream:产生的数据(tuple元组)。
Stream grouping:在Bolt任务中定义的Stream进行区分。
Task:每个Spout或者Bolt在集群执行许多任务。
Worker:Topology跨一个或多个Worker节点的进程执行。
bolt 可以理解为计算程序中的运算或者函数,将一个或者多个数据流作为输入,对数据实施运算后,选择性地输出一个或者多个数据流。 bolt 可以订阅多个由 spout 或者其他bolt 发射的数据流,这样就可以建立复杂的数据流转换网络,它和mapreduce不同,拓扑一旦启动就会一直实时的分析处理,除非人为的将它的进程杀掉。
本例子单词计数 topology 的数据流大概是这样:
一,storm拓扑实现方法:
1,声明Storm中的消息源Spout需要实现IRichSpout接口中的方法
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
1)第一个参数:配置对象,在定义topology对象是创建
2)第二个参数:TopologyContext对象,包含所有拓扑数据
3)第三个参数:SpoutOutputCollector对象,它能让我们发布交给bolts处理的数据
IRichSpout接口中的open方法把待处理的数据交给指定的处理器bolt进行处理,这个方法里创建了一个FileReader对象,用来读取文件
public void nextTuple(),通过此方法来获取下一个待处理元数据交给bolt处理器进行处理。
2,声明处理者组件Bolt需要实现backtype.storm.topology.IRichBolt接口中的方法
public void execute(Tuple input)调用这个方法对元数据组进行逻辑处理(过滤,汇总)
collector.ack(input);
最后,每次都调用collector对象的ack()方法确认已成功处理了一个元组,确保数据处理的准确。
3,创建主类:你可以在主类中创建拓扑和一个本地集群对象,以便于在本地测试和调试
1)创建拓扑对象(它决定Storm如何安排各节点,以及它们交换数据的方式。)
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");
在spout和bolts之间通过shuffleGrouping方法连接。这种分组方式决定了Storm会以随机分配方式从源节点向目标节点发送消息。
2)创建一个包含拓扑配置的Config对象(它会在运行时与集群配置合并,并通过prepare方法发送给所有节点)
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(true);
3)用一个LocalCluster对象运行这个拓扑,调用createTopology和submitTopology,运行拓扑,休眠两秒钟(拓扑在另外的线程运 行),然后关闭集群。
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();
项目搭建 (统计相同单词的个数)
//Spout实例 负责从文件按行读取文本,并把文本行提供给第一个bolt
public class WordReader implements IRichSpout {
private static final long serialVersionUID = 1L;
private boolean completed = false;
private static FileReader fileReader = null;
private SpoutOutputCollector collector;
private TopologyContext context;
/*
* ack(Acknowledgement)确认字符
* 在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误。
*
*/
@Override
public void ack(Object msgId) {
System.out.println("OK:"+msgId);
}
@Override
public void activate() {
// TODO Auto-generated method stub
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public void deactivate() {
// TODO Auto-generated method stub
}
/*
* 返回确认失败字符
* 在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收有误。
*
*/
@Override
public void fail(Object msgId) {
System.out.println("FAIL:"+msgId);
}
/*
* 这个方法做的惟一做的事情就是分发文件中的文本行
* 这个方法会不断的被调用,直到整个文件都读完了,我们将等待并返回
*
*/
@Override
public void nextTuple() {
String str = "";
/*
* 没读取文件中的一行文本就停顿1秒,用于减轻处理器的处理压力
*/
if(completed){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
BufferedReader reader = new BufferedReader(fileReader);
try {
while((str = reader.readLine()) != null){
/**
* 按行发布一个新值
*/
this.collector.emit(new Values(str),str);
}
} catch (IOException e) {
throw new RuntimeException("Error reading tuple",e);
}finally {
completed = true;
}
}
/**
* 我们将创建一个文件并维持一个collector对象
*/
@SuppressWarnings("static-access")
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
try {
this.context = context;
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file ["+conf.get("words")+"]");
}
this.collector = collector;
}
/*
* 声明bolt的出参
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
//将文本行会全部转化成小写,并切分它,从中得到所有单词。
public class WordNormalizer implements IRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;
@Override
public void cleanup() {
// TODO Auto-generated method stub
}
/**
* *bolt*从单词文件接收到文本行,并标准化它。
* 文本行会全部转化成小写,并切分它,从中得到所有单词。
*/
@Override
public void execute(Tuple input) {
//获取文本中的一行字符串
String sentence = input.getString(0);
//以空格分割
String[] words = sentence.split(" ");
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word=word.toLowerCase();
//发布这个单词
List a = new ArrayList();
a.add(input);
collector.emit(a,new Values(word));
}
}
//对元组做出应答
collector.ack(input);
}
@Override
public void prepare(Map arg0, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}
/**
* 这个*bolt*只会发布“word”域
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
//统计相同单词的个数
public class WordCounter implements IRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;
Integer id;
String name;
Map<String,Integer> counters;
/**
* 这个spout结束时(集群关闭的时候),我们会显示单词数量
*/
@Override
public void cleanup() {
System.out.println("-- 单词数 【"+name+"-"+id+"】 --");
for(Map.Entry<String,Integer> entry : counters.entrySet()){
System.out.println(entry.getKey()+": "+entry.getValue());
}
}
/**
* 为每个单词计数
*/
@Override
public void execute(Tuple input) {
String str=input.getString(0);
/**
* 如果单词尚不存在于map,我们就创建一个,如果已在,我们就为它加1
*/
if(!counters.containsKey(str)){
counters.put(str,1);
}else{
Integer c = counters.get(str) + 1;
counters.put(str,c);
}
//对元组作为应答
collector.ack(input);
}
@Override
public void prepare(Map arg0, TopologyContext context, OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
//启动storm本地集群,调用创建好的spouts和bolts
public class TopologyMain {
//启动storm本地集群
public static void main(String[] args) {
//定义拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));
//配置
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
//运行拓扑
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cluster.shutdown();
}
}
运行结果: