Flume log collection to Kafka, with checkpoint-based resume (断点续传)
Link: https://pan.baidu.com/s/1Bj4ZW-zYybbeAxZt4-Jl8g
Extraction code: ca2o
The JAR needed for checkpoint-based resume; it can directly replace flume-taildir-source-1.9.0.jar under the flume/lib directory:
Link: https://pan.baidu.com/s/1Aqla3SIWjwyN6M6v0Xgt4w
Extraction code: pd1j
#agent
customInterceptor.sources=r1
customInterceptor.channels=c1
customInterceptor.sinks=s1
#source
#the exec/tail -f source (commented out below) cannot resume after a restart; TAILDIR can
#customInterceptor.sources.r1.type=exec
#customInterceptor.sources.r1.command=tail -f /opt/module/logs/log/web.log
customInterceptor.sources.r1.type=TAILDIR
#records the last read position of each file, used for checkpoint-based resume (a sample position file is shown after the config)
customInterceptor.sources.r1.positionFile=/opt/module/logs/taildir_position.json
customInterceptor.sources.r1.filegroups=f1
customInterceptor.sources.r1.filegroups.f1=/opt/module/logs/log/web.log
customInterceptor.sources.r1.fileHeader=true
#channel
customInterceptor.channels.c1.type=memory
customInterceptor.channels.c1.capacity=1000
customInterceptor.channels.c1.transactionCapacity=100
#sink
#use the Kafka sink
customInterceptor.sinks.s1.type=org.apache.flume.sink.kafka.KafkaSink
#Kafka broker address and port (kafka.bootstrap.servers replaces the deprecated brokerList property in Flume 1.9)
customInterceptor.sinks.s1.kafka.bootstrap.servers=192.168.0.20:9092
#Kafka topic to write to (kafka.topic replaces the deprecated topic property)
customInterceptor.sinks.s1.kafka.topic=first5
#legacy Kafka 0.8 producer property, ignored by the Flume 1.9 Kafka sink:
#customInterceptor.sinks.s1.serializer.class=kafka.serializer.StringEncoder
#bind the source and sink to the channel
customInterceptor.sources.r1.channels=c1
customInterceptor.sinks.s1.channel=c1
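
To start the agent, point flume-ng at the file above; the agent name passed with --name must match the property prefix used in the config (customInterceptor). The config file path conf/taildir-kafka.conf is only an assumed name for this example:

bin/flume-ng agent --conf conf --conf-file conf/taildir-kafka.conf --name customInterceptor -Dflume.root.logger=INFO,console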
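
As events are read, the TAILDIR source persists its progress to the position file configured above. A minimal sketch of what /opt/module/logs/taildir_position.json then contains (the inode and pos values here are illustrative); on restart the source seeks each tracked file back to pos instead of re-reading it from the beginning:

[{"inode":397107,"pos":1024,"file":"/opt/module/logs/log/web.log"}]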
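
To verify delivery end to end, consume the topic with the console consumer that ships with Kafka (run from the Kafka installation directory):

bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.20:9092 --topic first5 --from-beginning

Lines appended to /opt/module/logs/log/web.log should now appear in the consumer, and after the agent is restarted they continue from the recorded position rather than being re-sent from the start.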