Flume Case Study: Real-Time Collection of the Latest Douban Movies Scraped with Python
First, some background on this case study: a Python crawler scrapes information about the movies currently showing on Douban, and the scraped data is shipped into HDFS via Flume. The Python version is 3.6 and the Flume version is 1.8.
The Python Crawler, Step by Step
(1) To write the crawler we first need to fetch the page. In Python we use the urllib library:
from urllib import request
resp = request.urlopen('https://movie.douban.com/nowplaying/wuhan/')
html_data = resp.read().decode('utf-8')
Here https://movie.douban.com/nowplaying/wuhan/ is Douban's "now playing" page, which you can also open in a browser. The variable html_data is a string holding the page's HTML source.
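In practice Douban sometimes rejects requests that carry urllib's default User-Agent. A minimal sketch of attaching a browser-like header (the header value here is only an illustrative placeholder, and the actual network call is left commented out):

```python
from urllib import request

url = 'https://movie.douban.com/nowplaying/wuhan/'
# Build a Request with an explicit User-Agent; the default urllib
# User-Agent is sometimes blocked by sites such as Douban.
req = request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
# html_data = request.urlopen(req).read().decode('utf-8')  # actual fetch
print(req.get_header('User-agent'))
```

Note that urllib stores header names capitalized, so the lookup key is 'User-agent'.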
(2) Parse the HTML and extract the data we need. In Python we use the BeautifulSoup library for HTML parsing. (Note: if the library is not installed, install it with pip3 install beautifulsoup4.)
BeautifulSoup is invoked as follows:
BeautifulSoup(html, "html.parser")
The first argument is the HTML to extract data from; the second argument names the parser to use.
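As a quick, self-contained illustration of this calling convention, here is a parse of an inline HTML snippet (the snippet itself is made up for demonstration):

```python
from bs4 import BeautifulSoup

html = '<div id="nowplaying"><li class="list-item" data-title="Demo"></li></div>'
soup = BeautifulSoup(html, 'html.parser')
# find() returns the first matching tag; attributes read like dict keys
item = soup.find('li', class_='list-item')
print(item['data-title'])  # Demo
```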
(3) Open the HTML source of the page we scraped and locate the tags that contain the data we need, as shown in the figure below.
As the figure shows, the data we want starts at the div id="nowplaying" tag, which contains each movie's title, score, cast, and so on. The corresponding code is:
from bs4 import BeautifulSoup as bs

def getNowPlayingMovie_list():
    # html_data is the page source fetched in step (1)
    soup = bs(html_data, 'html.parser')
    nowplaying_movie = soup.find_all('div', id='nowplaying')
    nowplaying_movie_list = nowplaying_movie[0].find_all('li', class_='list-item')
    nowplaying_list = []
    for item in nowplaying_movie_list:
        nowplaying_dict = {}
        nowplaying_dict['id'] = item['data-subject']
        nowplaying_dict['title'] = item['data-title']
        nowplaying_dict['score'] = item['data-score']
        nowplaying_dict['region'] = item['data-region']
        nowplaying_dict['director'] = item['data-director']
        nowplaying_dict['votecount'] = item['data-votecount']
        nowplaying_list.append(nowplaying_dict)
    return nowplaying_list
(4) Store the movie information in a CSV file. The corresponding code is:
import csv

csv_file = open("/home/hadoop/data/flume-test/movielist/movielist.csv", "w", newline='')
writer = csv.writer(csv_file)
writer.writerow(['id', 'title', 'score', 'region', 'director', 'votecount'])
NowPlayingMovie_list = getNowPlayingMovie_list()
for movie in NowPlayingMovie_list:
    writer.writerow([movie['id'], movie['title'], movie['score'],
                     movie['region'], movie['director'], movie['votecount']])
csv_file.close()
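To sanity-check the CSV produced above, the file can be read back with csv.reader. A small self-contained sketch that writes and re-reads a file (using a temporary path rather than the real movielist.csv):

```python
import csv
import os
import tempfile

rows = [['id', 'title', 'score'], ['1', 'Demo', '8.0']]
path = os.path.join(tempfile.mkdtemp(), 'movielist.csv')

# newline='' prevents the csv module from writing extra blank lines on Windows
with open(path, 'w', newline='') as f:
    csv.writer(f).writerows(rows)

with open(path, newline='') as f:
    readback = list(csv.reader(f))

print(readback == rows)  # True
```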
Run the crawler with the commands below; the scraped movie information is written to movielist.csv under /home/hadoop/data/flume-test/movielist. The result is shown in the figure.
[hadoop@master movielist]$ python3 scrape-douban.py
[hadoop@master movielist]$ cat movielist.csv
The Flume Configuration File
Now that we understand how Flume works, let's look at the Flume configuration file for this example. Add the following to flume.conf:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Flume source (the directory to watch)
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/data/flume-test
a1.sources.r1.fileHeader = true
# Flume sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/tmp/flume-movies/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Flume channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
After saving the configuration file, start the Flume agent; the startup output is shown in the figure.
[hadoop@master ~]$ cd /home/hadoop/software/apache-flume-1.8.0-bin
[hadoop@master apache-flume-1.8.0-bin]$ bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1 -Dflume.root.logger=INFO,console
Log in to HDFS and query the movie data collected by Flume with the command below; the result is shown in the figure.
[hadoop@master ~]$ hadoop fs -ls /tmp/flume-movies
To inspect the contents of an individual file, run the following command; the result is shown below.
[hadoop@master ~]$ hadoop fs -cat /tmp/flume-movies/20180427/events.xxxxxx
2018.11.15
dhp