Storm 简单记录
Storm 简单记录
DRPC(Distributed Remote Procedure Call) 分布式远程调用过程
DRPC是通过一个DRPC服务端(DRPC server)来实现分布式RPC功能的。DRPC server负责接收RPC请求,接收到请求后将其发送给Storm中运行的Topology,等待接收Topology发送的处理结果,最后将结果返回给请求客户端
客户端通过向DRPC服务器发送要执行的函数名称以及该函数的参数来获得处理结果。实现该函数的拓扑使用一个DRPCSpout从DRPC服务器接收函数调用流,DRPC服务器会为每一个函数调用标记一个唯一的id,随后拓扑会执行函数计算结果,并在拓扑的最后使用一个名为ReturnResults的bolt连接到DRPC服务器,根据函数调用的id将结果返回
模式一(弃用)
package com.mjlf.drcp;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.cluster.StormClusterState;
import org.apache.storm.drpc.LinearDRPCInputDeclarer;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
* @ClassName LinerBasicDrcpBuilder
* @Author mjlft
* @Date 2019/3/23 14:10
* @Version 1.0
* @Description TODO
*/
/**
 * Mode 1 (deprecated): linear DRPC topology built with LinearDRPCTopologyBuilder,
 * which wires the DRPC spout and the result-return bolt automatically.
 */
public class LinerBasicDrcpBuilder {

    /** Echoes the DRPC request argument back with "~~~~" appended. */
    public static class ExclaimBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // Field 0 carries the DRPC request id, field 1 the argument string.
            String input = tuple.getString(1);
            collector.emit(new Values(tuple.getValue(0), input + "~~~~"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "word"));
        }
    }

    public static void main(String[] args) {
        // The topology is registered under DRPC function name "exec";
        // clients invoke it by calling that function.
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exec");
        builder.addBolt(new ExclaimBolt(), 3);

        Config config = new Config();
        LocalDRPC drpc = new LocalDRPC();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("drcp_demo", config, builder.createLocalTopology(drpc));

        // Exercise the DRPC function with a couple of sample arguments.
        String[] words = { "hello", "goodbye" };
        for (String word : words) {
            System.err.println("Result for \"" + word + "\": " + drpc.execute("exec", word));
        }

        cluster.shutdown();
        drpc.shutdown();
    }
}
模式二:
package com.mjlf.drcp;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.clojure.RichShellBolt;
import org.apache.storm.drpc.DRPCSpout;
import org.apache.storm.drpc.ReturnResults;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
* @ClassName ManualDRPC
* @Author mjlft
* @Date 2019/3/23 15:18
* @Version 1.0
* @Description TODO
*/
public class ManualDRPC {
public static class EBolt extends BaseBasicBolt{
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String res = tuple.getString(0) + "----";
basicOutputCollector.emit(new Values(res, tuple.getValue(1)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("result", "result-info"));
}
}
public static void main(String[] args){
TopologyBuilder topologyBuilder = new TopologyBuilder();
LocalDRPC localDRPC = new LocalDRPC();
DRPCSpout drpcSpout = null;
if(args != null && args.length > 0){
drpcSpout = new DRPCSpout("exec");
}else {
drpcSpout = new DRPCSpout("exec",localDRPC);
}
topologyBuilder.setSpout("drcpspout", drpcSpout);
topologyBuilder.setBolt("sss", new EBolt(), 1).shuffleGrouping("drcpspout");
topologyBuilder.setBolt("resultBolt", new ReturnResults(), 1).shuffleGrouping("sss");
Config config = new Config();
if(args != null && args.length > 0){
config.setNumWorkers(2);
try {
StormSubmitter.submitTopology("exec", config, topologyBuilder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
}else {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("xxx", config, topologyBuilder.createTopology());
System.out.println(localDRPC.execute("exec", "xxx"));
System.out.println(localDRPC.execute("exec", "yyy"));
localCluster.shutdown();
localDRPC.shutdown();
}
}
}
客户端:
public class DRPCClientDemo {
    public static void main(String[] args) throws TException, DRPCExecutionException {
        // Connect to the DRPC server on host "hd181", default DRPC port 3772.
        DRPCClient client = new DRPCClient("hd181", 3772);
        // The function name must match the one registered by the server-side topology.
        String answer = client.execute("exec", "aaa");
        System.out.println(answer);
    }
}
集群配置:
修改配置文件conf/storm.yaml
drpc.servers:
- "node1"
启动DRPC Server
bin/storm drpc &
通过StormSubmitter.submitTopology提交拓扑
StormSubmitter.submitTopology(args[0], conf, builder.createTopology())
GROUPING
- Shuffle Grouping
随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。
轮询,平均分配。
- Fields Grouping
按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一个task, 而不同的"user-id"则可能会被分配到不同的task。
- All Grouping
广播发送,对于每一个tuple,所有的bolts都会收到。
- Global Grouping
全局分组,把tuple分配给task id最低的task。
- None Grouping
不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果。有一点不同的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。
- Direct Grouping
指向型分组,这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法,而且这种消息tuple必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
- Local or Shuffle Grouping
本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks;否则,和普通的Shuffle Grouping行为一致。
- Custom Grouping
自定义,相当于mapreduce那里自己去实现一个partition一样。
ACK
//spout
package com.mjlf.spout;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.List;
import java.util.Map;
/**
* @ClassName NumSpout
* @Author mjlft
* @Date 2019/3/19 23:12
* @Version 1.0
* @Description TODO
*/
/**
 * Emits an increasing integer once per second, reusing the value itself as the
 * message id so the ack/fail callbacks can print which tuple they refer to.
 */
public class NumSpout extends BaseRichSpout {
    private Map conf;
    private TopologyContext context;
    private SpoutOutputCollector collector;
    // Counter used both as the tuple payload and as the message id.
    private int counter;

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.conf = map;
        this.context = topologyContext;
        this.collector = spoutOutputCollector;
    }

    public void nextTuple() {
        counter++;
        // Anchored emit: the second argument is the message id tracked by the acker.
        collector.emit(new Values(counter), counter);
        Utils.sleep(1000);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("num"));
    }

    @Override
    public void ack(Object msgId) {
        // Called by Storm when the downstream bolt acks this message id.
        System.out.println("Ack : " + msgId.toString());
    }

    @Override
    public void fail(Object msgId) {
        // Called by Storm when the tuple is failed (or times out).
        System.out.println("fail : " + msgId.toString());
    }
}
//bolt
package com.mjlf.bolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* @ClassName NumBolt
* @Author mjlft
* @Date 2019/3/19 23:26
* @Version 1.0
* @Description TODO
*/
/**
 * Accumulates the running sum of the "num" field and acks each tuple,
 * triggering the spout's ack() callback; on any error the tuple is failed.
 */
public class NumBolt extends BaseRichBolt {
    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;
    // Running total of all numbers received so far.
    int sum = 0;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.conf = map;
        this.context = topologyContext;
        this.collector = outputCollector;
    }

    public void execute(Tuple tuple) {
        try {
            sum += tuple.getIntegerByField("num");
            System.out.println(sum);
            // Acking notifies the spout's ack() for this tuple's message id.
            this.collector.ack(tuple);
        } catch (Exception e) {
            // Any failure marks the tuple as failed so the spout can replay it.
            this.collector.fail(tuple);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Terminal bolt: emits nothing downstream.
    }
}