storm热力分布图项目(二)
一、使用高德地图
https://lbs.amap.com/api/javascript-api/example/selflayer/heatmap/
进入控制台申请一个key
新建一个test.html来测试,在key后面你输入上面申请的key
经纬度查询:
把故宫作为地图的中心:补上经纬度
地图工具来开发的API: https://lbs.amap.com/api/javascript-api/reference/plugin#AMap.MouseTool
热力图的开发API: https://lbs.amap.com/api/javascript-api/reference/layer#m_AMap.Heatmap
二、storm整合kafka
官方文档: https://storm.apache.org/releases/1.1.2/storm-kafka-client.html
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.qianliu</groupId>
<artifactId>storm</artifactId>
<version>1.0-SNAPSHOT</version>
<name>storm</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<storm.version>1.1.1</storm.version>
<storm.kafka.client.version>0.9.0.0</storm.kafka.client.version>
</properties>
<dependencies>
<!--junit-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!--引入storm-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<!--storm-kafka-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<!--kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${storm.kafka.client.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>
<!--test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>java-hamcrest</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${storm.kafka.client.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${storm.kafka.client.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${storm.kafka.client.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
StormKafkaTopo.java
package com.qianliu;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;
import java.util.UUID;
public class StormKafkaTopo {
public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
//storm的builder
TopologyBuilder builder = new TopologyBuilder();
//创建spout,采集kafka数据
BrokerHosts hosts = new ZkHosts("192.168.48.138:2181");
String topic = "logstash";
String zkRoot = "/"+topic;
String id = UUID.randomUUID().toString();
SpoutConfig spoutConfig = new SpoutConfig(hosts,topic,zkRoot,id);
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
//builder装载spout
String SPOUT_ID = KafkaSpout.class.getSimpleName();//产生一个简称ID
builder.setSpout(SPOUT_ID,kafkaSpout);
//builder装载bolt,并且与spout创建关联关系
String BOLT_ID = LogPoccessBolt.class.getSimpleName();
builder.setBolt(BOLT_ID,new LogPoccessBolt()).shuffleGrouping(SPOUT_ID);
//提交任务 -----两种模式 本地模式和集群模式
Config config = new Config();
config.setNumWorkers(1);
if (args.length>0) {
StormSubmitter.submitTopology(args[0], config, builder.createTopology());//
}else {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("storm2kafka", config, builder.createTopology());
}
}
}
LogProccessBolt.java
package com.qianliu;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import java.util.Map;
public class LogPoccessBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
byte[] bytes = input.getBinaryByField("bytes");
String value = new String(bytes);
System.out.println(value);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
启动zookeeper,kafka,logstash
zkServer.sh start
bin/kafka-server-start.sh config/server.properties
bin/logstash -f file_kafka.conf
启动程序,并向监控的logstash.txt中增加数据:
出现如下情形表示logstash-kafka-storm打通了。