Flume+Kafka收集Docker容器内分布式日志应用实践
1、如何设计Flume+Kafka收集架构?
2、如何修改Docker内配置文件?
3、如何进行Flume配置?
4、如何定制RollingByTypeAndDayFileSink?
1 背景和问题
随着云计算、PaaS平台的普及,虚拟化、容器化等技术的应用,例如Docker等技术,越来越多的服务会部署在云端。通常,我们需要需要获取日志,来进行监控、分析、预测、统计等工作,但是云端的服务不是物理的固定资源,日志获取的难度增加了,以往可以SSH登陆的或者FTP获取的,现在可不那么容易获得,但这又是工程师迫切需要的,最典型的场景便是:上线过程中,一切都在GUI化的PaaS平台点点鼠标完成,但是我们需要结合tail -F、grep等命令来观察日志,判断是否上线成功。当然这是一种情况,完善的PaaS平台会为我们完成这个工作,但是还有非常多的ad-hoc的需求,PaaS平台无法满足我们,我们需要日志。本文就给出了在分布式环境下,容器化的服务中的分散日志,如何集中收集的一种方法。
2 设计约束和需求描述
做任何设计之前,都需要明确应用场景、功能需求和非功能需求。
2.1 应用场景
分布式环境下可承载百台服务器产生的日志,单条数据日志小于1k,最大不超过50k,日志总大小每天小于500G。
2.2 功能需求
1)集中收集所有服务日志。
2)可区分来源,按服务、模块和天粒度切分。
2.3 非功能需求
1)不侵入服务进程,收集日志功能需独立部署,占用系统资源可控。
2)实时性,低延迟,从产生日志到集中存储延迟小于4s。
3)持久化,保留最近N天。
4)尽量递送日志即可,不要求不丢不重,但比例应该不超过一个阈值(例如万分之一)。
4)可以容忍不严格有序。
5)收集服务属于线下离线功能,可用性要求不高,全年满足3个9即可。
3 实现架构
一种方案实现的架构如下图所示:
3.1 Producer层分析
PaaS平台内的服务假设部署在Docker容器内,那么为了满足非功能需求#1,独立另外一个进程负责收集日志,因此不侵入服务框架和进程。采用Flume NG来进行日志的收集,这个开源的组件非常强大,可以看做一种监控、生产增量,并且可以发布、消费的模型,Source就是源,是增量源,Channel是缓冲通道,这里使用内存队列缓冲区,Sink就是槽,是个消费的地方。容器内的Source就是执行tail -F这个命令的去利用linux的标准输出读取增量日志,Sink是一个Kafka的实现,用于推送消息到分布式消息中间件。
3.2 Broker层分析
PaaS平台内的多个容器,会存在多个Flume NG的客户端去推送消息到Kafka消息中间件。Kafka是一个吞吐量、性能非常高的消息中间件,采用单个分区按照顺序的写入的方式工作,并且支持按照offset偏移量随机读取的特性,因此非常适合做topic发布订阅模型的实现。这里图中有多个Kafka,是因为支持集群特性,容器内的Flume NG客户端可以连接若干个Kafka的broker发布日志,也可以理解为连接若干个topic下的分区,这样可以实现高吞吐,一来可以在Flume NG内部做打包批量发送来减轻QPS压力,二来可以分散到多个分区写入,同时Kafka还会指定replica备份个数,保证写入某个master后还需要写入N个备份,这里设置为2,没有采用常用的分布式系统的3,是因为尽量保证高并发特性,满足非功能需求中的#4。
3.3 Consumer层分析
消费Kafka增量的也是一个Flume NG,可以看出它的强大之处,在于可以接入任意的数据源,都是可插拔的实现,通过少量配置即可。这里使用Kafka Source订阅topic,收集过来的日志同样先入内存缓冲区,之后使用一个File Sink写入文件,为了满足功能需求#2,可区分来源,按服务、模块和天粒度切分,我自己实现了一个Sink,叫做RollingByTypeAndDayFileSink,源代码放到了github上,可以从这个页面下载jar,直接放到flume的lib目录即可。
4 实践方法
4.1 容器内配置
Dockerfile
Dockerfile是容器内程序的运行脚本,里面会含有不少docker自带的命令,下面是要典型的Dockerfile,BASE_IMAGE是一个包含了运行程序以及flume bin的镜像,比较重要的就是ENTRYPOINT,主要利用supervisord来保证容器内进程的高可用。
[AppleScript] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
|
FROM $ { BASE_IMAGE }
MAINTAINER $ { MAINTAINER }
ENV REFRESH_AT $ { REFRESH_AT }
RUN mkdir - p / opt / $ { MODULE_NAME }
ADD $ { PACKAGE_NAME } / opt / $ { MODULE_NAME } /
COPY service.supervisord.conf / etc / supervisord.conf.d / service.supervisord.conf
COPY supervisor - msoa - wrapper.sh / opt / $ { MODULE_NAME } / supervisor - msoa - wrapper.sh
RUN chmod + x / opt / $ { MODULE_NAME } / supervisor - msoa - wrapper.sh
RUN chmod + x / opt / $ { MODULE_NAME } / * .sh
EXPOSE ENTRYPOINT [ "/usr/bin/supervisord" , "-c" , "/etc/supervisord.conf" ]
|
下面是supervisord的配置文件,执行supervisor-msoa-wrapper.sh脚本。
[AppleScript] 纯文本查看 复制代码
1
2
|
[program : $ { MODULE_NAME } ]
command = / opt / $ { MODULE_NAME } / supervisor - msoa - wrapper.sh
|
下面是supervisor-msoa-wrapper.sh,这个脚本内的start.sh或者stop.sh就是应用程序的启动和停止脚本,这里的背景是我们的启停的脚本都是在后台运行的,因此不会阻塞当前进程,因此直接退出了,Docker就会认为程序结束,因此应用生命周期也结束,这里使用wait命令来进行一个阻塞,这样就可以保证即使后台运行的进程,我们可以看似是前台跑的。
这里加入了flume的运行命令,–conf后面的参数标示会去这个文件夹下面寻找flume-env.sh,里面可以定义JAVA_HOME和JAVA_OPTS。–conf-file指定flume实际的source、channel、sink等的配置。
[AppleScript] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
#! /bin/bash function shutdown ( )
{ date
echo "Shutting down Service"
unset SERVICE_PID # Necessary in some cases
cd / opt / $ { MODULE_NAME }
source stop.sh
} ## 停止进程 cd / opt / $ { MODULE_NAME }
echo "Stopping Service"
source stop.sh ## 启动进程 echo "Starting Service"
source start.sh export SERVICE_PID = $!
## 启动Flume NG agent,等待4s日志由start.sh生成 sleep 4
nohup / opt / apache - flume -1.6 . 0 - bin / bin / flume - ng agent --conf /opt/apache-flume-1.6.0-bin/conf --conf-file /opt/apache-flume-1.6.0-bin/conf/logback-to-kafka.conf --name a1 -Dflume.root.logger=INFO,console &
# Allow any signal which would kill a process to stop Service trap shutdown HUP INT QUIT ABRT KILL ALRM TERM TSTP echo "Waiting for $SERVICE_PID"
wait $SERVICE_PID |
Flume配置
source本应该采用exec source,执行tailf -F日志文件即可。但是这里使用了一个自行开发的StaticLinePrefixExecSource,源代码可以在github上找到。之所以采用自定义的,是因为需要将一些固定的信息传递下去,例如服务/模块的名称以及分布式服务所在容器的hostname,便于收集方根据这个标记来区分日志。如果这里你发现为什么不用flume的拦截器interceptor来做这个工作,加入header中一些KV不就OK了吗?这是个小坑,我后续会解释一下。
例如原来日志的一行为:
[AppleScript] 纯文本查看 复制代码
1 |
[INFO] 2016 -03 -18 12 : 59 : 31 , 080 [ main ] fountain.runner.CustomConsumerFactoryPostProcessor ( CustomConsumerFactoryPostProcessor.java : 91 ) - Start to init IoC container by loading XML bean definitions from classpath : fountain - consumer - stdout.xml
|
按照如下配置,那么实际传递给Channel的日志为:
[AppleScript] 纯文本查看 复制代码
1 |
service 1 ##$$##m1-ocean-1004.cp [INFO] 2016-03-18 12:59:31,080 [main] fountain.runner.CustomConsumerFactoryPostProcessor (CustomConsumerFactoryPostProcessor.java:91) -Start to init IoC container by loading XML bean definitions from classpath:fountain-consumer-stdout.xml
|
channel使用内存缓冲队列,大小标识可容乃的日志条数(event size),事务可以控制一次性从source以及一次性给sink的批量日志条数,实际内部有个timeout超时,可通过keepAlive参数设置,超时后仍然会推送过去,默认为3s。
sink采用Kafka sink,配置broker的list列表以及topic的名称,需要ACK与否,以及一次性批量发送的日志大小,默认5条一个包,如果并发很大可以把这个值扩大,加大吞吐。
[AppleScript] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
# Name the components on this agent a 1. sources = r 1
a 1. sinks = k 1
a 1. channels = c 1
a 1. sources.r 1. type = com.baidu.unbiz.flume.sink.StaticLinePrefixExecSource
a 1. sources.r 1. command = tail - F / opt / MODULE_NAME / log / logback. log
a 1. sources.r 1. channels = c 1
a 1. sources.r 1. prefix = service 1
a 1. sources.r 1. separator = ##$$##
a 1. sources.r 1. suffix = m 1 - ocean -1004. cp
# Describe the sink a 1. sinks.k 1. type = org.apache.flume.sink.kafka.KafkaSink
a 1. sinks.k 1. topic = keplerlog
a 1. sinks.k 1. brokerList = gzns - cm -201508 c 02 n 01. gzns : 9092 , gzns - cm -201508 c 02 n 02. gzn
s : 9092
a 1. sinks.k 1. requiredAcks = 0
a 1. sinks.k 1. batchSize = 5
# Use a channel which buffers events in memory a 1. channels.c 1. type = memory
a 1. channels.c 1. capacity = 1000000
a 1. channels.c 1. transactionCapacity = 100
# Bind the source and sink to the channel a 1. sources.r 1. channels = c 1
a 1. sinks.k 1. channel = c 1
|
4.2 Broker配置
参考Kafka官方的教程,这里新建一个名称叫做keplerlog的topic,备份数量为2,分区为4。
[AppleScript] 纯文本查看 复制代码
1 |
> bin / kafka - topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic keplerlog
|
制造一些增量信息,例如如下脚本,在终端内可以随便输入一些字符串:
[AppleScript] 纯文本查看 复制代码
1 |
> bin / kafka - console - producer.sh --broker-list localhost:9092 --topic keplerlog
|
打开另外一个终端,订阅topic,确认可以看到producer的输入的字符串即可,即表示联通了。
[AppleScript] 纯文本查看 复制代码
1 |
> bin / kafka - console - consumer.sh --zookeeper localhost:2181 --topic keplerlog --from-beginning
|
4.3 集中接收日志配置
Flume配置
首先source采用flume官方提供的KafkaSource,配置好zookeeper的地址,会去找可用的broker list进行日志的订阅接收。channel采用内存缓存队列。sink由于我们的需求是按照服务名称和日期切分日志,而官方提供的默认file roll sink,只能按照时间戳,和时间interval来切分。
[AppleScript] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# Name the components on this agent a 1. sources = r 1
a 1. sinks = k 1
a 1. channels = c 1
a 1. sources.r 1. type = org.apache.flume.source.kafka.KafkaSource
a 1. sources.r 1. zookeeperConnect = localhost : 2181
a 1. sources.r 1. topic = keplerlog
a 1. sources.r 1. batchSize = 5
a 1. sources.r 1. groupId = flume - collector
a 1. sources.r 1. kafka.consumer.timeout.ms = 800
# Describe the sink a 1. sinks.k 1. type = com.baidu.unbiz.flume.sink.RollingByTypeAndDayFileSink
a 1. sinks.k 1. channel = c 1
a 1. sinks.k 1. sink. directory = / home / work / data / kepler - log
# Use a channel which buffers events in memory a 1. channels.c 1. type = memory
a 1. channels.c 1. capacity = 1000000
a 1. channels.c 1. transactionCapacity = 100
# Bind the source and sink to the channel a 1. sources.r 1. channels = c 1
a 1. sinks.k 1. channel = c 1
|
定制版RollingByTypeAndDayFileSink
源代码见github。RollingByTypeAndDayFileSink使用有两个条件:
1)Event header中必须有timestamp,否则会忽略事件,并且会抛出{@link InputNotSpecifiedException}
2)Event body如果是按照##$$##分隔的,那么把分隔之前的字符串当做模块名称(module name)来处理;如果没有则默认为default文件名。
输出到本地文件,首先要设置一个跟目录,通过sink.directory设置。其次根据条件#2中提取出来的module name作为文件名称前缀,timestamp日志作为文件名称后缀,例如文件名为portal.20150606或者default.20150703。
规整完的一个文件目录形式如下,可以看出汇集了众多服务的日志,并且按照服务名称、时间进行了区分:
[AppleScript] 纯文本查看 复制代码
1
2
3
4
5
6
7
|
~ / data / kepler - log $ ls
authorization. 20160512 default. 20160513 default. 20160505
portal. 20160512 portal. 20160505 portal. 20160514
|
不得不提的两个坑
坑1:
回到前两节提到的自定义了一个StaticLinePrefixExecSource来进行添加一些前缀的工作。由于要区分来源的服务/模块名称,并且按照时间来切分,根据官方flume文档,完全可以采用如下的Source拦截器配置。例如i1表示时间戳,i2表示默认的静态变量KV,key=module,value=portal。
[AppleScript] 纯文本查看 复制代码
1
2
3
4
5
|
a 1. sources.r 1. interceptors = i 2 i 1
a 1. sources.r 1. interceptors.i 1. type = timestamp
a 1. sources.r 1. interceptors.i 2. type = static
a 1. sources.r 1. interceptors.i 2. key = module
a 1. sources.r 1. interceptors.i 2. value = portal
|
但是flume官方默认的KafkaSource(v1.6.0)的实现:
[AppleScript] 纯文本查看 复制代码
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
while ( eventList.size ( ) < batchUpperLimit & &
System.currentTimeMillis ( ) < batchEndTime ) {
iterStatus = hasNext ( ) ;
if ( iterStatus ) {
/ / get next message
MessageAndMetadata < byte[] , byte[] > messageAndMetadata = it .next ( ) ;
kafkaMessage = messageAndMetadata. message ( ) ;
kafkaKey = messageAndMetadata. key ( ) ;
/ / Add headers to event ( topic , timestamp , and key )
headers = new HashMap < String , String > ( ) ;
headers.put ( KafkaSourceConstants.TIMESTAMP ,
String.valueOf ( System.currentTimeMillis ( ) ) ) ;
headers.put ( KafkaSourceConstants.TOPIC , topic ) ;
if ( kafkaKey ! = null ) {
headers.put ( KafkaSourceConstants.KEY , new String ( kafkaKey ) ) ;
}
if ( log .isDebugEnabled ( ) ) {
log .debug ( "Message: {}" , new String ( kafkaMessage ) ) ;
}
event = EventBuilder.withBody ( kafkaMessage , headers ) ;
eventList. add ( event ) ;
}
|
可以看出自己重写了Event header中的KV,丢弃了发送过来的header,因为这个坑的存在因此,tailf -F在event body中在前面指定模块/服务名称,然后RollingByTypeAndDayFileSink会按照分隔符切分。否则下游无法能达到KV。
坑2:
exec source需要执行tail -F命令来通过标准输出和标准错误一行一行的读取,但是如果把tail -F封装在一个脚本中,脚本中再执行一些管道命令,例如tail -F logback.log | awk ‘{print "portal##$$##"$0}’,那么exec source总是会把最近的输出丢弃掉,导致追加到文件末尾的日志有一些无法总是“姗姗来迟”,除非有新的日志追加,他们才会被“挤”出来。这个问题比较诡异。暂时没有细致研究。以示后人不要采坑。
5 结语
从这个分布式服务分散日志的集中收集方法,可以看出利用一些开源组件,可以非常方便的解决我们日常工作中所发现的问题,而这个发现问题和解决问题的能力才是工程师的基本素质要求。对于其不满足需求的,需要具备有钻研精神,知其然还要知其所以然的去做一些ad-hoc工作,才可以更加好的leverage这些组件。
另外,日志的收集只是起点,利用宝贵的数据,后面的使用场景和想象空间都会非常大,例如
1)利用Spark streaming在一个时间窗口内计算日志,做流量控制和访问限制。
2)使用awk脚本、scala语言的高级函数做单机的访问统计分析,或者Hadoop、Spark做大数据的统计分析。
3)除了端口存活和语义监控,利用实时计算处理日志,做ERROR、异常等信息的过滤,实现服务真正的健康保障和预警监控。
4)收集的日志可以通过logstash导入Elastic Search,使用ELK方式做日志查询使用。