SparkStreaming+kafka+flume+hbase日志实时流处理项目

1.项目背景:

互联网访问日志概述

为什么要记录访问日志的行为呢?

       通过日志我们可以得到网站页面的访问量,网站的黏性,推荐用户行为分析,是指在获得网站访问量基本数据的情况下,对有关数据进行统计、分析,从中发现用户访问网站的规律,并将这些规律与网络营销策略等相结合,从而发现目前网络营销活动中可能存在的问题,并为进一步修正或重新制定网络营销策略提供依据。

2.项目需求:

统计网页版爱艺奇视频:

 今天到现在为止的每个类别的访问量 。

今天到现在为止从搜索引擎引流过来的类别的访问量。

今天到现在为止每个栏目下面的销售额。

今天到现在为止每个省份的购买量。 

重点分析的数据

用户行为分析应该包含以下数据重点分析:
* 用户的来源地区、来路域名和页面;
* 用户在网站的停留时间、跳出率、回访者、新访问者、回访次数、回访相隔天数;
* 注册用户和非注册用户,分析两者之间的浏览习惯;
* 用户所使用的搜索引擎、关键词、关联关键词和站内关键字;
* 用户选择什么样的入口形式(广告或者网站入口链接)更为有效;
* 用户访问网站流程,用来分析页面结构设计是否合理;
* 用户在页面上的网页热点图分布数据和网页覆盖图数据;
* 用户在不同时段的访问量情况等:
* 用户对于网站的字体颜色的喜好程度。
日志格式字段: ip 地址 用户名 访问时间 访问的模块地址 使用的方式 .....

3.项目意义:

        通过对用户行为监测获得的数据进行分析,可以让企业更加详细、清楚地了解用户的行为习惯,从而找出网站、推广渠道等企业营销环境存在的问题,有助于企业发掘高转化率页面, 让企业的营销更加精准、有效,提高业务转化率,从而提升企业的广告收益。

4.项目结构框图:

SparkStreaming+kafka+flume+hbase日志实时流处理项目

5.具体设计流程

步骤一:

1. 日志分析

打开爱奇艺网站进入到页面,通过对爱奇艺网址进行分析,当点击电影频道时,主网址后面跟着 www/1,点击电视剧频道时,主网址后面跟着www/2. 所以通过对频道网址的分析得出如下数据:

www/1--代表电影    www/2--代表电视剧  

www/3-- 代表记录篇   www/4-- 代表动漫 

www/5--代表音乐       www/6--代表综艺    

www/7--代表娱乐      www/8--代表游戏      

www/9-- 代表旅游  www/10--代表片花

www/11--代表综艺     www/12--代表教育     

www/13--代表时尚     www/15--代表儿童

www/16--网络电视    www/17--体育

 www/20--广告         www/21--生活

 www/22--搞笑        www/24--财经 

 www/25--资讯       www/26--汽车 

 www/27--原创       www/28--军事

www/29--母婴    www/30--科技   

www/31--脱口秀    www/32--健康

SparkStreaming+kafka+flume+hbase日志实时流处理项目

SparkStreaming+kafka+flume+hbase日志实时流处理项目

2. 生成日志

使用pycharm编辑器生成日志,日志保存在f盘,总共生成500万条数据

# coding=UTF-8
import random
import time

url_paths = [
    "www/1",
    "www/2",
    "www/3",
    "www/4",
    "www/5",
    "www/6",
    "www/7",
    "www/8",
    "www/9",
    "www/10",
    "www/11",
    "www/12",
    "www/13",
    "www/15",
    "www/16",
    "www/17",
    "www/20",
    "www/21",
    "www/22",
    "www/24",
    "www/25",
    "www/26",
    "www/27",
    "www/28",
    "www/29",
    "www/30",
    "www/31",
    "www/32",
    "pianhua/130",
    "toukouxu/821"
]

status_code = [404, 302, 200]

ip_slices = [132, 156, 124, 10, 29, 167, 143, 187, 30, 100]

