Flume笔记三之Inteceptor,selector,processor

Flume笔记三之Inteceptor,selector,processor

原创 2016年11月20日 20:18:28
  • 287

Inteceptor

    在flume中可以针对source定义拦截器,通过拦截器可以加入指定的信息然后在event的headers中就可以看到我们指定的信息。

    拦截器种类:

    Timestamp Interceptor

          在headers中加一个时间戳可以知道日志的时间便于后期处理。

   Host Interceptor

          将当前source所在主机的ip加入headers中。

          type:拦截器的类型,必须为host

          preserveExisting:如果此拦截器增加的key已经存在,如果这个值设置为true则保持原来的值,否则覆盖原来的值。默认为false

          useIP:如果设置为true则使用ip地址,否则使用主机名,默认为true

          hostHeader:使用的header的key名字,默认为host

 

   Static Interceptor

          固定一些key,value,source过来的数据经过此拦截器时就会被打上此标志。

          type:avrosource的类型,必须是static。

          preserveExisting:如果此拦截器增加的key已经存在,如果这个值设置为true则保持原来的值,否则覆盖原来的值。默认为false

          key:静态拦截器添加的key的名字

          value:静态拦截器添加的key对应的value值

 

   UUIDInterceptor

       指定一个全球唯一的标识。

    Searchand Replace Interceptor

       查找并替换数据中的内容。

   RegexExtractor Interceptor

        正则表达式

 

以下在一个agent中加入source拦截器,在event的headers中加入时间戳和source所在主机的ip。

以下是拦截器的配置实例,配置了两个拦截器一个时间戳一个host

#定义两个拦截器

a1.sources.r1.interceptors=i1 i2

#配置定义的拦截器

a1.sources.r1.interceptors.i1.type=host

a1.sources.r1.interceptors.i1.preserveExisting=false

a1.sources.r1.interceptors.i1.hostHeader=hostname

 

a1.sources.r1.interceptors.i2.type=timestamp


selector

    渠道选择器,在实际应用中我们可能有一个source向两个以上的channel发送数据这样就需要使用选择器,选择器通过判断event的header中的key来决定向那个channel发送数据,默认情况下source会向所有的channel发送数据。

    Multiplexing Channel Selector 根据headerkey的值分配channel

    selector.type 默认为replicating

    selector.header:选择作为判断的key

    selector.default:默认的channel配置

    selector.mapping.*:匹配到的channel的配置

 

以下是配置代码

a1.sources.r1.channels= c1 c2

a1.sources.r1.selector.type= multiplexing

a1.sources.r1.selector.header= hostname

a1.sources.r1.selector.mapping.192.168.1.155=c1

a1.sources.r1.selector.default= c2

 

以下是选择器的配置实例,这是我的一个配置文件,配置了两个channel和两个sink,131过来的信息走c1并在131机器上打印,其他节点过来的信息以avro格式序列化后再发送给139。但是因为我本机环境问题无法从139*问131所以这个例子只是部分实现,如果想使用需要修改的请注意


Flume笔记三之Inteceptor,selector,processor

a1.sources=r1

a1.channels=c1 c2

a1.sinks=k1 k2

 

a1.sources.r1.type=exec 

a1.sources.r1.command=tail -F/usr/local/success.log

 

a1.sources.r1.interceptors=i1 i2

a1.sources.r1.interceptors.i1.type=host

a1.sources.r1.interceptors.i1.preserveExisting=false

a1.sources.r1.interceptors.i1.hostHeader=hostname

 

a1.sources.r1.interceptors.i2.type=timestamp

 

a1.channels.c1.type=memory 

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100

 

a1.channels.c2.type=memory

a1.channels.c2.capacity=1000

a1.channels.c2.transactioncapacity=100

 

a1.sources.r1.selector.type=multiplexing

a1.sources.r1.selector.header=hostname

a1.sources.r1.selector.mapping.192.168.79.131=c1

a1.sources.r1.selector.default=c2

 

a1.sinks.k1.type=logger

 

a1.sinks.k2.type=avro

a1.sinks.k2.hostname=192.168.79.139

a1.sinks.k2.port=42424

 

a1.sources.r1.channels=c1 c2

a1.sinks.k1.channel=c1

a1.sinks.k2.channel=c2


processor

         一个channel可以有多个sink应用场景有两种。

1.负载均衡

web节点上的一个agent可以使用两个sink向两个以上的flume节点发送数据以减小flume节点服务器的压力。

 

a1.sinkgroups=g1

a1.sinkgroups.g1.sinks=k1k2

a1.sinkgroups.g1.processor.type=load_balance

a1.sinkgroups.g1.processor.backoff=true

a1.sinkgroups.g1.processor.selector=round_robin

a1.sinkgroups.g1.processor.selector.maxTimeOut=30000

 

backoff:开启后,故障的节点会列入黑名单,过一定时间再次发送,如果还失败,则等待是指数增长;直到达到最大的时间。

如果不开启,故障的节点每次都会被重试。

selector.maxTimeOut:最大的黑名单时间(单位为毫秒)。

 

 

2.故障转移

我们可以配置两个flume节点来接收web节点发送过来的数据,一个主一个备提高可用性。当主节点宕机后web节点可以转而向备用节点发送数据直到主节点恢复正常。

a1.sinkgroups=g1

a1.sinkgroups.g1.sinks=k1k2

a1.sinkgroups.g1.processor.type=failover

a1.sinkgroups.g1.processor.priority.k1=10

a1.sinkgroups.g1.processor.priority.k2=5

a1.sinkgroups.g1.processor.maxpenalty=10000

