spark案例---实现ip地址查询

通过Spark实现ip地址查询

1.需求分析

​ 在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。

spark案例---实现ip地址查询

通过日志信息(运行商家或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬度。

2.技术调研

因为我们的需求是完成一张报表信息,所以对程序的实时性没有要求,所以可以选择内存计算spark来实现上述功能。

3.架构设计

搭建spark集群

4.开发流程

4.1数据准备

4.2 ip日志信息

在ip日志信息中,我们只需要关心ip这一个维度就可以了,其他的不做介绍
spark案例---实现ip地址查询

4.3 城市ip端信息

spark案例---实现ip地址查询

5.代码开发

5.1代码开发思路

  • (1)加载城市ip段信息,获取起始数字和结束数字,经度,维度。
  • (2) 加载日志文件,获取ip地址,然后转换为数字,和ip段比较
  • (3)比较的时候采用二分法查找,找到对应的经度和维度
  • (4)然后对经度和维度做单词计数。

5.2代码实现

(1)创建一个maven工程

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Spark_01</artifactId>
        <groupId>zt.Spark_01</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spark02</artifactId>
    <properties>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.1.3</spark.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.35</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
           <!--/* *-->
            <!--<plugin>-->
                <!--<groupId>org.apache.maven.plugins</groupId>-->
                <!--<artifactId>maven-shade-plugin</artifactId>-->
                <!--<version>2.4.3</version>-->
                <!--<executions>-->
                    <!--<execution>-->
                        <!--<phase>package</phase>-->
                        <!--<goals>-->
                            <!--<goal>shade</goal>-->
                        <!--</goals>-->
                        <!--<configuration>-->
                            <!--<filters>-->
                                <!--<filter>-->
                                    <!--<artifact>*:*</artifact>-->
                                    <!--<excludes>-->
                                        <!--<exclude>META-INF/*.SF</exclude>-->
                                        <!--<exclude>META-INF/*.DSA</exclude>-->
                                        <!--<exclude>META-INF/*.RSA</exclude>-->
                                    <!--</excludes>-->
                                <!--</filter>-->
                            <!--</filters>-->
                            <!--<transformers>-->
                                <!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">-->
                                    <!--<mainClass>zt.spark.wordCount.WordCount_onlie</mainClass>-->
                                <!--</transformer>-->
                            <!--</transformers>-->
                        <!--</configuration>-->
                    <!--</execution>-->
                <!--</executions>-->
            <!--</plugin>-->
            <!--*/-->
        </plugins>
    </build>

</project>

(2)代码实现

package zt.spark.iplocation
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

//todo:  这里是实现利用spark实现ip地址查询
object IpLocation {

  //这个方法是:经ip日志中的ip地址转换为Long类型的数据
  def ip2Long(ip: String): Long = {
    var ips: Array[String] = ip.split("\\.")
    var ipNum = 0L
    //遍历数组
    for (i <- ips) {
      //左移8位
      ipNum = i.toLong | ipNum << 8L
    }
    ipNum
  }


  //这个方法是:通过二分查找获取long类型的数字在数组中下标
  def binarySearch(ipNum: Long, broadcastValue: Array[(String, String, String, String)]): Int = {
    var start = 0
    var end = broadcastValue.length - 1
    while (start <= end) {
      val middle = (start + end) / 2
      //表示找到
      if (ipNum >= broadcastValue(middle)._1.toLong && ipNum <= broadcastValue(middle)._2.toLong) {
        return middle
      }
      //表示需要查找的数据在上半部分
      if (ipNum < broadcastValue(middle)._1.toLong) {
        end = middle - 1
      }
      //表示需要查找的数据在下半部分
      if (ipNum > broadcastValue(middle)._2.toLong) {
        start = middle + 1
      }
    }
    //若是没有查找到就返回-1
    -1
  }


  //把查询的结果保存在数据中
  def data2mysql(iter: Iterator[((String, String), Int)]): Unit = {
    //定义数据连接
    var conn: Connection = null;
    var ps: PreparedStatement = null;
    val sql = "insert into iplocation(longitude,latitude,total_count)values(?,?,?)"
    conn = DriverManager.getConnection("jdbc:mysql://192.168.111.133:3306/spark","root","123456")
    ps = conn.prepareStatement(sql)
    //遍历迭代器
    iter.foreach(line => {
      //给占位符赋值
      ps.setString(1, line._1._1)
      ps.setString(2, line._1._2)
      ps.setLong(3, line._2)
      //执行sql
      ps.execute()
    })
  }

  def main(args: Array[String]): Unit = {
    //1.获取sparkConf对象
    val sparkConf: SparkConf = new SparkConf().setAppName("ip_city").setMaster("local[2]")
    //2.获取sparkContext对象
    val sc: SparkContext = new SparkContext(sparkConf)
    //3.加载基站ip数据
    var ip_City: RDD[String] = sc.textFile("e:\\ip.txt")
    //4.加载ip日志信息
    var ipsLog: RDD[String] = sc.textFile("e:\\20090121000132.394251.http.format")
    //5.获取ip日志信息中的ip地址字段
    val ips: RDD[String] = ipsLog.map(x => x.split("\\|")(1))
    //6.获取基站的数据的经度  维度 ip开始字段  ip结束字段
    var ipCitys: RDD[(String, String, String, String)] = ip_City.map(x => x.split("\\|")).map(x => (x(2), x(3), x(x.length - 2), x(x.length - 1)))
    //7.由于城市ip每一条ip日志信息都是需要的  这里为了减少每次都要进行加载基站信息 这类通过spark的广播机制 只需要把基站的数据发送到每一个worker即可
    var broadcast: Broadcast[Array[(String, String, String, String)]] = sc.broadcast(ipCitys.collect())
    //8.遍历ip日志信息 获取ip  并把ip转换成为long类型的数据  最后在基站中进二分查找
    var result: RDD[((String, String), Int)] = ips.mapPartitions(iter => {
      //通过函数体来控制上面的逻辑
      //8.1 获取广播的内容
      var broadcastValue: Array[(String, String, String, String)] = broadcast.value
      //8.2  遍历迭代器
      iter.map(ip => {
        //获取ip  把ip转换为long类型
        val iplong: Long = ip2Long(ip)
        //通过二分查找获取long类型的数字在数组中下标
        val index = binarySearch(iplong: Long, broadcastValue)
        //获取对应下标的值
        var value: (String, String, String, String) = broadcastValue(index)
        //把获取的结果封装在一个元组中((经度,维度),1)
        ((value._3, value._4), 1)
      })
    })
    //9.把相同的经度和维度出现1 进行累加
    val finalResult: RDD[((String, String), Int)] = result.reduceByKey(_ + _)
    //10.进行打印
    finalResult.foreach(println)
    //11.把数据保存在mysql数据库中
     finalResult.foreachPartition(data2mysql)
    //12.关闭  sparkContext
    sc.stop()

  }
}

(3)运行结果

  • idea端

spark案例---实现ip地址查询

  • 数据库端

spark案例---实现ip地址查询

6.案例技术要点

  • long类型转化

  • 二分查找

  • spark广播机制

  • 数据导入到数据库