http_referers = [
    "https://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?qu={query}",
    "http://cn.bing.com/search?q={query}",
    "https://search.yahoo.com/search?p={query}",
    "http://www.chinaso.com/search/pagesearch.htm?q={query}",
    "https://search.yahoo.com/search?p={query}",
    "https://iask.sina.com.cn/search?searchWord={query}",
    "https://www.so.com/s?ie={query}"
]

search_keyword = [
    "比悲伤更悲伤的故事",
    "流浪地球",
    "复仇者联盟4:终局之战",
    "疯狂的外星人",
    "飞驰人生",
    "大黄蜂",
    "惊奇队长",
    "阿丽塔:战斗天使",
    "反贪风暴4",
    "熊出没·原始时代",
    "新喜剧之王",
    "大侦探皮卡丘",
    "绿皮书",
    "白蛇:缘起",
    "驯龙高手3",
    "老师·好",
    "何以为家",
    "调音师",
    "雷霆沙赞!",
    "死侍2:我爱我家",
    "密室逃生",
    "一吻定情",
    "神探蒲松龄",
    "小飞象",
    "小猪佩奇过大年",
    "下一任:前任",
    "廉政风云",
    "海市蜃楼",
    "波西米亚狂想曲",
    "一个母亲的复仇",
    "一条狗的使命2",
    "祈祷落幕时",
    "风中有朵雨做的云",
    "神奇乐园历险记",
    "地久天长",
    "狂暴凶狮",
    "破冰行动",
    "封神演义",
    "重耳**",
    "反恐特战队之天狼",
    "一场遇见爱情的旅行",
    "我的真朋友",
    "我要和你在一起",
    "燕阳春",
    "白发",
    "绽放吧,百合",
    "危机迷雾",
    "如果可以这样爱",
    "黄河英雄",
    "爱上你治愈我",
    "守望正义",
    "密查",
    "女人当官2",
    "荡寇",
    "推手",
    "春暖花又开",
    "我们的生活充满阳光",
    "刘家媳妇",
    "面具背后",
    "都挺好",
    "趁我们还年轻",
    "新白娘子**",
    "奔跑吧第3季",
    "向往的生活第3季",
    "极限挑战第5季",
    "王牌对王牌第4季",
    "爱情保卫战2019",
    "妻子的浪漫旅行第2季",
    "我是唱作人",
    "金牌调解",
    "陪你莫属",
    "青春的花路",
    "青春环游记",
    "非诚勿扰",
    "第三解调室",
    "经典**",
    "极限挑战第4季",
    "跨界喜剧王第3季",
    "你看谁来了",
    "中国新相亲",
    "向往的生活第2季",
    "航海王",
    "阿衰",
    "万古仙穹第3季",
    "猫和老鼠",
    "斗罗大陆2绝世唐门",
    "天价宠妻",
    "妖精的尾巴",
    "逆天邪神",
    "黑色四叶草"
]


# ip地址
def sample_ip():
    slice = random.sample(ip_slices, 4)
    return ".".join([str(item) for item in slice])


def sample_url():
    return random.sample(url_paths, 1)[0]


def sample_status():
    return random.sample(status_code, 1)[0]


def sample_referer():
    if random.uniform(0, 1) > 0.2:
        return "-"
    refer_str = random.sample(http_referers, 1)
    # print refer_str[0]
    query_str = random.sample(search_keyword, 1)
    # print query_str[0]
    return refer_str[0].format(query=query_str[0])


# 产生log
def generate_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    f = open("f:\\logs","w+")
    #f = open("/home/centos/log/log", "a+")
    while count >= 1:
        query_log = "{ip}\t{localtime}\t\"GET {url} HTTP/1.0\"\t{referece}\t{status1}".format(ip=sample_ip(),
                                                                                              url=sample_url(),
                                                                                              status1=sample_status(),
                                                                                              referece=sample_referer(),
                                                                                              localtime=time_str)
        # print query_log
        f.write(query_log + "\n")
        count = count - 1;


if __name__ == '__main__':
    generate_log(5000000)
