flume_简介及使用

一、简介

Flume Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合

和传输的系统,Flume 支持在日志系统中定制各类数据发送方,用于收集数据;同时,

Flume 提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

 

二、Flume体系结构

flume_简介及使用

 

flume官网http://flume.apache.org/

首页就有体系结构图:

 flume_简介及使用

 Flume核心:agent = source+channel+sink

Source组件:用于采集各种不同源的日志

Channel组件:数据缓冲区、通道

Sink组件:用于将日志写到不同的目的地。

 

如果要使用flume,最重要的就是在配置文件中配置agent

 

 

三、Source组件

在官网找到帮助文档,左侧的documentaion,如下:

 flume_简介及使用

flume source章节,可以看到有很多source,如最常用的几个:监控目录,kafkahttp,自定义等

 flume_简介及使用flume_简介及使用

 

四、Channel组件

数据缓冲区,通道。

最常用的是放到内存中:memory channel

 flume_简介及使用

 

五、Sink组件

 flume_简介及使用

  

六、使用

解压flume包,如:/opt/flume/apache-flume-1.7.0-bin

 

例子:监控一个目录中的日志文件,通过内存进行缓冲,最后写到hdfs

1. 创建agent的配置文件,如:/opt/myagent/a4.conf

内容如下:

#定义 agent 名, sourcechannelsink 的名称

a4.sources = r1

a4.channels = c1

a4.sinks = k1

#具体定义 source监控目录)

a4.sources.r1.type = spooldir

a4.sources.r1.spoolDir = /root/training/logs

#具体定义 channel内存缓冲)

a4.channels.c1.type = memory

a4.channels.c1.capacity = 10000

a4.channels.c1.transactionCapacity = 100

#定义拦截器,为消息添加时间戳

a4.sources.r1.interceptors = i1

a4.sources.r1.interceptors.i1.type =

org.apache.flume.interceptor.TimestampInterceptor$Builder

#具体定义 sink写到hdfs中)

a4.sinks.k1.type = hdfs

a4.sinks.k1.hdfs.path = hdfs://192.168.56.111:9000/flume/%Y%m%d

a4.sinks.k1.hdfs.filePrefix = events-

a4.sinks.k1.hdfs.fileType = DataStream

#不按照条数生成文件

a4.sinks.k1.hdfs.rollCount = 0

#HDFS 上的文件达到 128M 时生成一个文件

a4.sinks.k1.hdfs.rollSize = 134217728

#HDFS 上的文件达到 60 秒生成一个文件

a4.sinks.k1.hdfs.rollInterval = 60

#组装 sourcechannelsink

a4.sources.r1.channels = c1

a4.sinks.k1.channel = c1

 

2. 执行命令

cd /opt/flume/apache-flume-1.7.0-bin

bin/flume-ng agent -n a4 -f /opt/myagent/a4.conf -c conf -Dflume.root.logger=INFO,console

执行flume-ng命令,后面指定agent

-n指定agent的名字,必须和配置文件中那个a4同名;

-f指定配置文件

-c 使用配置,即安装目录下的conf路径

-D参数,指定当前日志级别,输出到工作台

 

执行上面的命令,启动flume

然后给监控的目录/root/training/logs中添加文件,模拟生成日志。

这时在hdfs/flume下会按日期产生一个新目录,目录下有一个tmp文件,因为上面配置了128M或者60秒才会写真正的文件,因此暂时为tmp文件,触发后不再是tmp

 

其他用例:

netcat监控8080端口的请求数据

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 8080

 

监控一直在追加的日志数据,通过exec命令的方式:

a2.sources.r1.type = exec

a2.sources.r1.command = tail -f /home/hadoop/a.log

#只输出到控制台

a2.sinks.k1.type = logger

 

其他内容可以参考官网上的例子