#maxpenalty 对于故障的节点最大的黑名单时间 (in millis 毫秒)

 

Inteceptor

    在flume中可以针对source定义拦截器,通过拦截器可以加入指定的信息然后在event的headers中就可以看到我们指定的信息。

    拦截器种类:

    Timestamp Interceptor

          在headers中加一个时间戳可以知道日志的时间便于后期处理。

   Host Interceptor

          将当前source所在主机的ip加入headers中。

          type:拦截器的类型,必须为host

          preserveExisting:如果此拦截器增加的key已经存在,如果这个值设置为true则保持原来的值,否则覆盖原来的值。默认为false

          useIP:如果设置为true则使用ip地址,否则使用主机名,默认为true

          hostHeader:使用的header的key名字,默认为host

 

   Static Interceptor

          固定一些key,value,source过来的数据经过此拦截器时就会被打上此标志。

          type:avrosource的类型,必须是static。

          preserveExisting:如果此拦截器增加的key已经存在,如果这个值设置为true则保持原来的值,否则覆盖原来的值。默认为false

          key:静态拦截器添加的key的名字

          value:静态拦截器添加的key对应的value值

 

   UUIDInterceptor

       指定一个全球唯一的标识。

    Searchand Replace Interceptor

       查找并替换数据中的内容。

   RegexExtractor Interceptor

        正则表达式

 

以下在一个agent中加入source拦截器,在event的headers中加入时间戳和source所在主机的ip。

以下是拦截器的配置实例,配置了两个拦截器一个时间戳一个host

#定义两个拦截器

a1.sources.r1.interceptors=i1 i2

#配置定义的拦截器

a1.sources.r1.interceptors.i1.type=host

a1.sources.r1.interceptors.i1.preserveExisting=false

a1.sources.r1.interceptors.i1.hostHeader=hostname

 

a1.sources.r1.interceptors.i2.type=timestamp


selector

    渠道选择器,在实际应用中我们可能有一个source向两个以上的channel发送数据这样就需要使用选择器,选择器通过判断event的header中的key来决定向那个channel发送数据,默认情况下source会向所有的channel发送数据。

    Multiplexing Channel Selector 根据headerkey的值分配channel

    selector.type 默认为replicating

    selector.header:选择作为判断的key

    selector.default:默认的channel配置

    selector.mapping.*:匹配到的channel的配置

 

以下是配置代码

a1.sources.r1.channels= c1 c2

a1.sources.r1.selector.type= multiplexing

a1.sources.r1.selector.header= hostname

a1.sources.r1.selector.mapping.192.168.1.155=c1

a1.sources.r1.selector.default= c2

 

以下是选择器的配置实例,这是我的一个配置文件,配置了两个channel和两个sink,131过来的信息走c1并在131机器上打印,其他节点过来的信息以avro格式序列化后再发送给139。但是因为我本机环境问题无法从139*问131所以这个例子只是部分实现,如果想使用需要修改的请注意


Flume笔记三之Inteceptor,selector,processor

a1.sources=r1

a1.channels=c1 c2

a1.sinks=k1 k2

 

a1.sources.r1.type=exec 

a1.sources.r1.command=tail -F/usr/local/success.log

 

a1.sources.r1.interceptors=i1 i2

a1.sources.r1.interceptors.i1.type=host

a1.sources.r1.interceptors.i1.preserveExisting=false

a1.sources.r1.interceptors.i1.hostHeader=hostname

 

a1.sources.r1.interceptors.i2.type=timestamp

 

a1.channels.c1.type=memory 

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100

 

a1.channels.c2.type=memory

a1.channels.c2.capacity=1000

a1.channels.c2.transactioncapacity=100

 

a1.sources.r1.selector.type=multiplexing

a1.sources.r1.selector.header=hostname

a1.sources.r1.selector.mapping.192.168.79.131=c1

a1.sources.r1.selector.default=c2

 

a1.sinks.k1.type=logger

 

a1.sinks.k2.type=avro

a1.sinks.k2.hostname=192.168.79.139

a1.sinks.k2.port=42424

 

a1.sources.r1.channels=c1 c2

a1.sinks.k1.channel=c1

a1.sinks.k2.channel=c2


processor

         一个channel可以有多个sink应用场景有两种。

1.负载均衡

web节点上的一个agent可以使用两个sink向两个以上的flume节点发送数据以减小flume节点服务器的压力。

 

a1.sinkgroups=g1

a1.sinkgroups.g1.sinks=k1k2

a1.sinkgroups.g1.processor.type=load_balance

a1.sinkgroups.g1.processor.backoff=true

a1.sinkgroups.g1.processor.selector=round_robin

a1.sinkgroups.g1.processor.selector.maxTimeOut=30000

 

backoff:开启后,故障的节点会列入黑名单,过一定时间再次发送,如果还失败,则等待是指数增长;直到达到最大的时间。

如果不开启,故障的节点每次都会被重试。

selector.maxTimeOut:最大的黑名单时间(单位为毫秒)。

 

 

2.故障转移

我们可以配置两个flume节点来接收web节点发送过来的数据,一个主一个备提高可用性。当主节点宕机后web节点可以转而向备用节点发送数据直到主节点恢复正常。

a1.sinkgroups=g1

a1.sinkgroups.g1.sinks=k1k2

a1.sinkgroups.g1.processor.type=failover

a1.sinkgroups.g1.processor.priority.k1=10

a1.sinkgroups.g1.processor.priority.k2=5

a1.sinkgroups.g1.processor.maxpenalty=10000

#maxpenalty 对于故障的节点最大的黑名单时间 (in millis 毫秒)