# print "1111"

生成的日志部分截图为:

SparkStreaming+kafka+flume+hbase日志实时流处理项目

3.定时在虚拟机中产生日志

1) 选择一台虚拟机用来定时生成日志   例如普通用户为hadoop  主机名为 mini1 :

SparkStreaming+kafka+flume+hbase日志实时流处理项目

2)如果没有logs 文件夹 自行创建一个文件夹  mkdir logs    该logs 文件用来存放自动生成日志的python代码

cd  logs

3) 创建generate.py    然后将脚本复制粘贴进去    ctrl+x 保存   按 Y 确定   按 Enter 退出。

SparkStreaming+kafka+flume+hbase日志实时流处理项目

# coding=UTF-8
import random
import time

url_paths = [
    "www/1",
    "www/2",
    "www/3",
    "www/4",
    "www/5",
    "www/6",
    "www/7",
    "www/8",
    "www/9",
    "www/10",
    "www/11",
    "www/12",
    "www/13",
    "www/15",
    "www/16",
    "www/17",
    "www/20",
    "www/21",
    "www/22",
    "www/24",
    "www/25",
    "www/26",
    "www/27",
    "www/28",
    "www/29",
    "www/30",
    "www/31",
    "www/32",
    "pianhua/130",
    "toukouxu/821"
]

status_code = [404, 302, 200]

ip_slices = [132, 156, 124, 10, 29, 167, 143, 187, 30, 100]

http_referers = [
    "https://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?qu={query}",
    "http://cn.bing.com/search?q={query}",
    "https://search.yahoo.com/search?p={query}",
    "http://www.chinaso.com/search/pagesearch.htm?q={query}",
    "https://search.yahoo.com/search?p={query}",
    "https://iask.sina.com.cn/search?searchWord={query}",
    "https://www.so.com/s?ie={query}"
]

search_keyword = [
    "比悲伤更悲伤的故事",
    "流浪地球",
    "复仇者联盟4:终局之战",
    "疯狂的外星人",
    "飞驰人生",
    "大黄蜂",
    "惊奇队长",
    "阿丽塔:战斗天使",
    "反贪风暴4",
    "熊出没·原始时代",
    "新喜剧之王",
    "大侦探皮卡丘",
    "绿皮书",
    "白蛇:缘起",
    "驯龙高手3",
    "老师·好",
    "何以为家",
    "调音师",
    "雷霆沙赞!",
    "死侍2:我爱我家",
    "密室逃生",
    "一吻定情",
    "神探蒲松龄",
    "小飞象",
    "小猪佩奇过大年",
    "下一任:前任",
    "廉政风云",
    "海市蜃楼",
    "波西米亚狂想曲",
    "一个母亲的复仇",
    "一条狗的使命2",
    "祈祷落幕时",
    "风中有朵雨做的云",
    "神奇乐园历险记",
    "地久天长",
    "狂暴凶狮",
    "破冰行动",
    "封神演义",
    "重耳**",
    "反恐特战队之天狼",
    "一场遇见爱情的旅行",
    "我的真朋友",
    "我要和你在一起",
    "燕阳春",
    "白发",
    "绽放吧,百合",
    "危机迷雾",
    "如果可以这样爱",
    "黄河英雄",
    "爱上你治愈我",
    "守望正义",
    "密查",
    "女人当官2",
    "荡寇",
    "推手",
    "春暖花又开",
    "我们的生活充满阳光",
    "刘家媳妇",
    "面具背后",
    "都挺好",
    "趁我们还年轻",
    "新白娘子**",
    "奔跑吧第3季",
    "向往的生活第3季",
    "极限挑战第5季",
    "王牌对王牌第4季",
    "爱情保卫战2019",
    "妻子的浪漫旅行第2季",
    "我是唱作人",
    "金牌调解",
    "陪你莫属",
    "青春的花路",
    "青春环游记",
    "非诚勿扰",
    "第三解调室",
    "经典**",
    "极限挑战第4季",
    "跨界喜剧王第3季",
    "你看谁来了",
    "中国新相亲",
    "向往的生活第2季",
    "航海王",
    "阿衰",
    "万古仙穹第3季",
    "猫和老鼠",
    "斗罗大陆2绝世唐门",
    "天价宠妻",
    "妖精的尾巴",
    "逆天邪神",
    "黑色四叶草"
]


