Storm学习-集群提交拓扑
一、写一个word-count案例,关于一些介绍已经在代码注释里面做了介绍,在这里就不用额外的篇幅来写一下storm的一下使用了
代码如下:
1.先写一个生成句子的spout,如下
/**
* @Auther: 18030501
* @Date: 2018/10/24 14:25
* @Description: 数据流生成者
*
* spout与bolt流程如下
* SentenceSpout-->SplitSentenceBolt-->WordCountBolt-->ReportBolt
*/
@Slf4j
public class SentenceSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private String[] sentences = {
"my name is whale",
"i like play games",
"my game name is The boy with the cannon",
"so no one dares to provoke me.",
"my girl friend is beautiful"
};
private int index = 0;
/**
* open()方法中是ISpout接口中定义,在Spout组件初始化时被调用。
* open()接受三个参数:
* 一个包含Storm配置的Map
* 一个TopologyContext对象,提供了topology中组件的信息
* SpoutOutputCollector对象提供发射tuple的方法
* 在这个例子中,我们不需要执行初始化,只是简单的存储在一个SpoutOutputCollector实例变量。
*/
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
log.info("----SentenceSpout.open----");
this.collector = spoutOutputCollector;
}
/**
* nextTuple()方法是任何Spout实现的核心。
* Storm调用这个方法,向输出的collector发出tuple。
* 在这里,我们只是发出当前索引的句子,并增加该索引准备发射下一个句子。
*/
@Override
public void nextTuple() {
if (index < sentences.length){
this.collector.emit(new Values(sentences[index]));
index++;
}
Utils.sleep(1);
}
/**
* declareOutputFields是在IComponent接口中定义的,所有Storm的组件(spout和bolt)都必须实现这个接口
* 用于告诉Storm流组件将会发出那些数据流,每个流的tuple将包含的字段
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
log.info("-----SentenceSpout.declareOutputFields----");
declarer.declare(new Fields("sentence"));
}
}
2、切分句子的Bolt
/**
* @Auther: 18030501
* @Date: 2018/10/24 14:41
* @Description: 单词分割器,订阅sentence spout发射的tuple流,实现分割单词
*/
@Slf4j
public class SplitSentenceBolt extends BaseRichBolt {
private OutputCollector collector;
/**
* prepare()方法类似于ISpout 的open()方法。
* 这个方法在blot初始化时调用,可以用来准备bolt用到的资源,比如数据库连接。
* 本例子和SentenceSpout类一样,SplitSentenceBolt类不需要太多额外的初始化,
* 所以prepare()方法只保存OutputCollector对象的引用。
*/
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
log.info("----SplitSentenceBolt.prepare----");
this.collector = outputCollector;
}
/**
* SplitSentenceBolt核心功能是在类IBolt定义execute()方法,这个方法是IBolt接口中定义。
* 每次Bolt从流接收一个订阅的tuple,都会调用这个方法。
* 本例中,收到的元组中查找“sentence”的值,
* 并将该值拆分成单个的词,然后按单词发出新的tuple。
*/
@Override
public void execute(Tuple input) {
String sentence = input.getStringByField("sentence");
// 使用空格将句子分割成单词
String[] words = sentence.split(" ");
for (String word : words) {
this.collector.emit(new Values(word));//向下一个bolt发射数据
}
}
/**
* splitSentenceBolt类定义一个元组流,每个包含一个字段(“word”)
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
log.info("----SplitSentenceBolt.declareOutputFields----");
declarer.declare(new Fields("word"));
}
}
3、单词计数的Bolt
/**
* @Auther: 18030501
* @Date: 2018/10/24 14:54
* @Description: 订阅 split sentence bolt的输出流,实现单词计数,并发送当前计数给下一个bolt
*/
@Slf4j
public class WordCountBolt extends BaseRichBolt {
private OutputCollector collector;
// 存储单词和对应的计数
private Map<String, Long> countMap = null;
/**
* 大部分实例变量通常是在prepare()中进行实例化,这个设计模式是由topology的部署方式决定的
* 因为在部署拓扑时,组件spout和bolt是在网络上发送的序列化的实例变量。
* 如果spout或bolt有任何non-serializable实例变量在序列化之前被实例化(例如,在构造函数中创建)
* 会抛出NotSerializableException并且拓扑将无法发布。
* 本例中因为HashMap 是可序列化的,所以可以安全地在构造函数中实例化。
* 但是,通常情况下最好是在构造函数中对基本数据类型和可序列化的对象进行复制和实例化
* 而在prepare()方法中对不可序列化的对象进行实例化。
*/
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
log.info("----WordCountBolt.prepare----");
this.collector = outputCollector;
this.countMap = new HashMap<>();
}
/**
* 在execute()方法中,我们查找的收到的单词的计数(如果不存在,初始化为0)
* 然后增加计数并存储,发出一个新的词和当前计数组成的二元组。
* 发射计数作为流允许拓扑的其他bolt订阅和执行额外的处理。
*/
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Long count = this.countMap.get(word);
if (count == null) {
count = 0L;//如果不存在,初始化为0
}
count++;//增加计数
this.countMap.put(word, count);//存储计数
this.collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//声明一个输出流,其中tuple包括了单词和对应的计数,向后发射
//其他bolt可以订阅这个数据流进一步处理
log.info("----WordCountBolt.declareOutputFields----");
declarer.declare(new Fields("word", "count"));
}
}
4、收集最终结果的Bolt
/**
* @Auther: 18030501
* @Date: 2018/10/24 15:02
* @Description: 报告生成器
*/
@Slf4j
public class ReportBolt extends BaseRichBolt {
// 保存单词和对应的计数
private HashMap<String, Long> counts = null;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
log.info("----ReportBolt.prepare----");
this.counts = Maps.newHashMap();
}
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Long count = input.getLongByField("count");
this.counts.put(word, count);
//实时输出
log.info("实时输出结果:{}", this.counts);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//这里是末端bolt,不需要发射数据流,这里无需定义
}
/**
* cleanup是IBolt接口中定义
* Storm在终止一个bolt之前会调用这个方法
* 本例我们利用cleanup()方法在topology关闭时输出最终的计数结果
* 通常情况下,cleanup()方法用来释放bolt占用的资源,如打开的文件句柄或数据库连接
*
* 但是当Storm拓扑在一个集群上运行,IBolt.cleanup()方法不能保证执行(这里是开发模式,生产环境不要这样做)。
*/
@Override
public void cleanup() {
log.info("----ReportBolt.cleanup----");
log.info("---------- FINAL COUNTS -----------");
ArrayList<String> keys = new ArrayList<>();
keys.addAll(this.counts.keySet());
Collections.sort(keys);
for (String key : keys) {
System.out.println(key + " : " + this.counts.get(key));
}
log.info("----------------------------");
}
}
5、定义启动类,这里提供测试和提交集群的两种方式:
提交集群模式:
/**
* @Auther: 18030501
* @Date: 2018/10/24 15:08
* @Description: 实现单词计数topology
* <p>
* Storm的路由模式:
* shuffle grouping:洗牌模式,随机平均的发到下游节点上
* fields grouping:按照某一个字段来分配,拥有相同值的字段会分配到同一个节点上(即可连续跟踪某个固定特征的数据流)
* global grouping: 强制到某唯一的节点,实际上如果有多个节点去到任务号最低的节点
* all grouping: 强制到所有节点,需小心使用
* Partial Key grouping: 最新支持的,带负载均衡的Fields grouping
* Direct grouping: 手动指定要流动到的节点
*/
@Slf4j
public class StormApp {
private static final String SENTENCE_SPOUT_ID = "sentence-spout";
private static final String SPLIT_BOLT_ID = "split-bolt";
private static final String COUNT_BOLT_ID = "count-bolt";
private static final String REPORT_BOLT_ID = "report-bolt";
private static final String TOPOLOGY_NAME = "word-count-topology";
public static void main(String[] args) {
// 1.实例化spout和bolt
SentenceSpout spout = new SentenceSpout();
SplitSentenceBolt splitBolt = new SplitSentenceBolt();
WordCountBolt countBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
// 2.创建拓扑实例
//TopologyBuilder提供流式风格的API来定义topology组件之间的数据流
TopologyBuilder builder = new TopologyBuilder();
// 3.注册一个sentence spout,默认一个Executor(线程)一个task
builder.setSpout(SENTENCE_SPOUT_ID, spout, 1);
// 4.注册一个SplitSentenceBolt并订阅sentence发出的数据流
// shuffleGrouping方法告诉Storm要将SentenceSpout发射的tuple随机均匀的分发给SplitSentenceBolt的实例
// SplitSentenceBolt单词分割器设置2个Task,1个Executor(线程)
builder.setBolt(SPLIT_BOLT_ID, splitBolt, 1).setNumTasks(2).shuffleGrouping(SENTENCE_SPOUT_ID);
// 5. 注册WordCountBolt,并订阅SplitSentenceBolt
//fieldsGrouping将含有特定数据的tuple路由到特殊的bolt实例中
//这里fieldsGrouping()方法保证所有"word"字段相同的tuple会被路由到同一个WordCountBolt实例中
//WordCountBolt单词计数器设置2个Executor(线程)
builder.setBolt(COUNT_BOLT_ID, countBolt, 2).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
// 6. 注册ReportBolt,并订阅WordCountBolt
//globalGrouping是把WordCountBolt发射的所有tuple路由到唯一的ReportBolt
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
// Config类是一个HashMap<String,Object>的子类,用来配置topology运行时的行为
Config config = new Config();
// 设置worker数量
config.setNumWorkers(1);
config.setDebug(false);
config.setMaxSpoutPending(1000);
// 提交到集群
try {
StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
} catch (AlreadyAliveException e) {
log.error("submit topology error",e);
} catch (InvalidTopologyException e) {
log.error("submit topology error",e);
}
}
}
本地测试模式:
/**
* @Auther: 18030501
* @Date: 2018/10/26 11:30
* @Description: 本地测试模式
*/
public class StormAppTest {
private static final String SENTENCE_SPOUT_ID = "sentence-spout";
private static final String SPLIT_BOLT_ID = "split-bolt";
private static final String COUNT_BOLT_ID = "count-bolt";
private static final String REPORT_BOLT_ID = "report-bolt";
private static final String TOPOLOGY_NAME = "word-count-topology";
public static void main(String[] args) {
// 1.实例化spout和bolt
SentenceSpout spout = new SentenceSpout();
SplitSentenceBolt splitBolt = new SplitSentenceBolt();
WordCountBolt countBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
// 2.创建拓扑实例
//TopologyBuilder提供流式风格的API来定义topology组件之间的数据流
TopologyBuilder builder = new TopologyBuilder();
// 3.注册一个sentence spout,设置两个Executor(线程),默认一个
builder.setSpout(SENTENCE_SPOUT_ID, spout, 1);
// 4.注册一个SplitSentenceBolt并订阅sentence发出的数据流
// shuffleGrouping方法告诉Storm要将SentenceSpout发射的tuple随机均匀的分发给SplitSentenceBolt的实例
// SplitSentenceBolt单词分割器设置2个Task,1个Executor(线程)
builder.setBolt(SPLIT_BOLT_ID, splitBolt, 1).setNumTasks(2).shuffleGrouping(SENTENCE_SPOUT_ID);
// 5. 注册WordCountBolt,并订阅SplitSentenceBolt
//fieldsGrouping将含有特定数据的tuple路由到特殊的bolt实例中
//这里fieldsGrouping()方法保证所有“word”字段相同的tuple会被路由到同一个WordCountBolt实例中
//WordCountBolt单词计数器设置2个Executor(线程)
builder.setBolt(COUNT_BOLT_ID, countBolt, 2).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
// 6. 注册ReportBolt,并订阅WordCountBolt
//globalGrouping是把WordCountBolt发射的所有tuple路由到唯一的ReportBolt
builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
// Config类是一个HashMap<String,Object>的子类,用来配置topology运行时的行为
Config config = new Config();
// 设置worker数量
config.setNumWorkers(1);
LocalCluster cluster = new LocalCluster();
// 本地提交
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();
}
}
至此代码已经写完了,接下来开始打包提交到storm机器上
这里需要注意的是:
1.使用maven打包需要把storm的jar包排除掉,不能打进jar包中
2.使用以下打包配置
<!-- 使用该插件把依赖的包也打进去 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.example6.demo6.storm.StormApp</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
操作步骤:
1.把jar包上传至服务器
2.启动storm服务
3.向集群提交任务,使用如下命令:
./storm jar /home/storm/demo6-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.example6.demo6.storm.StormApp word-count-topology
参数说明:
jar:运行的job
/home/storm/demo6-0.0.1-SNAPSHOT-jar-with-dependencies.jar :你jar包的路径
com.example6.demo6.storm.StormApp:启动类
word-count-topology:拓扑的名称
执行命令之后,效果如下:
此时登陆StormUI查看运行状态:
整个Storm的概览:
进入具体拓扑查看具体情况:
查看storm运行日志:
具体执行日志在work-port.log文件中: