flume的安装及使用及各种案例
1.下载flume-1.6.0(apache-flume-1.6.0-bin.tar.gz)
http://flume.apache.org/download.html
2.上传到集群(本文上传到hadoop04节点)
3.解压安装包
tar -zxvf apache-flume-1.6.0-bin.tar.gz -C apps/
并且修改文件夹名字
mv apache-flume-1.6.0-bin flume-1.6.0
4.进入conf文件夹修改配置信息
cd /home/hadoop/apps/flume-1.6.0/conf
5.修改配置文件
[[email protected] conf]$ cp flume-env.sh.template flume-env.sh
vim flume-env.sh
6.配置环境变量
vim .bashrc
增加下面的内容
export FLUME_HOME=/home/hadoop/apps/flume-1.6.0/bin
export PATH=$PATH:$FLUME_HOME/bin
source .bashrc
7.测试是否成功
[[email protected] ~]$ flume-ng version
8.flume的使用
官方帮助文档:点击打开链接
Source:读数据
Avro Source:
监听Avro的一个端口和接收事件从外部的Avro client获取流数据(监听指定的端口,并且在指定端口上接收数据),在linux里面有一个nc命令(可以向指定端口发送数据,就跟ping baidu.com一样,即通过nc向端口发送数据,Avro Source就可以在这个端口接收到数据)
Spooling Directory Source: (日志常用)
监控指定文件的数据,在磁盘上去写一个指定的spooling的目录,这个Source可以观测指定的目录的新文件并且把这个事件粘贴到新文件里面。(监控指定的文件夹或者目录,这个文件夹下面是没有子文件夹的,一旦这个目录被监控以后,往该目录下写文件,这个文件将被Spooling DirectorySource读取文件中的内容,读取完之后将文件名增加后缀.COMPLETED
Exec Source:(日志常用)
运行在一个给定的Unix或者Linux命令,当命令执行时候,Exec Source会去读取接收命令所接受到的数据(比如说cat,tail命令),其实就是使用一个linux命令读取数据信息,然后通过Exec Source就可以获取出来,其实也是写日志。(tail命令就是获取
获取最后多少行,获取最新的新增加的)
Exec Source 和Spool Source 比较
1、ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法何证日志数据的完整性。
2、SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。
3、总结:如果应用无法实现以分钟切割日志文件的话,可以两种 收集方式结合使用。
Channel:消息队列,缓存
Memory Channel:
让内存作为我们中间的缓存,所有的事件数据都放到内存队列里去,可以去配置最大的内存大小,可以实现高速的吞吐,但是无法保证数据的完整性。
JDBC Channel:
Kafaka Channel:
File Channel:文件也可以作为Channel,保证数据的完整性与一致性。在具体配置不现的FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。
Spillable Memory Channel:可分割内存的Channel
MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。
Sink:将Channel中的数据写入到目的地,进行相应的存储文件系统,数据库,或者提交到远程服务器。
HDFS Sink:把事件写到HDFS上,支持创建新的文本和序列化文件和压缩格式,还支持回滚(最大的好处就是从HDFS中直接分区导入到Hive中)
Hive Sink:
Logger Sink:将日志事件在信息的级别去显示(可以显示到控制台)
Avro Sink:写到指定的端口去写数据
Thrift Sink:
flume部署种类:
单一代理流程:
多代理流程:
流合并:(多代理的一种)
多路复用:从一个地方收集信息最终写到不同的地方去,(多路复制是接收到的数据都是一样的)
注意:一个sink只能指定一个channel
下面是模板:
-
#文件名后缀是.properties
-
#agent的名称可以自定义 a1 ,sources、sinks、channels的名称也可以自定义
-
a1.sources = s1
-
a1.sinks = k1
-
a1.channels = c1
-
#为source指定他的channel
-
a1.sources.s1.channels = c1
-
#为sink指定他的channel
-
a1.sinks.k1.channel = c1
流配置:
单一代理流配置:
Spooling Directory Source,Memory Channel,Logger Sink的案例:
-
#监控指定的目录,如果有新文件产生,那么将文件的内容显示到控制台(Spooling Directory Source,Memory Channel,Logger Sink)
-
#运行命令:flume-ng agent --conf conf --conf-file case_single.properties --name a1 -Dflume.hadoop.logger=INFO,console
-
#运行命令解释:flume-ng agent --conf conf --conf-file HDFS上的配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中
-
#agent的名称可以自定义 a1 ,sources、sinks、channels的名称也可以自定义
-
a1.sources = s1
-
a1.sinks = k1
-
a1.channels = c1
-
#配置source
-
a1.sources.s1.type = spooldir
-
a1.sources.s1.spoolDir = /home/hadoop/flumetest
-
#配置channel
-
a1.channels.c1.type = memory
-
#配置sink
-
a1.sinks.k1.type = logger
-
#为source指定他的channel
-
a1.sources.s1.channels = c1
-
#为sink指定他的channel
-
a1.sinks.k1.channel = c1
把上面的代码复制到case_single.properties文本文件中,然后上传到hadoop04家目录下面,并且在hadoop用户家目录下新建一个flumetest的文件夹(mkdir flumetest)
使用命令:[[email protected] ~]$ flume-ng agent --conf conf --conf-file case_single.properties --name a1 -Dflume.hadoop.logger=INFO,console
然后再hadoop04上复制一个新的连接窗口,使用 vim t 在里面写入hello flume 然后保存。执行下面的命令
[[email protected] ~]$ mv t flumetest/
此时hadoop04的flume窗口会自动出现下图所示样子:
到此测试成功!
NetCat TCP Source,Memory Channel,Logger Sink的案例
-
#通过NetCat TCP Source读取指定端口的输入数据到控制台显示(NetCat TCP Source,Memory Channel,Logger Sink)
-
#往端口输入数据 可以通过nc命令,telnet命令也可以去发送数据的
-
#运行命令:flume-ng agent --conf conf --conf-file case_tcp.properties --name a1 -Dflume.hadoop.logger=INFO,console
-
#运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中
-
a1.sources = s1
-
a1.sinks = k1
-
a1.channels = c1
-
#配置source
-
a1.sources.s1.type = netcat
-
a1.sources.s1.bind = 192.168.123.104
-
a1.sources.s1.port = 55555
-
#配置channel
-
a1.channels.c1.type = memory
-
#配置sink
-
a1.sinks.k1.type = logger
-
#为source指定他的channel
-
a1.sources.s1.channels = c1
-
#为sink指定他的channel
-
a1.sinks.k1.channel = c1
复制上面的代码到case_tcp.properties文本文件中,然后从hadoo04节点上传到hadoop用户的根目录下。执行下面的命令
[[email protected] ~]$ flume-ng agent --conf conf --conf-file case_tcp.properties --name a1 -Dflume.hadoop.logger=INFO,console
然后再复制hadoop04节点连接会话,输入telnet hadoop04 55555
[[email protected] ~]$ sudo yum install telnet
安装好telnet后使用下面命令发送数据:
[[email protected] ~]$ telnet hadoop04 55555
然后就可以输入数据了
另一端就能接收到:
Avro Source,Memory Channel,Logger Sink的案例
-
#通过Avro Source读取指定端口的输入数据到控制台显示(Avro Source,Memory Channel,Logger Sink)
-
#往端口输入数据 可以通过nc命令,telnet命令也可以去发送数据的
-
#运行命令:flume-ng agent --conf conf --conf-file case_avro.properties --name a1 -Dflume.hadoop.logger=INFO,console
-
#运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中
-
a1.sources = s1
-
a1.sinks = k1
-
a1.channels = c1
-
#配置source
-
a1.sources.s1.type = avro
-
a1.sources.s1.bind = 192.168.123.104
-
a1.sources.s1.port = 55555
-
#配置channel
-
a1.channels.c1.type = memory
-
#配置sink
-
a1.sinks.k1.type = logger
-
#为source指定他的channel
-
a1.sources.s1.channels = c1
-
#为sink指定他的channel
-
a1.sinks.k1.channel = c1
复制上面的代码到case_avro.properties文本文件中,然后从hadoo04节点上传到hadoop用户的根目录下。执行下面的命令
[[email protected] ~]$ flume-ng agent --conf conf --conf-file case_avro.properties --name a1 -Dflume.hadoop.logger=INFO,console
然后再复制hadoop04节点连接会话,输入echo "hello avro" | nc hadoop04 55555
[[email protected] ~]$ sudo yum install nc
输入nc命令查看是否安装成功
再次输入
[[email protected] ~]$ echo "hello avro" | nc hadoop04 55555
先创建一个vim t.log 然后写qwert world
[[email protected] ~]$ flume-ng avro-client -c ./conf -H hadoop04 -p 55555 -F t.log
Exec Source,Memory Channel,hdfs-Sink的案例
准备:
模拟服务器的业务系统不断的写日志,不断的往里面写入数据,最终通过flume写到HDFS上面
只要这个窗口在就不断的往里面写数据
注意这个窗口别关!!
复制一个新的窗口输入下面的命令:
[[email protected] tom]$ tail -F catalina.out
查看是不是正常状态,这个窗口可以结束。
正文:
首先启动整个集群,在HDFS下面建一个flumetest的文件夹
hadoop fs -mkdir /flumetest
然后新建一个case_hdfs.properties的文本文件复制下面的代码:
-
#使用Exec Source通过Linux命令去取数据,然后不断的写入HDFS中(Exec Source,Memory Channel,hdfs-Sink)
-
#运行命令:flume-ng agent --conf conf --conf-file case_hdfs.properties --name a1 -Dflume.hadoop.logger=INFO,console
-
#运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中
-
a1.sources = s1
-
a1.sinks = k1
-
a1.channels = c1
-
#配置source
-
a1.sources.s1.type = exec
-
#tail -F 和-f的区别是 -F追踪的是文件名字。-f是追踪id。因为.out文件满了就变成了.out.1了,再会去重建一个.out文件
-
a1.sources.s1.command = tail -F /home/hadoop/tom/catalina.out
-
#配置channel
-
a1.channels.c1.type = memory
-
#配置sink
-
a1.sinks.k1.type = hdfs
-
a1.sinks.k1.hdfs.path = /flumetest/%Y-%m-%d/%H%M
-
#设置目录回滚(首先根据时间创建个文件夹,写了一分钟后我们就需要重新建文件夹了,所以我们要指定回滚为true)
-
a1.sinks.k1.hdfs.round = true
-
a1.sinks.k1.hdfs.roundValue = 1
-
a1.sinks.k1.hdfs.roundUnit = minute
-
a1.sinks.k1.hdfs.useLocalTimeStamp = true
-
a1.sinks.k1.hdfs.filePrefix = taobao
-
a1.sinks.k1.hdfs.fileSuffix = log
-
#设置文件的回滚(下面三个条件只要有一个满足都会进行回滚)
-
#每过多少秒回滚一次,所以一分钟内我们写6个文件(满足)
-
a1.sinks.k1.hdfs.rollInterval = 10
-
#文件多大的时候再回滚
-
a1.sinks.k1.hdfs.rollSize = 1024
-
#写了多少条就回滚(满足)
-
a1.sinks.k1.hdfs.rollCount = 10
-
#设置压缩格式为不压缩
-
a1.sinks.k1.hdfs.fileType = DataStream
-
#为source指定他的channel
-
a1.sources.s1.channels = c1
-
#为sink指定他的channel
-
a1.sinks.k1.channel = c1
然后上传到hadoop04的家目录下,执行命令:
[[email protected] ~]$ flume-ng agent --conf conf --conf-file case_hdfs.properties --name a1 -Dflume.hadoop.logger=INFO,console
flume会不断的在写之前准备阶段的数据到HDFS上面的flumetest文件夹里面,并且,每个文件夹下面有6个文件
单代理多流配置
单代理指的是一个agent,多流指的是多个Source,多个Channel,多个sinks。注意单代理端口不能冲突
多代理流程
分别在hadoop04和hadoop05上跑代码
在hadoop04下面的代码:
-
#运行命令:flume-ng agent --conf conf --conf-file case_source.properties --name a1 -Dflume.hadoop.logger=INFO,console
-
#运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中
-
a1.sources = s1
-
a1.sinks = k1
-
a1.channels = c1
-
a1.sources.s1.type = netcat
-
a1.sources.s1.bind = 192.168.123.104
-
a1.sources.s1.port = 44455
-
a1.channels.c1.type = memory
-
a1.sinks.k1.type = avro
-
a1.sinks.k1.hostname = 192.168.123.105
-
a1.sinks.k1.port = 44466
-
a1.sources.s1.channels = c1
-
a1.sinks.k1.channel = c1
新建一个名为case_source.properties的文本文件
复制上面的代码并上传至hadoop04上面
在hadoop05上面执行下面的代码:
-
#运行命令:flume-ng agent --conf conf --conf-file case_sink.properties --name a1 -Dflume.hadoop.logger=INFO,console
-
#运行命令解释:flume-ng agent --conf conf --conf-file 配置文件的位置 --name agent的名称 打印出来的日志打印到控制台当中
-
a1.sources = s1
-
a1.sinks = k1
-
a1.channels = c1
-
a1.sources.s1.type = avro
-
a1.sources.s1.bind = 192.168.123.105
-
a1.sources.s1.port =44466
-
a1.channels.c1.type = memory
-
a1.sinks.k1.type = logger
-
a1.sources.s1.channels = c1
-
a1.sinks.k1.channel = c1
新建一个名为case_sink.properties的文本文件
复制上面的代码并上传至hadoop05上面
注意:先执行hadoop05上面的代码:
[[email protected] ~]$ flume-ng agent --conf conf --conf-file case_sink.properties --name a1 -Dflume.hadoop.logger=INFO,console
在执行hadoop04上面的代码:
[[email protected] ~]$ flume-ng agent --conf conf --conf-file case_source.properties --name a1 -Dflume.hadoop.logger=INFO,console
启动之后请回到hadoop05查看日志是不是增加了下图所示
现在开始写入数据:
复制hadoop04的连接并执行下面的命令:
telnet hadoop04 44455
然后输入hello
然后再hadoop05可以看到下图的消息(消息传输没想象中那么快):
多路复制流
目的:把hadoop04的数据一模一样的发送给hadoop05和hadoop03
把下面的代码上传到hadoop04上去:
新建 case_replicate.properties 文件:
-
#2个channel和2个sink的配置文件
-
a1.sources = s1
-
a1.sinks = k1 k2
-
a1.channels = c1 c2
-
# Describe/configure the source
-
a1.sources.s1.type = netcat
-
a1.sources.s1.port = 44455
-
a1.sources.s1.bind = 192.168.123.104
-
#默认就是replicating就是复制,multiplexing是复用
-
a1.sources.s1.selector.type = replicating
-
a1.sources.s1.channels = c1 c2
-
# Describe the sink
-
#发送给hadoop05
-
a1.sinks.k1.type = avro
-
a1.sinks.k1.channel = c1
-
a1.sinks.k1.hostname = 192.168.123.105
-
a1.sinks.k1.port = 44466
-
#发送给hadoop03
-
a1.sinks.k2.type = avro
-
a1.sinks.k2.channel = c2
-
a1.sinks.k2.hostname = 192.168.123.103
-
a1.sinks.k2.port = 44466
-
# Use a channel which buffers events in memory
-
a1.channels.c1.type = memory
-
a1.channels.c1.capacity = 1000
-
a1.channels.c1.transactionCapacity = 100
-
a1.channels.c2.type = memory
-
a1.channels.c2.capacity = 1000
-
a1.channels.c2.transactionCapacity = 100
把下面的代码上传到hadoop05上面去:
新建 case_replicate_s1.properties 文件
-
# Name the components on this agent
-
a1.sources = s1
-
a1.sinks = k1
-
a1.channels = c1
-
# Describe/configure the source
-
a1.sources.s1.type = avro
-
a1.sources.s1.channels = c1
-
a1.sources.s1.bind = 192.168.123.105
-
a1.sources.s1.port = 44466
-
# Describe the sink
-
a1.sinks.k1.type = logger
-
a1.sinks.k1.channel = c1
-
# Use a channel which buffers events in memory
-
a1.channels.c1.type = memory
-
a1.channels.c1.capacity = 1000
-
a1.channels.c1.transactionCapacity = 100
把下面的代码放到hadoop03上面去:
新建 case_replicate_s2.properties 文件:
-
# Name the components on this agent
-
a1.sources = s1
-
a1.sinks = k1
-
a1.channels = c1
-
# Describe/configure the source
-
a1.sources.s1.type = avro
-
a1.sources.s1.channels = c1
-
a1.sources.s1.bind = 192.168.123.103
-
a1.sources.s1.port = 44466
-
# Describe the sink
-
a1.sinks.k1.type = logger
-
a1.sinks.k1.channel = c1
-
# Use a channel which buffers events in memory
-
a1.channels.c1.type = memory
-
a1.channels.c1.capacity = 1000
-
a1.channels.c1.transactionCapacity = 100
然后执行
在hadoop05上执行:
[[email protected] ~]$ flume-ng agent --conf conf --conf-file case_replicate_s1.properties --name a1 -Dflume.hadoop.logger=INFO,console
在hadoop03上执行:
[[email protected] ~]$ flume-ng agent --conf conf --conf-file case_replicate_s2.properties --name a1 -Dflume.hadoop.logger=INFO,console
在hadoop04上执行:
[[email protected] ~]$ flume-ng agent --conf conf --conf-file case_replicate.properties --name a1 -Dflume.hadoop.logger=INFO,console
同时hadoop03增加显示:
同时hadoop05增加显示:
然后在hadoop04上写数据:
然后观察hadoop03和hadoop05都接受到下图所示同一数据:
多路复用:
在hadoop04上新建 case_multi_sink.properties 文件
-
#2个channel和2个sink的配置文件
-
a1.sources = s1
-
a1.sinks = k1 k2
-
a1.channels = c1 c2
-
# Describe/configure the source
-
a1.sources.s1.type = org.apache.source.http.HTTPSource
-
a1.sources.s1.port = 44455
-
a1.sources.s1.host = 192.168.123.104
-
#默认就是replicating就是复制,multiplexing是复用
-
a1.sources.s1.selector.type = multiplexing
-
a1.sources.s1.channels = c1 c2
-
#state就是发送信息的头部信息,是CZ开头往c1走,默认往c1走
-
a1.sources.s1.selector.header = state
-
a1.sources.s1.selector.mapping.CZ = c1
-
a1.sources.s1.selector.mapping.US = c2
-
a1.sources.s1.selector.default = c1
-
# Describe the sink
-
#发送给hadoop05
-
a1.sinks.k1.type = avro
-
a1.sinks.k1.channel = c1
-
a1.sinks.k1.hostname = 192.168.123.105
-
a1.sinks.k1.port = 44466
-
#发送给hadoop03
-
a1.sinks.k2.type = avro
-
a1.sinks.k2.channel = c2
-
a1.sinks.k2.hostname = 192.168.123.103
-
a1.sinks.k2.port = 44466
-
# Use a channel which buffers events in memory
-
a1.channels.c1.type = memory
-
a1.channels.c1.capacity = 1000
-
a1.channels.c1.transactionCapacity = 100
-
a1.channels.c2.type = memory
-
a1.channels.c2.capacity = 1000
-
a1.channels.c2.transactionCapacity = 100
在hadoop05上新建 case_multi_s1.properties 文件:
-
# Name the components on this agent
-
a1.sources = s1
-
a1.sinks = k1
-
a1.channels = c1
-
# Describe/configure the source
-
a1.sources.s1.type = avro
-
a1.sources.s1.channels = c1
-
a1.sources.s1.bind = 192.168.123.105
-
a1.sources.s1.port = 44466
-
# Describe the sink
-
a1.sinks.k1.type = logger
-
a1.sinks.k1.channel = c1
-
# Use a channel which buffers events in memory
-
a1.channels.c1.type = memory
-
a1.channels.c1.capacity = 1000
-
a1.channels.c1.transactionCapacity = 100
在hadoop03上新建 case_multi_s2.properties 文件
-
# Name the components on this agent
-
a1.sources = s1
-
a1.sinks = k1
-
a1.channels = c1
-
# Describe/configure the source
-
a1.sources.s1.type = avro
-
a1.sources.s1.channels = c1
-
a1.sources.s1.bind = 192.168.123.103
-
a1.sources.s1.port = 44466
-
# Describe the sink
-
a1.sinks.k1.type = logger
-
a1.sinks.k1.channel = c1
-
# Use a channel which buffers events in memory
-
a1.channels.c1.type = memory
-
a1.channels.c1.capacity = 1000
-
a1.channels.c1.transactionCapacity = 100
按顺序执行下面的命令:
[[email protected] ~]$ flume-ng agent --conf conf --conf-file case_multi_s2.properties --name a1 -Dflume.hadoop.logger=INFO,console
[[email protected] ~]$ flume-ng agent --conf conf --conf-file case_multi_s1.properties --name a1 -Dflume.hadoop.logger=INFO,console
[[email protected] ~]$ flume-ng agent --conf conf --conf-file case_multi_sink.properties --name a1 -Dflume.hadoop.logger=INFO,console
在hadoop04上发送数据
curl-X POST -d '[{ "headers" :{"state" :"CZ"},"body" : "TEST1"}]' http://localhost:44455
curl-X POST -d '[{ "headers" :{"state" :"US"},"body" : "TEST2"}]' http://localhost:44455
curl-X POST -d '[{ "headers" :{"state" :"cn"},"body" : "TEST2"}]' http://localhost:44455