# ip地址
def sample_ip():
    slice = random.sample(ip_slices, 4)
    return ".".join([str(item) for item in slice])


def sample_url():
    return random.sample(url_paths, 1)[0]


def sample_status():
    return random.sample(status_code, 1)[0]


def sample_referer():
    if random.uniform(0, 1) > 0.2:
        return "-"
    refer_str = random.sample(http_referers, 1)
    # print refer_str[0]
    query_str = random.sample(search_keyword, 1)
    # print query_str[0]
    return refer_str[0].format(query=query_str[0])


# 产生log
def generate_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # f = open("f:\\logs1","w+")
    f = open("/home/hadoop/log/log", "a+")
    while count >= 1:
        query_log = "{ip}\t{localtime}\t\"GET {url} HTTP/1.0\"\t{referece}\t{status1}".format(ip=sample_ip(),
                                                                                              url=sample_url(),
                                                                                              status1=sample_status(),
                                                                                              referece=sample_referer(),
                                                                                              localtime=time_str)
        # print query_log
        f.write(query_log + "\n")
        count = count - 1;


if __name__ == '__main__':
    generate_log(5000)
# print "1111"

4) 日志生成的目录为 /home/hadoop/log/log    所以创建一个目录 用来保存日志

[[email protected] ~]$ mkdir -p /home/hadoop/log

5)进入到 logs 目录下 运行puthon脚本  

[[email protected] logs]$ python generate.py

SparkStreaming+kafka+flume+hbase日志实时流处理项目

6)重新创建一个脚本执行它,让每次在log中追加日志

[[email protected] logs]$ nano log_generator.sh 

7)  在文件中复制粘贴如下脚本,保存退出,

python  /home/hadoop/logs/generate.py

8)然后添加执行权限 :

[[email protected] logs]$ chmod u+x log_generator.sh

SparkStreaming+kafka+flume+hbase日志实时流处理项目

9) 执行: 点击执行 就会再原来的generate.py 里面追加5000条数据

[[email protected] logs]$ ./log_generator.sh

10) 创建一个定时器 让他定时执行./log_generator.sh   每分钟产生5000条日志追加到log日志里   使用crontab命令

crontab -e  

复制粘贴如下命令; 保存退出

*/1 * * * * /home/hadoop/logs/log_generator.sh

11) 启动定时  这样就会每分钟在log中产生日志

[[email protected] ~]$ crontab -l

上述的步骤 执行完成之后 就会每分钟产生5000条日志数据追加到log日志中,完成了结构框图中的前两步。

步骤二:

本文以mini1 、mini2、mini3三台虚拟机作为集群,mini1作为namenode 主节点  mini2、mini3作为datanode 从节点 

 

节点 说明   ip地址    zookeeper     flume    kafka spark
mini1   主节点 192.168.63.181   是     是 主节点
mini2 从节点 192.168.63.182   是    是 主节点
mini3   从节点 192.168.63.183   是     是 从节点
mini4           从节点


 启动和配置flume和kafka和zookeeper

flume安装配置启动 教程:https://blog.csdn.net/weixin_38201936/article/details/88642373

kafka安装配置启动 教程:https://blog.csdn.net/weixin_38201936/article/details/89226897

zookeeper安装配置启动教程:https://blog.csdn.net/weixin_38201936/article/details/88821559

上述安装配置完成后启动结果如下图:

SparkStreaming+kafka+flume+hbase日志实时流处理项目

SparkStreaming+kafka+flume+hbase日志实时流处理项目

SparkStreaming+kafka+flume+hbase日志实时流处理项目

1) 首先查看一下kafka中的所有topic 

kafka-topics.sh --list --zookeeper mini1:2181

