Storm 简单记录

Storm 简单记录

DRPC(Distributed Remote Procedure Call) 分布式远程调用过程

DRPC是通过一个DRPC服务端(DRPC server)来实现分布式RPC功能的。DRPC server负责接收RPC请求,接收到请求后将其发送给Storm中运行的Topology,等待接收Topology发送的处理结果,最后将结果返回给请求客户端
Storm 简单记录

客户端通过向DRPC服务器发送待执行函数的名称以及该函数的参数来获得处理结果。实现该函数的拓扑使用一个DRPCSpout从DRPC服务器接收函数调用流;DRPC服务器会为每一次函数调用标记一个唯一的id,随后拓扑会执行函数计算结果,并在拓扑的最后使用一个名为ReturnResults的bolt连接到DRPC服务器,根据函数调用的id将结果返回

模式一(弃用)

package com.mjlf.drcp;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.cluster.StormClusterState;
import org.apache.storm.drpc.LinearDRPCInputDeclarer;
import org.apache.storm.drpc.LinearDRPCTopologyBuilder;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * @ClassName LinerBasicDrcpBuilder
 * @Author mjlft
 * @Date 2019/3/23 14:10
 * @Version 1.0
 * @Description TODO
 */
/**
 * DRPC demo ("模式一") built with the deprecated {@code LinearDRPCTopologyBuilder}:
 * registers a DRPC function named "exec", runs it on a local cluster, and
 * invokes it twice through an in-process {@link LocalDRPC} server.
 */
public class LinerBasicDrcpBuilder {

    /**
     * Worker bolt: appends "~~~~" to the DRPC argument and re-emits it
     * together with the request id so the result can be routed back.
     */
    public static class ExclaimBolt extends BaseBasicBolt{
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            // Field 0 carries the DRPC request id; field 1 carries the argument.
            String argument = input.getString(1);
            collector.emit(new Values(input.getValue(0), argument + "~~~~"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "word"));
        }
    }

    public static void main(String[] args){
        // The name given here ("exec") is the DRPC function name clients
        // must use when invoking this topology.
        LinearDRPCTopologyBuilder topologyBuilder = new LinearDRPCTopologyBuilder("exec");
        topologyBuilder.addBolt(new ExclaimBolt(), 3);

        Config conf = new Config();

        LocalDRPC localDrpc = new LocalDRPC();
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("drcp_demo", conf, topologyBuilder.createLocalTopology(localDrpc));

        String[] requests = { "hello", "goodbye" };
        for (String word : requests) {
            // Blocking call: returns once the topology has produced a result.
            System.err.println("Result for \"" + word + "\": " + localDrpc.execute("exec", word));
        }

        localCluster.shutdown();
        localDrpc.shutdown();
    }
}

模式二:

package com.mjlf.drcp;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.clojure.RichShellBolt;
import org.apache.storm.drpc.DRPCSpout;
import org.apache.storm.drpc.ReturnResults;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * @ClassName ManualDRPC
 * @Author mjlft
 * @Date 2019/3/23 15:18
 * @Version 1.0
 * @Description TODO
 */
public class ManualDRPC {

