Storm Heat Map Project (Part 4)
1. Integrating Storm with HBase
Changes to pom.xml (add the CDH repository and the HBase dependencies):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qianliu</groupId>
<artifactId>storm</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- repositories for CDH builds -->
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>alibaba</id>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<storm.version>1.1.1</storm.version>
<storm.kafka.client.version>0.9.0.0</storm.kafka.client.version>
<hadoop.version>2.6.0</hadoop.version>
<hbase.version>1.2.0-cdh5.7.0</hbase.version>
<scala.version>2.11.8</scala.version>
</properties>
<dependencies>
<!--junit-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- Storm -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<!--storm-kafka-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- Hadoop dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
<!-- HBase dependencies -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<!--scala-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!--mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!--kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${storm.kafka.client.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>
<!--test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>java-hamcrest</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${storm.kafka.client.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${storm.kafka.client.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Start HBase:
zkServer.sh start
start-all.sh
start-hbase.sh
# enter the HBase shell
hbase shell
Create the table: the times column family stores access counts, and local holds the geographic longitude/latitude.
create 'state','local','times'
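If you prefer to check or create this table from Java instead of the shell, a minimal sketch using the HBase 1.x Admin API could look like the one below. The class name CreateStateTable is made up for illustration, and the ZooKeeper address is assumed to be the same one used by HBaseUtils later in this post.

package com.qianliu.Utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Hypothetical helper: create the 'state' table programmatically if it does not exist yet.
public class CreateStateTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.48.138:2181"); // assumed ZooKeeper address
        try (Connection con = ConnectionFactory.createConnection(conf);
             Admin admin = con.getAdmin()) {
            TableName name = TableName.valueOf("state");
            if (!admin.tableExists(name)) {
                // same layout as the shell command: two column families, 'local' and 'times'
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("local"));
                desc.addFamily(new HColumnDescriptor("times"));
                admin.createTable(desc);
            }
        }
    }
}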
A dedicated configuration class:
package com.qianliu.Utils;
public interface Config {
public String HBaseTableName = "state";
public String HBaseColumn0 = "local"; // column family for the location (the rowkey itself is the "longitude,latitude" string)
public String HBaseColumn1= "times"; // column family for the counts
public String col1_info1 = "times"; // qualifier that stores the access count
}
Modify the data-cleaning class:
package com.qianliu;
import com.qianliu.Utils.DataUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
public class LogPoccessBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
try {
byte[] bytes = input.getBinaryByField("bytes");
String value = new String(bytes);
/*
* The incoming string looks like: 13999999999 116.410588,39.880172 [2019-29-04/29/19 13:29:33] (fields are tab-separated)
* Parse it into its fields.
* */
String[] splits = value.split("\t");
String phone = splits[0];
String[] local = splits[1].split(",");
String longitude = local[0]; // longitude
String latitude = local[1]; // latitude
long time = DataUtils.getInstance().getTime(splits[2]); // convert the [2019-29-04/29/19 13:29:33]-style field into a timestamp
// emit to the next bolt
this.collector.emit(new Values(phone,longitude,latitude,time));
//System.out.println(phone+"\t"+longitude+"\t"+latitude+"\t"+time);
this.collector.ack(input);
} catch (Exception e) {
this.collector.fail(input);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("phone", "longitude","latitude","time"));
}
}
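The DataUtils helper used above was written in an earlier installment of this series and is unchanged here. As a reminder of what it has to do, here is a minimal sketch whose getTime method turns the bracketed timestamp field into an epoch-millisecond long; the date pattern is an assumption and must be adapted to the actual format of your log lines.

package com.qianliu.Utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;

// Sketch only: the real DataUtils comes from an earlier part of this project.
public class DataUtils {
    private static final DataUtils INSTANCE = new DataUtils();
    // Assumed pattern; adjust it to the actual timestamp format in the log lines.
    private final SimpleDateFormat format = new SimpleDateFormat("[yyyy-MM-dd HH:mm:ss]");

    private DataUtils() {}

    public static DataUtils getInstance() {
        return INSTANCE;
    }

    // Parse the bracketed timestamp field into epoch milliseconds.
    public synchronized long getTime(String raw) throws ParseException {
        return format.parse(raw.trim()).getTime();
    }
}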
Write an HBase utility class:
package com.qianliu.Utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
public class HBaseUtils {
public static Configuration configuration;
public static Connection con;
static {
// HBaseConfiguration.create() loads the HBase defaults; a plain new Configuration() would not
configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "192.168.48.138:2181");
configuration.set("hbase.rootdir", "hdfs://192.168.48.138:8020/hbase");
try {
con = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}
}
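Before wiring this into the topology, it can be worth a quick standalone smoke test. The class below is a hypothetical helper (not part of the project) that writes one cell into the state table via HBaseUtils and reads it back, just to confirm the connection settings are right.

package com.qianliu.Utils;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical smoke test: write one cell into 'state' and read it back.
public class HBaseUtilsTest {
    public static void main(String[] args) throws Exception {
        Table table = HBaseUtils.con.getTable(TableName.valueOf(Config.HBaseTableName));
        byte[] rowkey = Bytes.toBytes("116.410588,39.880172"); // rowkey is a "longitude,latitude" string
        Put put = new Put(rowkey);
        put.addColumn(Bytes.toBytes(Config.HBaseColumn1), Bytes.toBytes(Config.col1_info1), Bytes.toBytes("1"));
        table.put(put);

        Result result = table.get(new Get(rowkey));
        String times = Bytes.toString(result.getValue(Bytes.toBytes(Config.HBaseColumn1), Bytes.toBytes(Config.col1_info1)));
        System.out.println("times = " + times);
        table.close();
    }
}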
A class that integrates Storm with HBase:
package com.qianliu;
import com.qianliu.Utils.Config;
import com.qianliu.Utils.HBaseUtils;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;
import java.util.HashMap;
import java.util.Map;
/*
* Use a sliding window: a 20-second window that slides every 5 seconds.
* */
public class HbaseBoltByUser extends BaseWindowedBolt {
OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(TupleWindow inputWindow) {
HashMap<String,Integer> hashMap = new HashMap<String,Integer>(); // holds one window's data as <local,times>: local is the "longitude,latitude" string, times is the number of hits for that location within the window
try {
for (Tuple input : inputWindow.get()){
// read the fields emitted by the previous bolt
String phone = input.getStringByField("phone");
String longitude = input.getStringByField("longitude");
String latitude = input.getStringByField("latitude");
long time = input.getLongByField("time");
//System.out.println(phone+"\t"+longitude+"\t"+latitude+"\t"+time);
//first time this location appears in the window: <local,1>
//otherwise: increment the existing count
if(hashMap.get(longitude+","+latitude)==null){
hashMap.put(longitude+","+latitude,1);
}else {
int times = hashMap.get(longitude+","+latitude); // current count
hashMap.put(longitude+","+latitude,times+1);
}
collector.ack(input);
}
// write the per-window counts from the HashMap into HBase
HTable table = new HTable(HBaseUtils.con.getConfiguration(), Config.HBaseTableName); // open the table
for (Map.Entry<String,Integer> entry : hashMap.entrySet()) {
Put put = new Put(Bytes.toBytes(entry.getKey())); // rowkey is the "longitude,latitude" string
// store the access count in the 'times' column family
put.addColumn(Config.HBaseColumn1.getBytes(), Config.col1_info1.getBytes(), (entry.getValue()+"").getBytes());
table.put(put);
}
table.close(); // release the table opened for this window
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
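To check what the bolt has written, you can scan the state table from a small standalone class. The following is a hypothetical helper for illustration; it prints each rowkey (the "longitude,latitude" string) together with the stored access count.

package com.qianliu.Utils;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper: scan the whole 'state' table and print location -> access count.
public class ScanStateTable {
    public static void main(String[] args) throws Exception {
        Table table = HBaseUtils.con.getTable(TableName.valueOf(Config.HBaseTableName));
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result result : scanner) {
            String rowkey = Bytes.toString(result.getRow());
            String times = Bytes.toString(result.getValue(Bytes.toBytes(Config.HBaseColumn1), Bytes.toBytes(Config.col1_info1)));
            System.out.println(rowkey + " -> " + times);
        }
        scanner.close();
        table.close();
    }
}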
The main class
package com.qianliu;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class StormKafkaTopo {
public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
// the Storm topology builder
TopologyBuilder builder = new TopologyBuilder();
// create the spout that consumes data from Kafka
BrokerHosts hosts = new ZkHosts("192.168.48.138:2181");
String topic = "logstash";
String zkRoot = "/"+topic;
String id = UUID.randomUUID().toString();
SpoutConfig spoutConfig = new SpoutConfig(hosts,topic,zkRoot,id);
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // start consuming from the latest offset (only new messages)
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
//register the spout with the builder
String SPOUT_ID = KafkaSpout.class.getSimpleName(); // use the class's simple name as the component id
builder.setSpout(SPOUT_ID,kafkaSpout);
//register the first bolt and connect it to the spout
String BOLT_ID = LogPoccessBolt.class.getSimpleName();
builder.setBolt(BOLT_ID,new LogPoccessBolt()).shuffleGrouping(SPOUT_ID);
//register the second bolt and connect it to the first bolt
String BOLT2_ID = HbaseBoltByUser.class.getSimpleName();
builder.setBolt(BOLT2_ID,new HbaseBoltByUser()
//aggregate over a 20-second window, emitting a result every 5 seconds
.withWindow(new BaseWindowedBolt.Duration(20, TimeUnit.SECONDS), new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS)),1)
.shuffleGrouping(BOLT_ID);
//submit the topology: cluster mode if a topology name is passed as an argument, local mode otherwise
Config config = new Config();
config.setNumWorkers(1);
if (args.length>0) {
StormSubmitter.submitTopology(args[0], config, builder.createTopology()); // cluster mode
}else {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("storm2kafka", config, builder.createTopology());
}
}
}
Testing:
Generate test data (a hypothetical producer sketch follows below).
Run the program.
The data is written to HBase successfully.
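For the "generate test data" step, the sketch below shows a hypothetical Kafka producer that pushes tab-separated test lines (phone, "longitude,latitude", bracketed timestamp) into the logstash topic. The broker address and the timestamp literal are assumptions; the timestamp must match whatever format DataUtils.getTime expects.

package com.qianliu;

import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical data generator for testing the topology end to end.
public class TestDataProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.48.138:9092"); // assumed Kafka broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        Random random = new Random();
        for (int i = 0; i < 100; i++) {
            String phone = "13999999999";
            // jitter the coordinates a little so several distinct locations show up
            String local = (116.40 + random.nextDouble() / 100) + "," + (39.88 + random.nextDouble() / 100);
            // placeholder timestamp; use the format your DataUtils.getTime actually parses
            String line = phone + "\t" + local + "\t" + "[2019-04-29 13:29:33]";
            producer.send(new ProducerRecord<String, String>("logstash", line));
            Thread.sleep(200);
        }
        producer.close();
    }
}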