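Storm Study Notes
1 Parallelism
A running topology is made up of three kinds of physical entities:
- Worker processes
- Executor threads
- Task instances
When a topology runs, the Storm cluster first starts several worker processes. Each worker process starts a number of executor threads, and each executor thread runs one or more task instances. All tasks in one executor belong to the same spout or bolt. By default, each executor runs one task instance.
Each spout or bolt is instantiated as multiple tasks that carry out the actual business logic. Note that if an executor holds several tasks, they run serially.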
Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // parallelism hint of 2, i.e. the number of executors
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
    .setNumTasks(4) // this bolt has 4 tasks in total
    .shuffleGrouping("blue-spout");
topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
    .shuffleGrouping("green-bolt");
StormSubmitter.submitTopology(
    "mytopology",
    conf,
    topologyBuilder.createTopology()
);
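From the snippet above, the parallelism can be calculated: the topology has 2 worker processes, 2 + 2 + 6 = 10 executors, and 2 + 4 + 6 = 12 tasks, so each worker runs 10 / 2 = 5 executors.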
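Explanation
The worker and executor concepts are easy to grasp from the description above, so the focus here is on tasks. To raise parallelism, Storm officially provides a way to change the number of workers and executors of a running topology, but it says little about the role the number of tasks plays. The official description of a task is quoted below: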
A task performs the actual data processing — each spout or bolt that you implement in your code executes as many tasks across the cluster. The number of tasks for a component is always the same throughout the lifetime of a topology, but the number of executors (threads) for a component can change over time. This means that the following condition holds true: #threads ≤ #tasks. By default, the number of tasks is set to be the same as the number of executors, i.e. Storm will run one task per thread.
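The key points here are:
- The number of tasks never changes during the lifetime of a topology.
- The number of executors is always less than or equal to the number of tasks; having more executors than tasks serves no purpose.
If the initial configuration has more tasks than executors, as with green-bolt in the example above, each executor has to run its tasks one after another. Using the rebalance command to raise the executor count until it matches the task count gives each executor a single task, so all tasks run in parallel and the topology's parallelism increases.
For blue-spout and yellow-bolt in the example, no task count was specified, so the task count defaults to the executor count. Raising the executor count with rebalance has no effect on parallelism in that case, because the number of threads already equals the number of tasks.
Setting the task count is therefore preparation for raising parallelism later. Once the topology is running the task count cannot change, so if you intend to scale up with rebalance, configure more tasks than executors at the start.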
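The relationship between a fixed task count and a changeable executor count can be sketched in plain Java. This is a simplified model, not Storm's scheduler: the class TaskDistributionSketch, the method assign, and the round-robin placement are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of spreading a component's fixed set of tasks over its
// executors. Illustration only; this is not Storm's scheduler code.
class TaskDistributionSketch {

    // Returns one list of task indices per executor, assigned round-robin.
    static List<List<Integer>> assign(int numTasks, int requestedExecutors) {
        if (numTasks < 1 || requestedExecutors < 1) {
            throw new IllegalArgumentException("counts must be positive");
        }
        // In this model, executors beyond the task count would have nothing
        // to run, so the effective count is capped at the task count.
        int executors = Math.min(requestedExecutors, numTasks);
        List<List<Integer>> perExecutor = new ArrayList<>();
        for (int e = 0; e < executors; e++) {
            perExecutor.add(new ArrayList<>());
        }
        for (int task = 0; task < numTasks; task++) {
            perExecutor.get(task % executors).add(task);
        }
        return perExecutor;
    }

    public static void main(String[] args) {
        // green-bolt as configured: 4 tasks on 2 executors, two tasks share each thread.
        System.out.println(assign(4, 2));
        // green-bolt after a rebalance to 4 executors: one task per thread.
        System.out.println(assign(4, 4));
        // yellow-bolt has 6 tasks, so requesting 8 executors still gives 6 busy threads.
        System.out.println(assign(6, 8).size());
    }
}
```

With 4 tasks and 2 executors each thread runs two tasks serially. Raising the executor count to 4 gives every task its own thread, while raising it beyond the task count changes nothing.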
Reference: https://www.jianshu.com/p/8d5928680fde
2 Using Storm's key classes
2.1 SpoutOutputCollector
List<Integer> emit(List<Object> tuple)
//Emits a tuple to the default output stream with a null message id.
List<Integer> emit(List<Object> tuple, Object messageId)
//Emits a new tuple to the default output stream with the given message ID.
List<Integer> emit(String streamId, List<Object> tuple)
//Emits a tuple to the specified output stream with a null message id.
List<Integer> emit(String streamId, List<Object> tuple, Object messageId)
//Emits a new tuple to the specified output stream with the given message ID.
2.2 OutputCollector
void ack(Tuple input)
void fail(Tuple input)
List<Integer> emit(List<Object> tuple)
//Emits a new unanchored tuple to the default stream.
List<Integer> emit(String streamId, List<Object> tuple)
//Emits a new unanchored tuple to the specified stream.
void emitDirect(int taskId, List<Object> tuple)
//Emits a tuple directly to the specified task id on the default stream.
2.3 TopologyBuilder
StormTopology createTopology()
BoltDeclarer setBolt(String componentId, IBasicBolt bolt)
//Define a new bolt in this topology.
BoltDeclarer setBolt(String componentId, IBasicBolt bolt, Number parallelism_hint)
//Define a new bolt in this topology.
BoltDeclarer setBolt(String componentId, IRichBolt bolt)
//Define a new bolt in this topology with parallelism of just one thread.
BoltDeclarer setBolt(String componentId, IRichBolt bolt, Number parallelism_hint)
//Define a new bolt in this topology with the specified amount of parallelism.
2.4 InputDeclarer
T shuffleGrouping(String componentId)
//Tuples are randomly distributed across the bolt’s tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
T shuffleGrouping(String componentId, String streamId)
//Tuples are randomly distributed across the bolt’s tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
T fieldsGrouping(String componentId, Fields fields)
//The stream is partitioned by the fields specified in the grouping.
T fieldsGrouping(String componentId, String streamId, Fields fields)
//The stream is partitioned by the fields specified in the grouping.
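The idea behind fields grouping, that tuples with equal grouping values always reach the same task, can be illustrated with a hash-and-modulo sketch in plain Java. The class FieldsGroupingSketch and the method chooseTask are hypothetical, and Storm's internal hashing may differ in detail.

```java
import java.util.Arrays;
import java.util.List;

// Hash-modulo sketch of the idea behind fields grouping.
// Illustration only; it does not use or reproduce Storm's own classes.
class FieldsGroupingSketch {

    // Maps the values of the grouping fields to a task index in [0, numTasks).
    static int chooseTask(List<?> groupingValues, int numTasks) {
        if (numTasks < 1) {
            throw new IllegalArgumentException("numTasks must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        int first = chooseTask(Arrays.asList("user-42"), numTasks);
        int second = chooseTask(Arrays.asList("user-42"), numTasks);
        System.out.println(first == second); // prints true: equal values map to the same task
    }
}
```

Shuffle grouping, by contrast, makes no such promise about which task receives a given value; it only aims to spread tuples evenly.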
Problem 1: sending different tuples to different downstream bolts. This is where streams come in: the sender emits message a to receiver A' on stream A, and message b to receiver B' on stream B.
builder.setBolt(SequenceTopologyDef.SPLIT_BOLT_NAME, new SplitRecord(), 2)
    .shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1)
    .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME, // --- component name of the sender
        SequenceTopologyDef.TRADE_STREAM_ID); // --- receive the sender's tuples on this stream
builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
    .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME, // --- component name of the sender
        SequenceTopologyDef.CUSTOMER_STREAM_ID); // --- receive the sender's tuples on this stream
When emitting the messages:
public void execute(Tuple tuple, BasicOutputCollector collector) {
    tpsCounter.count();
    Long tupleId = tuple.getLong(0);
    Object obj = tuple.getValue(1);
    if (obj instanceof TradeCustomer) {
        TradeCustomer tradeCustomer = (TradeCustomer) obj;
        Pair trade = tradeCustomer.getTrade();
        Pair customer = tradeCustomer.getCustomer();
        collector.emit(SequenceTopologyDef.TRADE_STREAM_ID, // --- streamId
            new Values(tupleId, trade));
        collector.emit(SequenceTopologyDef.CUSTOMER_STREAM_ID, // --- streamId
            new Values(tupleId, customer));
    } else if (obj != null) {
        LOG.info("Unknown type " + obj.getClass().getName());
    } else {
        LOG.info("Null value");
    }
}
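Notes
Note 1: When a bolt sends all of its messages to every downstream bolt, each downstream bolt only needs xxxGrouping(componentId) to name its parent bolt.
Note 2: When a bolt sends different messages to different downstream bolts, pass a streamId to emit(), and in builder.setBolt() specify both the parent bolt's componentId and the streamId. The emitting bolt must also declare each named stream in declareOutputFields() through OutputFieldsDeclarer.declareStream().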
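The two notes can be modeled as a lookup keyed by the parent's component id and stream id. The plain-Java sketch below is an illustration, not Storm code: the class StreamRoutingSketch and its methods are hypothetical, the bolt names are made up, and the only Storm fact it relies on is that a grouping declared without a stream id refers to the stream named "default".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of how subscriptions by (componentId, streamId) decide which
// downstream bolts see an emitted tuple. Not Storm's implementation.
class StreamRoutingSketch {
    static final String DEFAULT_STREAM = "default";

    private final Map<String, List<String>> subscribers = new HashMap<>();

    private static String key(String componentId, String streamId) {
        return componentId + "/" + streamId;
    }

    // Models xxxGrouping(componentId): subscribe to the parent's default stream.
    void subscribe(String boltId, String parentId) {
        subscribe(boltId, parentId, DEFAULT_STREAM);
    }

    // Models xxxGrouping(componentId, streamId): subscribe to one named stream.
    void subscribe(String boltId, String parentId, String streamId) {
        subscribers.computeIfAbsent(key(parentId, streamId), k -> new ArrayList<>()).add(boltId);
    }

    // Models emit(streamId, tuple): returns the bolts that would receive the tuple.
    List<String> receivers(String parentId, String streamId) {
        return subscribers.getOrDefault(key(parentId, streamId), Collections.emptyList());
    }

    public static void main(String[] args) {
        StreamRoutingSketch routing = new StreamRoutingSketch();
        routing.subscribe("trade-bolt", "split-bolt", "trade-stream");
        routing.subscribe("customer-bolt", "split-bolt", "customer-stream");
        routing.subscribe("audit-bolt", "split-bolt"); // hypothetical bolt on the default stream
        System.out.println(routing.receivers("split-bolt", "trade-stream")); // prints [trade-bolt]
        System.out.println(routing.receivers("split-bolt", DEFAULT_STREAM)); // prints [audit-bolt]
    }
}
```

A bolt subscribed only by component id never sees tuples emitted on a named stream, which is why the grouping call must name the stream whenever emit() does.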
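Problem 2:
Storm's ack/fail mechanism is its reliable message processing mechanism. In plain terms, every tuple a spout emits carries a message ID, and every bolt downstream of the spout reports back whether it processed the tuple. The emission counts as successful only when every bolt reports success; if any bolt fails, the whole tuple fails.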
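Storm's acker is usually described as tracking each tuple tree with an XOR checksum: every tuple id is XORed in once when the tuple is created and once when it is acked, so the value returns to zero exactly when the whole tree has been acked. The plain-Java sketch below models that idea only. The class AckTrackerSketch and its methods are hypothetical, and the small ids are chosen for readability, where Storm uses random 64-bit ids.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of XOR-based tuple-tree tracking. Not Storm's acker implementation.
class AckTrackerSketch {
    // messageId -> XOR of all tuple ids that are created but not yet acked
    private final Map<Object, Long> pending = new HashMap<>();

    // The spout emitted a root tuple with the given message id.
    void track(Object messageId, long rootTupleId) {
        pending.put(messageId, rootTupleId);
    }

    // A bolt acked one tuple and anchored zero or more new tuples to it.
    // Returns true when the whole tree for this message is fully processed.
    boolean ack(Object messageId, long ackedTupleId, long... emittedTupleIds) {
        Long current = pending.get(messageId);
        if (current == null) {
            throw new IllegalStateException("unknown or finished message: " + messageId);
        }
        long value = current ^ ackedTupleId;
        for (long id : emittedTupleIds) {
            value ^= id;
        }
        if (value == 0L) {
            pending.remove(messageId); // success: the spout's ack() would be called
            return true;
        }
        pending.put(messageId, value);
        return false;
    }

    // Any bolt failing a tuple fails the whole message; the spout may replay it.
    void fail(Object messageId) {
        pending.remove(messageId);
    }

    boolean isPending(Object messageId) {
        return pending.containsKey(messageId);
    }

    public static void main(String[] args) {
        AckTrackerSketch tracker = new AckTrackerSketch();
        tracker.track("msg-1", 5L);
        System.out.println(tracker.ack("msg-1", 5L, 9L, 12L)); // prints false: two child tuples outstanding
        System.out.println(tracker.ack("msg-1", 9L));          // prints false: one child tuple outstanding
        System.out.println(tracker.ack("msg-1", 12L));         // prints true: every tuple acked
    }
}
```

The message completes only after the last outstanding tuple is acked, which matches the rule that every bolt must report success before the spout tuple counts as processed.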