Storm heatmap project (Part 4)

1. Integrating Storm with HBase

Changes to pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.qianliu</groupId>
  <artifactId>storm</artifactId>
  <version>1.0-SNAPSHOT</version>

  <!-- repositories for the CDH artifacts -->
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
      <id>alibaba</id>
      <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </repository>
  </repositories>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <storm.version>1.1.1</storm.version>
    <storm.kafka.client.version>0.9.0.0</storm.kafka.client.version>
    <hadoop.version>2.6.0</hadoop.version>
    <hbase.version>1.2.0-cdh5.7.0</hbase.version>
    <scala.version>2.11.8</scala.version>
  </properties>

  <dependencies>
    <!--junit-->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>

    <!-- Storm core -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>${storm.version}</version>
    </dependency>

    <!--storm-kafka-->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>${storm.version}</version>
    </dependency>

    <!-- Hadoop dependency -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.5</version>
    </dependency>

    <!-- HBase dependencies -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>

    <!--scala-->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!--mysql-->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
    </dependency>

    <!--kafka-->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${storm.kafka.client.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
    </dependency>

    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>16.0.1</version>
    </dependency>

    <!--test dependencies -->
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-junit-jupiter</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>java-hamcrest</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-params</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>log4j-over-slf4j</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${storm.kafka.client.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${storm.kafka.client.version}</version>
      <classifier>test</classifier>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-test</artifactId>
    </dependency>

    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>


Start HBase

zkServer.sh start
start-all.sh
start-hbase.sh
# enter the HBase shell
hbase shell

Create the HBase table state with two column families: local for the geographic longitude/latitude and times for the visit count.

create 'state','local','times'
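
If you prefer to create the table from code rather than the shell, the HBase Admin API can do the same thing. The sketch below is hypothetical (the class name CreateStateTable is made up for illustration, and it assumes the same ZooKeeper quorum 192.168.48.138:2181 that HBaseUtils uses later in this post):

package com.qianliu.Utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Hypothetical helper: creates the 'state' table with the 'local' and 'times' column
// families if it does not exist yet. Equivalent to: create 'state','local','times'
public class CreateStateTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.48.138:2181"); // assumed quorum, same as HBaseUtils
        try (Connection con = ConnectionFactory.createConnection(conf);
             Admin admin = con.getAdmin()) {
            TableName table = TableName.valueOf("state");
            if (!admin.tableExists(table)) {
                HTableDescriptor desc = new HTableDescriptor(table);
                desc.addFamily(new HColumnDescriptor("local"));
                desc.addFamily(new HColumnDescriptor("times"));
                admin.createTable(desc);
            }
        }
    }
}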

A class holding the configuration constants:

package com.qianliu.Utils;

public interface Config {
    public String HBaseTableName = "state";
    public String HBaseColumn0 = "local";   // column family for coordinates (the rowkey itself is the "longitude,latitude" string)
    public String HBaseColumn1 = "times";   // column family for visit counts
    public String col1_info1 = "times";     // column qualifier that stores the visit count
}

The data-cleaning bolt is modified as follows:

package com.qianliu;

import com.qianliu.Utils.DataUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class LogPoccessBolt extends BaseRichBolt {

    private OutputCollector collector;


    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            byte[] bytes = input.getBinaryByField("bytes");
            String value = new String(bytes);

            /*
             * Incoming lines look like: 13999999999	116.410588,39.880172	[2019-29-04/29/19 13:29:33]
             * Parse them into phone, longitude, latitude and timestamp.
             */
            String[] splits = value.split("\t");
            String phone = splits[0];
            String[] local = splits[1].split(",");
            String longitude = local[0]; // longitude
            String latitude = local[1]; // latitude
            long time = DataUtils.getInstance().getTime(splits[2]); // convert the "[2019-29-04/29/19 13:29:33]"-style string into a timestamp

            // emit to the next bolt
            this.collector.emit(new Values(phone,longitude,latitude,time));
            //System.out.println(phone+"\t"+longitude+"\t"+latitude+"\t"+time);

            this.collector.ack(input);
        } catch (Exception e) {
            this.collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("phone", "longitude","latitude","time"));
    }
}
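
To sanity-check the parsing outside Storm, the same split logic can be run from a plain main method. This is only a sketch (the class name LogParseCheck is made up for illustration); the timestamp conversion done by DataUtils, which comes from an earlier part of this series, is left out:

package com.qianliu;

// Hypothetical standalone check for the split logic used in LogPoccessBolt.
// The DataUtils timestamp conversion is intentionally omitted here.
public class LogParseCheck {
    public static void main(String[] args) {
        String value = "13999999999\t116.410588,39.880172\t[2019-29-04/29/19 13:29:33]";
        String[] splits = value.split("\t");
        String phone = splits[0];
        String[] local = splits[1].split(",");
        String longitude = local[0]; // longitude
        String latitude = local[1];  // latitude
        System.out.println(phone + "\t" + longitude + "\t" + latitude + "\t" + splits[2]);
    }
}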

Write an HBase utility class:

package com.qianliu.Utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

public class HBaseUtils {

    public static Configuration configuration;
    public static Connection con;
    static {
        // use HBaseConfiguration.create() so hbase-default.xml/hbase-site.xml are loaded
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "192.168.48.138:2181");
        configuration.set("hbase.rootdir", "hdfs://192.168.48.138:8020/hbase");
        try {
            con = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

A bolt that integrates Storm with HBase and writes the results into the state table:

package com.qianliu;

import com.qianliu.Utils.Config;
import com.qianliu.Utils.HBaseUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;

import java.util.HashMap;
import java.util.Map;

/*
 * Uses a sliding window: a 20-second window that slides every 5 seconds.
 */
public class HbaseBoltByUser extends BaseWindowedBolt {

    OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        HashMap<String,Integer> hashMap = new HashMap<String,Integer>(); // <local, times>: local is the "longitude,latitude" string, times is the number of visits within this window

        try {
            for (Tuple input : inputWindow.get()){
                // read the fields emitted by the previous bolt
                String phone = input.getStringByField("phone");
                String longitude = input.getStringByField("longitude");
                String latitude = input.getStringByField("latitude");
                long time = input.getLongByField("time");

                //System.out.println(phone+"\t"+longitude+"\t"+latitude+"\t"+time);

                // count visits per location: first occurrence -> 1, each further occurrence -> +1
                String location = longitude + "," + latitude;
                Integer times = hashMap.get(location);
                hashMap.put(location, times == null ? 1 : times + 1);

                collector.ack(input);
            }

            // write this window's counts into HBase
            try (Table table = HBaseUtils.con.getTable(TableName.valueOf(Config.HBaseTableName))) {
                for (Map.Entry<String,Integer> entry : hashMap.entrySet()) {
                    Put put = new Put(Bytes.toBytes(entry.getKey())); // rowkey is the "longitude,latitude" string
                    // column family "times", qualifier "times", value = visit count within this window
                    put.addColumn(Bytes.toBytes(Config.HBaseColumn1), Bytes.toBytes(Config.col1_info1),
                            Bytes.toBytes(String.valueOf(entry.getValue())));
                    table.put(put);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}
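
A small optimization worth considering: instead of issuing one RPC per Put, the puts for a whole window can be collected and written in a single batch with Table.put(List&lt;Put&gt;). Below is a minimal sketch under that assumption; the helper class HBaseWriter is hypothetical, but it uses the same table and column layout as HbaseBoltByUser:

package com.qianliu.Utils;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: writes all counts of one window to HBase in a single batched call
// instead of one RPC per location.
public class HBaseWriter {
    public static void writeCounts(Map<String, Integer> counts) throws IOException {
        try (Table table = HBaseUtils.con.getTable(TableName.valueOf(Config.HBaseTableName))) {
            List<Put> puts = new ArrayList<Put>();
            for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                Put put = new Put(Bytes.toBytes(entry.getKey())); // rowkey = "longitude,latitude"
                put.addColumn(Bytes.toBytes(Config.HBaseColumn1), Bytes.toBytes(Config.col1_info1),
                        Bytes.toBytes(String.valueOf(entry.getValue())));
                puts.add(put);
            }
            table.put(puts); // single batched write for the whole window
        }
    }
}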

The main class:

package com.qianliu;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class StormKafkaTopo {
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        // create the topology builder
        TopologyBuilder builder = new TopologyBuilder();

        // create the spout that reads data from Kafka
        BrokerHosts hosts = new ZkHosts("192.168.48.138:2181");
        String topic = "logstash";
        String zkRoot = "/"+topic;
        String id = UUID.randomUUID().toString();
        SpoutConfig spoutConfig = new SpoutConfig(hosts,topic,zkRoot,id);
        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // start from the latest offset when no offset has been committed in ZooKeeper yet
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // register the spout with the builder
        String SPOUT_ID = KafkaSpout.class.getSimpleName(); // use the class's simple name as the component id
        builder.setSpout(SPOUT_ID,kafkaSpout);

        // register the first bolt and connect it to the spout
        String BOLT_ID = LogPoccessBolt.class.getSimpleName();
        builder.setBolt(BOLT_ID,new LogPoccessBolt()).shuffleGrouping(SPOUT_ID);

        // register the second bolt and connect it to the first bolt
        String BOLT2_ID = HbaseBoltByUser.class.getSimpleName();
        builder.setBolt(BOLT2_ID,new HbaseBoltByUser()
                // aggregate the last 20 seconds of data, recomputing every 5 seconds
                .withWindow(new BaseWindowedBolt.Duration(20, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS)),1)
                .shuffleGrouping(BOLT_ID);

        // submit the topology: cluster mode when a topology name is passed as an argument, otherwise local mode
        Config config = new Config();
        config.setNumWorkers(1);
        if (args.length>0) {
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("storm2kafka", config, builder.createTopology());
        }
    }
}

Testing:
Generate the test data (screenshot omitted).
Run the program.
The data is written to HBase successfully (screenshot omitted).
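
To double-check the result outside the topology, the state table can be read back with a scan. The following is a minimal sketch (the class name ScanStateTable is hypothetical); it reuses the connection from HBaseUtils and the table and column names from Config:

package com.qianliu.Utils;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical verification tool: prints every "longitude,latitude -> times" row
// from the state table, i.e. the data the topology just wrote.
public class ScanStateTable {
    public static void main(String[] args) throws Exception {
        try (Table table = HBaseUtils.con.getTable(TableName.valueOf(Config.HBaseTableName));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                String rowkey = Bytes.toString(result.getRow()); // "longitude,latitude"
                byte[] value = result.getValue(Bytes.toBytes(Config.HBaseColumn1), Bytes.toBytes(Config.col1_info1));
                System.out.println(rowkey + " -> " + (value == null ? "null" : Bytes.toString(value)));
            }
        }
    }
}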