    public static class EBolt extends BaseBasicBolt{
        @Override
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            String res = tuple.getString(0) + "----";
            basicOutputCollector.emit(new Values(res, tuple.getValue(1)));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("result", "result-info"));
        }
    }

    public static void main(String[] args){

        TopologyBuilder topologyBuilder = new TopologyBuilder();

        LocalDRPC localDRPC = new LocalDRPC();
        DRPCSpout drpcSpout = null;
        if(args != null && args.length > 0){
            drpcSpout = new DRPCSpout("exec");
        }else {
            drpcSpout = new DRPCSpout("exec",localDRPC);
        }

        topologyBuilder.setSpout("drcpspout", drpcSpout);
        topologyBuilder.setBolt("sss", new EBolt(), 1).shuffleGrouping("drcpspout");
        topologyBuilder.setBolt("resultBolt", new ReturnResults(), 1).shuffleGrouping("sss");
        Config config = new Config();

        if(args != null && args.length > 0){
            config.setNumWorkers(2);
            try {
                StormSubmitter.submitTopology("exec", config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        }else {
            LocalCluster localCluster = new LocalCluster();

            localCluster.submitTopology("xxx", config, topologyBuilder.createTopology());

            System.out.println(localDRPC.execute("exec", "xxx"));
            System.out.println(localDRPC.execute("exec", "yyy"));

            localCluster.shutdown();
            localDRPC.shutdown();
        }
    }
}

客户端:

/**
 * Minimal DRPC client: connects to the DRPC server on host "hd181" (port 3772,
 * the default DRPC port), invokes the "exec" function, and prints the result.
 */
public class DRPCClientDemo {

    public static void main(String[] args) throws TException, DRPCExecutionException {
        DRPCClient drpcClient = new DRPCClient("hd181", 3772);
        // The function name must match the one the server-side topology registered.
        String response = drpcClient.execute("exec", "aaa");
        System.out.println(response);
    }
}

集群配置:

修改配置文件conf/storm.yaml
drpc.servers:
    - "node1"

启动DRPC Server
bin/storm drpc &

通过StormSubmitter.submitTopology提交拓扑
StormSubmitter.submitTopology(args[0], conf, builder.createTopology())

GROUPING

  1. Shuffle Grouping
    随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。
    轮询,平均分配
  2. Fields Grouping
    按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一个task, 而不同的"user-id"则可能会被分配到不同的task。
  3. All Grouping
    广播发送,对于每一个tuple,所有的bolts都会收到
  4. Global Grouping
    全局分组,把tuple分配给task id最低的task 。
  5. None Grouping
    不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果。 有一点不同的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。
  6. Direct Grouping
    指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)
  7. Local or shuffle grouping
    本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致
  8. customGrouping
    自定义,相当于mapreduce那里自己去实现一个partition一样。

ACK


//spout
package com.mjlf.spout;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.List;
import java.util.Map;

/**
 * @ClassName NumSpout
 * @Author mjlft
 * @Date 2019/3/19 23:12
 * @Version 1.0
 * @Description TODO
 */
/**
 * Spout that emits an increasing integer once per second, using the integer
 * itself as the tuple's message id so the ack/fail callbacks below can
 * identify which tuple succeeded or failed.
 */
public class NumSpout extends BaseRichSpout {
    private Map map;                          // topology config (raw Map per the Storm API)
    private TopologyContext topologyContext;
    private SpoutOutputCollector collector;

    private int i;                            // running counter; doubles as the msgId

    // FIX: added the missing @Override annotations for consistency with
    // ack/fail below and to let the compiler verify the signatures.
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.map = map;
        this.topologyContext = topologyContext;
        this.collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        i++;
        List num = new Values(i);
        // Second argument is the msgId: anchoring it enables ack/fail tracking.
        collector.emit(num, i);

        Utils.sleep(1000);  // throttle emission to one tuple per second
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("num"));
    }

    /** Called by Storm when a downstream bolt acks the tuple with this msgId. */
    @Override
    public void ack(Object msgId) {
        System.out.println("Ack : " + msgId.toString());
    }

    /** Called by Storm when the tuple with this msgId fails or times out. */
    @Override
    public void fail(Object msgId) {
        System.out.println("fail : " + msgId.toString());
    }
}


//bolt
package com.mjlf.bolt;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @ClassName NumBolt
 * @Author mjlft
 * @Date 2019/3/19 23:26
 * @Version 1.0
 * @Description TODO
 */
public class NumBolt extends BaseRichBolt {
    private Map map;
    private TopologyContext topologyContext;
    private OutputCollector outputCollector;

    int sum = 0;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.map = map;
        this.topologyContext = topologyContext;
        this.outputCollector = outputCollector;
    }

    public void execute(Tuple tuple) {
        try {
            sum += tuple.getIntegerByField("num");
            System.out.println(sum);
            //回调spout中的方法
            this.outputCollector.ack(tuple);
        }catch (Exception e){
            this.outputCollector.fail(tuple);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}