China Mobile Operations Analysis Real-Time Monitoring Platform: Data Collection and Connecting Spark Streaming Directly to Kafka
The data collection stage
A custom source buffers the logs produced on the server into Kafka and records a custom offset.
Some sample records:
{"bussinessRst":"0000","channelCode":"0705","chargefee":"10000","clientIp":"222.214.151.245","gateway_id":"CMPAY","idType":"01","interFacRst":"0000","logOutTime":"20170412080614847","orderId":"384681966165260394","phoneno":"15189718490","provinceCode":"250","receiveNotifyTime":"20170412080614806","requestId":"20170412080606314277006070809193","resultTime":"20170412080614","retMsg":"接口调用成功","serverIp":"10.255.254.10","serverPort":"8714","serviceName":"reChargeNotifyReq","sysId":"15"}
{"bussinessRst":"0000","channelCode":"0705","chargefee":"3150","clientIp":"171.217.123.87","gateway_id":"CMPAY","idType":"01","interFacRst":"0000","logOutTime":"20170412080615308","orderId":"384681969178557661","phoneno":"18323566642","provinceCode":"230","receiveNotifyTime":"20170412080615226","requestId":"20170412080609925965451280348478","resultTime":"20170412080616","retMsg":"接口调用成功","serverIp":"10.255.254.10","serverPort":"8714","serviceName":"reChargeNotifyReq","sysId":"15"}
{"bussinessRst":"0000","channelCode":"0705","chargefee":"3000","clientIp":"125.82.117.133","gateway_id":"CMPAY","interFacRst":"0000","logOutTime":"20170412080615349","orderId":"384681896178290781","payPhoneNo":"","phoneno":"15101443766","provinceCode":"931","rateoperateid":"1514","receiveNotifyTime":"20170412080615308","requestId":"20170412080456962712263849986037","retMsg":"接口调用成功","serverIp":"10.255.254.10","serverPort":"8714","serviceName":"payNotifyReq","shouldfee":"2985","srcChannel":"0003","sysId":"15"}
{"bussinessRst":"0000","channelCode":"0705","chargefee":"3000","clientIp":"42.236.170.120","gateway_id":"ALIPAY","interFacRst":"0000","logOutTime":"20170412080615385","orderId":"384681971178040991","payPhoneNo":"","phoneno":"18781554082","provinceCode":"280","rateoperateid":"1514","receiveNotifyTime":"20170412080615348","requestId":"20170412080611883464487092796684","retMsg":"接口调用成功","serverIp":"10.255.254.10","serverPort":"8714","serviceName":"payNotifyReq","shouldfee":"2985","srcChannel":"0003","sysId":"15"}
Flume performs the collection here, writing directly into Kafka to simulate a real-time analysis pipeline.
Briefly, the principle: the configuration file tells Flume to monitor a directory; whenever files appear in that directory, Flume treats them as new data and collects them, and the configured sink pushes them into Kafka. Spark Streaming then connects directly to Kafka and consumes the data.
The contents of the configuration file:
# name the components of this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# describe and configure the source: r1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/kafka_2.11-0.10.2.1/data
a1.sources.r1.fileHeader = true

# describe and configure the sink: k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = JsonData
a1.sinks.k1.kafka.bootstrap.servers = hadoop-master:9092,hadoop-slave02:9092,hadoop-slave03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# describe and configure the channel: an in-memory buffer
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The command that starts the Flume agent:
bin/flume-ng agent -c conf -f conf/mycat-logger.conf -n a1 -Dflume.root.logger=INFO,console
The Spark Streaming code that connects directly to Kafka:
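The listing below references a helper object `ConfUtil` that is not shown in this post. A minimal sketch of what it might contain: the topic name and broker list are taken from the Flume configuration above, while the `group.id` and the offset/commit settings are placeholders, not values from the original project.

```scala
package cn.sheep.utils

object ConfUtil {
  // the topic written by the Flume Kafka sink above
  val topic: Iterable[String] = List("JsonData")

  // standard Kafka 0.10 consumer settings; "cmcc-monitor" is a made-up group id
  val kafkaParams: Map[String, Object] = Map(
    "bootstrap.servers" -> "hadoop-master:9092,hadoop-slave02:9092,hadoop-slave03:9092",
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "group.id" -> "cmcc-monitor",
    "auto.offset.reset" -> "earliest",
    // auto-commit is disabled because the post records its own offsets
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // province code -> name mapping; in the real project this is presumably
  // loaded from a config file whose contents are not shown here
  val pcode2pname: Map[String, AnyRef] = Map.empty
}
```

With auto-commit disabled, offsets survive restarts only if the application stores them itself, which matches the "records a custom offset" remark at the top of the post.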
package cn.sheep.app

import cn.sheep.utils.{CaculateTools, ConfUtil, KpiTools}
import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * China Mobile operations real-time monitoring platform
  *
  * @author WangLeiKai
  *         2018/10/17 8:42
  */
object AppMain2 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("中国移动运营实时监控平台-Monitor")
    // remove setMaster("local[*]") when submitting to a cluster
    sparkConf.setMaster("local[*]")
    // the default is org.apache.spark.serializer.JavaSerializer;
    // switching to Kryo is the most basic optimization: RDDs are stored
    // in serialized form, which reduces memory usage
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // compress serialized RDD partitions
    sparkConf.set("spark.rdd.compress", "true")
    // records per batch = maxRatePerPartition * number of partitions * batch interval (seconds)
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "100")
    // stop gracefully, letting in-flight batches finish
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // broadcast the province code -> name mapping
    val pcode2pname: Broadcast[Map[String, AnyRef]] = ssc.sparkContext.broadcast(ConfUtil.pcode2pname)

    /** Fetch the Kafka data.
      * LocationStrategies (location strategy): one strategy applies when the Kafka
      * brokers run on the same machines as the executors, another when they do not;
      * once set, Spark fetches data in the optimal way for that layout.
      * In production, Kafka brokers and executors are usually not co-located:
      * Kafka stores the messages (disk-heavy) while the executors do the
      * computation (CPU- and memory-heavy), so storage and compute are separated.
      * Use PreferBrokers when executors run on the broker nodes; otherwise use
      * PreferConsistent, which spreads the fetched data evenly across all executors. */
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](ConfUtil.topic, ConfUtil.kafkaParams))
    // process the data
    stream.foreachRDD(rdd => {
      val baseData = rdd
        // ConsumerRecord => JSONObject
        .map(cr => JSON.parseObject(cr.value()))
        // keep only the recharge-notification logs
        .filter(obj => obj.getString("serviceName").equalsIgnoreCase("reChargeNotifyReq"))
        .map(obj => {
          // did this recharge succeed?
          val result = obj.getString("bussinessRst")
          // recharge amount
          val fee = obj.getDouble("chargefee")
          // province code of this record
          val provinceCode = obj.getString("provinceCode")
          // requestId embeds the timestamp at which the recharge was initiated
          val requestId = obj.getString("requestId")
          // date, hour and minute of the record
          val day = requestId.substring(0, 8)
          val hour = requestId.substring(8, 10)
          val minute = requestId.substring(10, 12)
          val receiveTime = obj.getString("receiveNotifyTime")
          // how long the recharge took
          val costTime = CaculateTools.caculateTime(requestId, receiveTime)
          val succAndFeeAndTime: (Double, Double, Double) =
            if (result.equals("0000")) (1, fee, costTime) else (0, 0, 0)
          // (day, hour, minute, KPIs(orders, successful orders, amount, duration), province)
          (day, hour, minute, List[Double](1, succAndFeeAndTime._1, succAndFeeAndTime._2, succAndFeeAndTime._3), provinceCode)
        }).cache()
      // print the base data
      baseData.foreach(println)
    })

    // start the Spark Streaming application
    ssc.start()
    // block until termination (graceful, thanks to the shutdown setting above)
    ssc.awaitTermination()
  }
}
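The helper `CaculateTools.caculateTime` is also not shown. A minimal sketch of what it might compute, assuming both arguments begin with a `yyyyMMddHHmmssSSS` timestamp (the first 17 characters of `requestId`, and `receiveNotifyTime` in full, as in the sample records above) and that the result is the elapsed milliseconds:

```scala
package cn.sheep.utils

import java.text.SimpleDateFormat

object CaculateTools {
  /** Elapsed milliseconds between the request timestamp embedded in
    * requestId and the notification timestamp. */
  def caculateTime(requestId: String, receiveTime: String): Double = {
    // SimpleDateFormat is not thread-safe, so build one per call
    val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")
    val start = format.parse(requestId.substring(0, 17)).getTime
    val end = format.parse(receiveTime).getTime
    (end - start).toDouble
  }
}
```

For the first sample record above (requestId starting `20170412080606314`, receiveNotifyTime `20170412080614806`) this yields 8492 ms.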