Flume Interceptor 拦截器

 近期因为业务需要,解决数据漂移问题

(数据漂移:flume从kafka拉取日志到hdfs,因为使用得拦截器timestamp,就是使用系统时间作为日志落地得时间,因为kafka数据乱序问题,所以有些昨天得数据会被打上今天得系统时间;举个例子:日志内得时间戳是2019-5-20 23:59:59,但是却存在了2019-5-21得文件夹下

所以为了解决这个问题,就要提取日志中得时间戳,作文存储文件夹得名称,并落地在时间对应得文件夹下。

有俩种方法:

一、

因为我们采用得是flume-kafka-flume-hdfs,而且数据在flume采集得时候,经过flume自定义拦截器进行了数据清洗。

所以方法一就是在自定义Interceptor里,给event得body中得直接获取日志时间,并覆盖到header中得timestamp上。

但是因为,服务器多达九十多台,如果每个都修改拦截器并重新打包到lib里,不仅暂停业务,还有工作量都特别大。

二、

在flume得启动配置文件上入手;

官方上提供的已有的拦截器有:

Timestamp Interceptor
Host Interceptor
Static Interceptor
Regex Filtering Interceptor
Regex Extractor Interceptor

flume的拦截器是chain形式的,可以对一个source指定多个拦截器,按先后顺序依次处理。
Timestamp Interceptor :在event的header中添加一个key叫:timestamp,value为当前的时间戳。(这个就是公司之前一直使用得拦截器,但如果对数据时间上不是特别敏感得,用这个也没什么问题)
Host Interceptor:在event的header中添加一个key叫:host,value为当前机器的hostname或者ip。
Static Interceptor:可以在event的header中添加自定义的key和value。
Regex Filtering Interceptor:通过正则来清洗或包含匹配的events。(这个用好了也很不错)
Regex Extractor Interceptor:通过正则表达式来在header中添加指定的key,value则为正则匹配的部分(这个就是解决数据漂移问题得拦截器)

代码如下:

Flume Interceptor 拦截器

有一个坑研究了一整天。。。

就是在正则抽取数据得时候

一定要加上()括号!!!一定要加上()括号!!!一定要加上()括号!!!

要么正则无效