China Mobile Operations Analysis Real-time Monitoring Platform: Data Collection and Spark Streaming Direct Connection to Kafka

Data Collection Stage

A custom source buffers the logs generated on the server into Kafka, and a custom offset is recorded.
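The Spark program at the end of this post reads Kafka with the kafka010 direct API; with that API, the offsets covered by each batch can be read and committed back to Kafka roughly as follows (a minimal sketch; stream is the direct stream created in the main program below):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // the Kafka offset ranges this batch covers
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch ...
  // commit the offsets back to Kafka once processing has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}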
Sample data:

{"bussinessRst":"0000","channelCode":"0705","chargefee":"10000","clientIp":"222.214.151.245","gateway_id":"CMPAY","idType":"01","interFacRst":"0000","logOutTime":"20170412080614847","orderId":"384681966165260394","phoneno":"15189718490","provinceCode":"250","receiveNotifyTime":"20170412080614806","requestId":"20170412080606314277006070809193","resultTime":"20170412080614","retMsg":"接口调用成功","serverIp":"10.255.254.10","serverPort":"8714","serviceName":"reChargeNotifyReq","sysId":"15"}
{"bussinessRst":"0000","channelCode":"0705","chargefee":"3150","clientIp":"171.217.123.87","gateway_id":"CMPAY","idType":"01","interFacRst":"0000","logOutTime":"20170412080615308","orderId":"384681969178557661","phoneno":"18323566642","provinceCode":"230","receiveNotifyTime":"20170412080615226","requestId":"20170412080609925965451280348478","resultTime":"20170412080616","retMsg":"接口调用成功","serverIp":"10.255.254.10","serverPort":"8714","serviceName":"reChargeNotifyReq","sysId":"15"}
{"bussinessRst":"0000","channelCode":"0705","chargefee":"3000","clientIp":"125.82.117.133","gateway_id":"CMPAY","interFacRst":"0000","logOutTime":"20170412080615349","orderId":"384681896178290781","payPhoneNo":"","phoneno":"15101443766","provinceCode":"931","rateoperateid":"1514","receiveNotifyTime":"20170412080615308","requestId":"20170412080456962712263849986037","retMsg":"接口调用成功","serverIp":"10.255.254.10","serverPort":"8714","serviceName":"payNotifyReq","shouldfee":"2985","srcChannel":"0003","sysId":"15"}
{"bussinessRst":"0000","channelCode":"0705","chargefee":"3000","clientIp":"42.236.170.120","gateway_id":"ALIPAY","interFacRst":"0000","logOutTime":"20170412080615385","orderId":"384681971178040991","payPhoneNo":"","phoneno":"18781554082","provinceCode":"280","rateoperateid":"1514","receiveNotifyTime":"20170412080615348","requestId":"20170412080611883464487092796684","retMsg":"接口调用成功","serverIp":"10.255.254.10","serverPort":"8714","serviceName":"payNotifyReq","shouldfee":"2985","srcChannel":"0003","sysId":"15"}

Description of the attributes in the data: (the attribute tables in the original post were images and are not reproduced here)
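Judging from the sample records above and the processing code below, the fields this article relies on are:

serviceName: the interface name; reChargeNotifyReq marks a recharge notification and payNotifyReq a payment notification
bussinessRst: the business result code; "0000" means success
chargefee: the recharge amount
provinceCode: the code of the province the order belongs to
requestId: the request id; its first 17 digits encode the request timestamp (yyyyMMddHHmmssSSS)
receiveNotifyTime: the time the result notification was received; together with requestId it yields how long the recharge took
retMsg: the result message ("接口调用成功" means "interface call succeeded")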
Flume is used here for data collection, delivering the data directly into Kafka to simulate real-time data analysis.
Briefly, the collection works as follows: through a custom configuration file, Flume monitors a directory; whenever files appear in that directory they are treated as new data and collected immediately. The configuration sinks the collected data into Kafka, and Spark Streaming then connects directly to Kafka to pick it up.

Here is the content of the configuration file:

# Name the components of this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe and configure the source component: r1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/kafka_2.11-0.10.2.1/data
a1.sources.r1.fileHeader = true

# Describe and configure the sink component: k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = JsonData
a1.sinks.k1.kafka.bootstrap.servers = hadoop-master:9092,hadoop-slave02:9092,hadoop-slave03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy


# Describe and configure the channel component; memory buffering is used here
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Wire up the connections between source, channel, and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
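Note that the spooldir source expects files to be complete and immutable once they land in the monitored directory: Flume renames each fully ingested file (with the .COMPLETED suffix by default), and modifying a file after it appears there causes an error. Feeding the pipeline is therefore just a matter of copying finished log files into the directory, for example (source path hypothetical):

cp /home/logs/cmcc.json /usr/local/kafka_2.11-0.10.2.1/data/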

Here is the command that starts the collection:

bin/flume-ng agent -c conf -f conf/mycat-logger.conf -n a1 -Dflume.root.logger=INFO,console
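To check that records are actually reaching the JsonData topic, Kafka's console consumer can be run from another terminal (from the Kafka installation directory):

bin/kafka-console-consumer.sh --bootstrap-server hadoop-master:9092 --topic JsonData --from-beginning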

The code for connecting Spark Streaming directly to Kafka:

package cn.sheep.app

/**
  * China Mobile operations real-time monitoring platform
  *
  * @author WangLeiKai
  *         2018/10/17  8:42
  */

import cn.sheep.utils.{CaculateTools, ConfUtil, KpiTools}
import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object AppMain2 {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf()
    sparkConf.setAppName("中国移动运营实时监控平台-Monitor")
    //if running on a cluster, remove: sparkConf.setMaster("local[*]")
    sparkConf.setMaster("local[*]")
    //the default serializer is org.apache.spark.serializer.JavaSerializer;
    //switching to Kryo is the most basic optimization:
    //RDDs are stored in serialized form, reducing memory usage
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //RDD compression
    sparkConf.set("spark.rdd.compress", "true")
    //records per batch = maxRatePerPartition * number of partitions * batch interval (seconds)
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "100")
    //graceful shutdown
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    /**
      * Broadcast the province code -> name mapping
      */
    val pcode2pname: Broadcast[Map[String, AnyRef]] = ssc.sparkContext.broadcast(ConfUtil.pcode2pname)

    /** Fetch the data from Kafka.
      * LocationStrategies: the location strategy. One strategy applies when the Kafka brokers
      * run on the same machines as the Executors and another when they do not; once set,
      * data is fetched using the optimal strategy.
      * In production, Kafka nodes and Executors are usually not co-located: Kafka stores the
      * messages while the Executors compute over them, so storage and compute are separated
      * (storage is disk-heavy; compute is memory- and CPU-heavy).
      * Use PreferBrokers when Executors and brokers are co-located, and PreferConsistent
      * otherwise; with PreferConsistent, the data pulled from Kafka is spread as evenly as
      * possible across all Executors. */
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](ConfUtil.topic, ConfUtil.kafkaParams)
    )

    /**
      * Process the data
      */
    stream.foreachRDD(rdd => {
      val baseData = rdd
        // ConsumerRecord => JSONObject
        .map(cr => JSON.parseObject(cr.value()))
        // keep only the recharge notification logs
        .filter(obj => obj.getString("serviceName").equalsIgnoreCase("reChargeNotifyReq"))
        .map(obj => {

          // whether this log records a successful recharge
          val result = obj.getString("bussinessRst")
          //the recharge amount
          val fee = obj.getDouble("chargefee")
          //the code of the province the record belongs to
          val provinceCode = obj.getString("provinceCode")
          // the request id; its leading digits encode the time the recharge was initiated
          val requestId = obj.getString("requestId")
          // the record's date, hour, and minute
          val day = requestId.substring(0, 8)
          val hour = requestId.substring(8, 10)
          val minute = requestId.substring(10, 12)
          val receiveTime = obj.getString("receiveNotifyTime")

          //how long the recharge took
          val costTime = CaculateTools.caculateTime(requestId, receiveTime)
          val succAndFeeAndTime: (Double, Double, Double) = if (result.equals("0000")) (1, fee, costTime) else (0, 0, 0)

          // (day, hour, minute, KPIs(orders, successful orders, order amount, order duration), province)
          (day, hour, minute, List[Double](1, succAndFeeAndTime._1, succAndFeeAndTime._2, succAndFeeAndTime._3), provinceCode)
        }).cache()

      //print the data
      baseData.foreach(println)
    })

    //start the streaming application
    ssc.start()
    //wait for termination (works together with the graceful shutdown setting)
    ssc.awaitTermination()
  }
}
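The helper classes ConfUtil, CaculateTools, and KpiTools live in cn.sheep.utils and are not shown in this part of the post. As a rough sketch of what the two used above might provide (everything below is an assumption, not the original implementation; the group id, offset settings, and province entries are placeholders):

package cn.sheep.utils

import java.text.SimpleDateFormat

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

// Sketch only: supplies the topic, the Kafka consumer parameters, and the
// province code -> name mapping used by AppMain2.
object ConfUtil {
  // topic written by the Flume Kafka sink
  val topic: Set[String] = Set("JsonData")

  // consumer parameters for the kafka010 direct stream (values assumed)
  val kafkaParams: Map[String, Object] = Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
      "hadoop-master:9092,hadoop-slave02:9092,hadoop-slave03:9092",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.GROUP_ID_CONFIG -> "cmcc-monitor",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
  )

  // province code -> name mapping, presumably loaded from a config file
  val pcode2pname: Map[String, AnyRef] = Map("250" -> "SomeProvince") // placeholder entries
}

// Sketch of the elapsed-time helper: both requestId and receiveNotifyTime
// start with a yyyyMMddHHmmssSSS timestamp (17 digits), so the cost is the
// difference between the two instants in milliseconds.
object CaculateTools {
  def caculateTime(requestId: String, receiveTime: String): Double = {
    // SimpleDateFormat is not thread-safe, so create one per call
    val df = new SimpleDateFormat("yyyyMMddHHmmssSSS")
    val start = df.parse(requestId.substring(0, 17)).getTime
    val end = df.parse(receiveTime).getTime
    (end - start).toDouble
  }
}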