Flume简介、特点、核心概念及安装
一、什么是flume?
flume是一个可分布式日志收集系统,为hadoop相关组件之一。
Flume 是可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据源中集中起来存储的工具/服务。
Flume可以采集文件,socket数据包(网络端口)、文件夹、kafka、mysql数据库等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、hive、kafka等众多外部存储系统中。
二、Flume特性
Flume是一个分布式、可靠、和高可用的海量日志采集、汇聚和传输的系统。Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景
三、Flume核心概念
1、agent
Flume中最核心的角色是agent,flume采集系统就是由一个个agent连接起来所形成的一个或简单或复杂的数据传输通道。
对于每一个Agent来说,它就是一个独立的守护进程(JVM),它负责从数据源接收数据,并发往下一个目的地,如下图所示:
Agent的3个组件的设计思想,主要考虑的是:source和sink之间解耦合,以及异步操作;
每一个agent相当于一个数据(被封装成Event对象)传递员,内部有3个核心组件:
- Source:采集组件,用于跟数据源对接,以获取数据;它有各种各样的内置实现;
- Sink:下沉组件,用于往下一级agent传递数据或者向最终存储系统传递数据
- Channel:传输通道组件,用于从source将数据传递到sink
单个agent采集数据:
多级agent之间串联:
2、Event
数据在channel中的封装形式;
因此,Source组件在获取到原始数据后,需要封装成Event放入channel;
Sink组件从channel中取出Event后,需要根据目标存储的需求,转成其他形式的数据输出。
Event封装对象主要有两部分组成: Headers和 Body
header是一个集合 Map[String,String],用于携带一些KV形式的元数据(标志、描述等)
body: 就是一个字节数组byte[];装载具体的数据内容
2018-11-03 18:44:44,913 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 61 20 61 20 61 61 61 20 61 20 0D a a aaa a . } |
3、interceptor拦截器
拦截器工作在source组件之后,source产生的event会被传入拦截器根据需要进行拦截处理,而且,拦截器可以组成拦截器链!
拦截器在flume中有一些内置的功能比较常用的拦截器
用户也可以根据自己的数据处理需求,自己开发自定义拦截器!
这也是flume的一个可以用来自定义扩展的接口!
4、channel selector
一个source可以对接多个channel,则event在这n个channel之间传递的策略,由配置的channel selector决定;
channel selector有2中实现: replicating(复制),multiplexing(多路复用)
5、sink processor
如果sink和channel是一对一关系,则不需要专门的sink processor;
如果要配置一个channel对多个sink,则需要将这多个sink配置成一个sink group(sink组);
event在一个组中的多个sink间如何传递,则由所配置的sink processor来决定;
sink processor有2种: load balance (round robing)和 fail over
6、Transaction:事务控制机制
Flume使用两个独立的事务:
- put操作:source读取数据源并写入event到channel
- take操作:sink从channel中获取event并写出到目标存储
事务的实现程度,取决于运行时所选择的具体的组件实现类;
再好的组件的组合,也只实现到了at least once!(不会丢失数据,但可能产生重复传输)
事务实现的核心点,就是记录状态(比如source,记录自己完成的数据的偏移量)
比如spooling directory source 为文件的每一个event batch创建一个事务,来记录状态,一旦事务中所有的事件全部传递到channel且提交成功,那么soucrce就将event batch标记为完成。
同理,事务以类似的方式处理从channel到sink的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚,且所有的事件都会保持到channel中,等待重新传递。
事务机制涉及到如下重要参数:
a1.sources.s1.batchSize =100
a1.sinks.k1.batchSize = 200
a1.channels.c1.transactionCapacity = 300 (应该大于source或者sink的batchSize)
< transactionCapacity 是说,channel中保存的事务的个数>
跟channel的数据缓存空间容量区别开来:
a1.channels.c1.capacity = 10000
那么事务是如何保证数据的端到端完整性的呢?看下面有两个agent的情况:
数据流程:
- source 1产生Event,通过“put”、“commit”操作将Event放到Channel 1中
- sink 1通过“take”操作从Channel 1中取出Event,并把它发送到Source 2中
- source 2通过“put”、“commit”操作将Event放到Channel 2中
- source 2向sink 1发送成功信号,sink 1“commit”步骤2中的“take”操作(其实就是删除Channel 1中的Event)
说明:在任何时刻,Event至少在一个Channel中是完整有效的
四、Flume安装部署
1、参数配置
Flume的安装非常简单,只需要解压即可,当然,前提是已有hadoop环境
1、上传安装包到数据源所在节点上,然后解压 tar -zxvf apache-flume-1.8.0-bin.tar.gz
2、根据数据采集的需求配置采集方案,描述在配置文件中(文件名可任意自定义)
3、指定采集方案配置文件,在相应的节点上启动flume agent
2、启动命令
bin/flume-ng agent -c ./conf ………….
commands: help 显示本帮助信息 agent 启动一个agent进程 avro-client 启动一个用于测试avro source的客户端(能够发送avro序列化流) version 显示当前flume的版本信息
global options: 全局通用选项 --conf,-c <conf> 指定flume的系统配置文件所在目录 --classpath,-C <cp> 添加额外的jar路径 --dryrun,-d 不去真实启动flume agent,而是打印当前命令 --plugins-path <dirs> 指定插件(jar)所在路径
-Dproperty=value 传入java环境参数 -Xproperty=value 传入所需的JVM配置参数 agent options: --name,-n <name> agent的别名(在用户采集方案配置文件中) --conf-file,-f <file> 指定用户采集方案配置文件的路径 --zkConnString,-z <str> 指定zookeeper的连接地址 --zkBasePath,-p <path> 指定用户配置文件所在的zookeeper path,比如:/flume/config --no-reload-conf 关闭配置文件动态加载 --help,-h display help text
avro-client options: --rpcProps,-P <file> RPC client properties file with server connection params --host,-H <host> avro序列化数据所要发往的目标主机(avro source所在机器) --port,-p <port> avro序列化数据所要发往的目标主机的端口号 --dirname <dir> 需要被序列化发走的数据所在目录(提前准备好测试数据放在一个文件中) --filename,-F <file> 需要被序列化发走的数据所在文件(default: std input) --headerFile,-R <file> 存储header key-value的文件 --help,-h 帮助信息
Either --rpcProps or both --host and --port must be specified. Note that if <conf> directory is specified, then it is always included first in the classpath. |
开启内置监控功能
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545