2)创建一个topic:  flumeTopic

kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 1 --partitions 1 --topic flumeTopic

SparkStreaming+kafka+flume+hbase日志实时流处理项目

3) 启动一个消费者用来消费日志

[[email protected] bin]$ kafka-console-consumer.sh --zookeeper mini1:2181 --topic flumeTopic --from-beginning

现在对flume进行配置:

1)进入到flume/conf目录下 创建一个a1.conf   该文件是来采集日志文件,然后让kafka进行消费

SparkStreaming+kafka+flume+hbase日志实时流处理项目

2)创建的内容如下: 创建成功后保存退出

# 定义 agent
a1.sources = src1
a1.channels = ch1
a1.sinks = k1
# 定义 sources
a1.sources.src1.type = exec
a1.sources.src1.command=tail -F /home/hadoop/log/log
a1.sources.src1.channels=ch1
# 定义 sinks
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flumeTopic
a1.sinks.k1.brokerList = mini1:9092
a1.sinks.k1.batchSize = 20 
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.channel = ch1
# 定义 channels
a1.channels.ch1.type = memory
a1.channels.ch1.capacity = 1000

3)启动flume配置文件  启动成功后会看到如下截图

[[email protected] apache-flume-1.6.0-bin]$ bin/flume-ng agent --conf conf --conf-file conf/a1.conf --name a1 -Dflume.root.logger=INFO,console

SparkStreaming+kafka+flume+hbase日志实时流处理项目

上面已经将flume和kafka进行串联起来

总结一下 该怎么进行执行 

首先打开3个mini1 窗口 每个窗口修改一下名字  

mini1(1)  --  定时生产log日志 

mini1(2)  --  启动flume配置文件

mini1(3) --  kafka consume进行消费

SparkStreaming+kafka+flume+hbase日志实时流处理项目

执行命令 

1)在定时生产log日志窗口上 执行 刚才设置的定时命令  这样每分钟就会生产5000条日志数据  命令如下:

[[email protected] logs]$ crontab -l

2)在启动flume配置文件上 执行flume的配置文件   命令如下:  

[[email protected] apache-flume-1.6.0-bin]$ bin/flume-ng agent --conf conf --conf-file conf/a1.conf --name a1 -Dflume.root.logger=INFO,console

3)在 kafka consume进行消费 窗口上 执行 consume消费命令   命令如下:

[[email protected] bin]$ kafka-console-consumer.sh --zookeeper mini1:2181 --topic flumeTopic --from-beginning

三个命令全部执行后 flume+kafka就会串联起来

每分钟定时产生日志5000条日志,由flume进行采集,然后由kafka中的consume 进行消费

步骤三:

打通sparkStreaming +kafka 

1)环境搭建 在idea上创建maven项目,选择scala jar包

SparkStreaming+kafka+flume+hbase日志实时流处理项目

编写包名和类名

SparkStreaming+kafka+flume+hbase日志实时流处理项目

选择对应的settings.xml  和jar存放的仓库

SparkStreaming+kafka+flume+hbase日志实时流处理项目

创建成功后如图所示:刚创建好的项目版本是2.7.0

SparkStreaming+kafka+flume+hbase日志实时流处理项目

修改pom.xml文件 将新创建的pox.xml文件用下面代码替换

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com..study.spark</groupId>
  <artifactId>SparkTrain</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.8</scala.version>
    <kafka.version>0.10.0.0</kafka.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.7.2</hadoop.version>
    <hbase.version>1.2.0</hbase.version>
  </properties>


  <dependencies>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!--kafka 依 赖 -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.0.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
package com.study.spark.project


import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext, kafka010}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

/**
  * Create by lyz on 2019/5/20
  */
object StateStreamingApp {

  def main(args: Array[String]): Unit = {


    val ssc = new StreamingContext("local[*]", "StatStreamingApp", Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.63.181:9092,192.168.63.181:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = List("flumeTopic").toSet
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    ).map(_.value())
    stream.print()

    ssc.start()
    ssc.awaitTermination();
  }
}