spark案例---实现ip地址查询
通过Spark实现ip地址查询
1.需求分析
在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。
通过日志信息(运行商家或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬度。
2.技术调研
因为我们的需求是完成一张报表信息,所以对程序的实时性没有要求,所以可以选择内存计算spark来实现上述功能。
3.架构设计
搭建spark集群
4.开发流程
4.1数据准备
4.2 ip日志信息
在ip日志信息中,我们只需要关心ip这一个维度就可以了,其他的不做介绍
4.3 城市ip端信息
5.代码开发
5.1代码开发思路
- (1)加载城市ip段信息,获取起始数字和结束数字,经度,维度。
- (2) 加载日志文件,获取ip地址,然后转换为数字,和ip段比较
- (3)比较的时候采用二分法查找,找到对应的经度和维度
- (4)然后对经度和维度做单词计数。
5.2代码实现
(1)创建一个maven工程
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>Spark_01</artifactId>
<groupId>zt.Spark_01</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark02</artifactId>
<properties>
<scala.version>2.11.8</scala.version>
<hadoop.version>2.7.4</hadoop.version>
<spark.version>2.1.3</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.35</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<!--/* *-->
<!--<plugin>-->
<!--<groupId>org.apache.maven.plugins</groupId>-->
<!--<artifactId>maven-shade-plugin</artifactId>-->
<!--<version>2.4.3</version>-->
<!--<executions>-->
<!--<execution>-->
<!--<phase>package</phase>-->
<!--<goals>-->
<!--<goal>shade</goal>-->
<!--</goals>-->
<!--<configuration>-->
<!--<filters>-->
<!--<filter>-->
<!--<artifact>*:*</artifact>-->
<!--<excludes>-->
<!--<exclude>META-INF/*.SF</exclude>-->
<!--<exclude>META-INF/*.DSA</exclude>-->
<!--<exclude>META-INF/*.RSA</exclude>-->
<!--</excludes>-->
<!--</filter>-->
<!--</filters>-->
<!--<transformers>-->
<!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">-->
<!--<mainClass>zt.spark.wordCount.WordCount_onlie</mainClass>-->
<!--</transformer>-->
<!--</transformers>-->
<!--</configuration>-->
<!--</execution>-->
<!--</executions>-->
<!--</plugin>-->
<!--*/-->
</plugins>
</build>
</project>
(2)代码实现
package zt.spark.iplocation
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//todo: 这里是实现利用spark实现ip地址查询
object IpLocation {
//这个方法是:经ip日志中的ip地址转换为Long类型的数据
def ip2Long(ip: String): Long = {
var ips: Array[String] = ip.split("\\.")
var ipNum = 0L
//遍历数组
for (i <- ips) {
//左移8位
ipNum = i.toLong | ipNum << 8L
}
ipNum
}
//这个方法是:通过二分查找获取long类型的数字在数组中下标
def binarySearch(ipNum: Long, broadcastValue: Array[(String, String, String, String)]): Int = {
var start = 0
var end = broadcastValue.length - 1
while (start <= end) {
val middle = (start + end) / 2
//表示找到
if (ipNum >= broadcastValue(middle)._1.toLong && ipNum <= broadcastValue(middle)._2.toLong) {
return middle
}
//表示需要查找的数据在上半部分
if (ipNum < broadcastValue(middle)._1.toLong) {
end = middle - 1
}
//表示需要查找的数据在下半部分
if (ipNum > broadcastValue(middle)._2.toLong) {
start = middle + 1
}
}
//若是没有查找到就返回-1
-1
}
//把查询的结果保存在数据中
def data2mysql(iter: Iterator[((String, String), Int)]): Unit = {
//定义数据连接
var conn: Connection = null;
var ps: PreparedStatement = null;
val sql = "insert into iplocation(longitude,latitude,total_count)values(?,?,?)"
conn = DriverManager.getConnection("jdbc:mysql://192.168.111.133:3306/spark","root","123456")
ps = conn.prepareStatement(sql)
//遍历迭代器
iter.foreach(line => {
//给占位符赋值
ps.setString(1, line._1._1)
ps.setString(2, line._1._2)
ps.setLong(3, line._2)
//执行sql
ps.execute()
})
}
def main(args: Array[String]): Unit = {
//1.获取sparkConf对象
val sparkConf: SparkConf = new SparkConf().setAppName("ip_city").setMaster("local[2]")
//2.获取sparkContext对象
val sc: SparkContext = new SparkContext(sparkConf)
//3.加载基站ip数据
var ip_City: RDD[String] = sc.textFile("e:\\ip.txt")
//4.加载ip日志信息
var ipsLog: RDD[String] = sc.textFile("e:\\20090121000132.394251.http.format")
//5.获取ip日志信息中的ip地址字段
val ips: RDD[String] = ipsLog.map(x => x.split("\\|")(1))
//6.获取基站的数据的经度 维度 ip开始字段 ip结束字段
var ipCitys: RDD[(String, String, String, String)] = ip_City.map(x => x.split("\\|")).map(x => (x(2), x(3), x(x.length - 2), x(x.length - 1)))
//7.由于城市ip每一条ip日志信息都是需要的 这里为了减少每次都要进行加载基站信息 这类通过spark的广播机制 只需要把基站的数据发送到每一个worker即可
var broadcast: Broadcast[Array[(String, String, String, String)]] = sc.broadcast(ipCitys.collect())
//8.遍历ip日志信息 获取ip 并把ip转换成为long类型的数据 最后在基站中进二分查找
var result: RDD[((String, String), Int)] = ips.mapPartitions(iter => {
//通过函数体来控制上面的逻辑
//8.1 获取广播的内容
var broadcastValue: Array[(String, String, String, String)] = broadcast.value
//8.2 遍历迭代器
iter.map(ip => {
//获取ip 把ip转换为long类型
val iplong: Long = ip2Long(ip)
//通过二分查找获取long类型的数字在数组中下标
val index = binarySearch(iplong: Long, broadcastValue)
//获取对应下标的值
var value: (String, String, String, String) = broadcastValue(index)
//把获取的结果封装在一个元组中((经度,维度),1)
((value._3, value._4), 1)
})
})
//9.把相同的经度和维度出现1 进行累加
val finalResult: RDD[((String, String), Int)] = result.reduceByKey(_ + _)
//10.进行打印
finalResult.foreach(println)
//11.把数据保存在mysql数据库中
finalResult.foreachPartition(data2mysql)
//12.关闭 sparkContext
sc.stop()
}
}
(3)运行结果
- idea端
- 数据库端
6.案例技术要点
-
long类型转化
-
二分查找
-
spark广播机制
-
数据导入到数据库