flume的配置和使用
下载地址:
解压 tar -zxvf filename
配置环境变量:sudo vim /etc/profile
export FLUME_HOME=文件的主目录
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile
修改flume-env.sh中的JAVA_HOME的路径和FLUME_OPTS
然后命令行输入flume-ng version验证是否成功配置出现如下,成功
然后配置flume-conf.properties文件,配置source接受数据的方式
1.exec
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source: exec source runs a command and turns each
# output line into a Flume event
a1.sources.r1.type = exec
# Location of the input file; tail -F keeps following across file rotation
a1.sources.r1.command = tail -F /home/spark/log
# Describe the sink: write events to the agent's log (console when
# -Dflume.root.logger=...,console is set)
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory (fast, but events are lost if
# the agent dies)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动agent a1
在$FLUME_HOME目录中输入
bin/flume-ng agent --conf ./conf -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n a1
然后重新开启一个终端输入
for i in {1..100};do echo "exec tail$i" >> /home/spark/log;echo $i;sleep 0.1;done
成功截图:
2.netcat
# Name the components on this agent
# Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure source1: netcat source listens on localhost:44444 and
# turns each received text line into an event
agent1.sources.source1.type = netcat
agent1.sources.source1.bind = localhost
agent1.sources.source1.port = 44444
# Describe sink1: log events to the agent's log/console
agent1.sinks.sink1.type = logger
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
# FIX: was misspelled "transactionCapactiy" — Flume silently ignores unknown
# keys, so the setting never took effect (the default of 100 applied by luck)
agent1.channels.channel1.transactionCapacity = 100
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
启动agent agent1
在$FLUME_HOME目录中输入
# FIX: "-nagent1" lacked the space between -n and the agent name, so flume-ng
# could not resolve the agent to run
bin/flume-ng agent --conf ./conf -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1
然后重新开启一个终端输入
telnet localhost 44444
成功截图:
3.exec--hdfs
# Name the components on this agent
# Name the components on this agent
agent1.sources = r1
agent1.sinks = s_hdfs
agent1.channels = c_hdfs
agent1.sources.r1.channels = c_hdfs
# Describe/configure source r1: continuously tail the input file
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /home/spark/log1
# Channel: in-memory buffer between source and sink
agent1.channels.c_hdfs.type = memory
agent1.channels.c_hdfs.capacity = 1000
agent1.channels.c_hdfs.transactionCapacity = 100
# Sink: write events into HDFS
agent1.sinks.s_hdfs.type = hdfs
agent1.sinks.s_hdfs.channel = c_hdfs
# NameNode RPC port is 9000 on this cluster (the stock Hadoop default is 8020,
# not 8082 as the original comment claimed)
agent1.sinks.s_hdfs.hdfs.path= hdfs://spark03:9000/root/source
# FIX: HDFS sink options need the "hdfs." prefix, otherwise they are ignored
#agent1.sinks.s_hdfs.hdfs.filePrefix = event-
agent1.sinks.s_hdfs.hdfs.fileType = DataStream
agent1.sinks.s_hdfs.hdfs.writeFormat = Text
# Roll a new file every 30 events; disable size- and time-based rolling
agent1.sinks.s_hdfs.hdfs.rollCount = 30
agent1.sinks.s_hdfs.hdfs.rollSize = 0
agent1.sinks.s_hdfs.hdfs.rollInterval = 0
agent1.sinks.s_hdfs.hdfs.useLocalTimeStamp = true
# Close a file after 51 s with no writes
agent1.sinks.s_hdfs.hdfs.idleTimeout = 51
agent1.sinks.s_hdfs.hdfs.threadsPoolSize = 1
启动agent agent1
在$FLUME_HOME目录中输入
bin/flume-ng agent --conf ./conf -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1
在另一个终端输入
# FIX: append to /home/spark/log1 (the file the exec source tails), not
# /home/spark/log — with the old path the HDFS sink never received any events
for i in {1..100};do echo "exec tail$i" >> /home/spark/log1;echo $i;sleep 0.1;done
结束后输入
hadoop fs -ls -R /root/source 查看是否成功输入到hdfs中
4.spooldir:
agent.sources = r1
agent.sinks = k1
agent.channels = c1
# Describe/configure the source: watch a spooling directory; each completed
# file dropped there is ingested line by line
agent.sources.r1.type=spooldir
# Input directory location
agent.sources.r1.spoolDir=/home/spark/log/
agent.sources.r1.inputCharset=utf-8
# Describe the sink: publish events to a Kafka topic
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
# Target topic
agent.sinks.k1.topic = test
agent.sinks.k1.brokerList = spark02:9092
# FIX: property is "requiredAcks" (was misspelled "requireAcks" and ignored)
agent.sinks.k1.requiredAcks = 1
# FIX: agent name was misspelled "agetn", so batchSize was silently ignored
agent.sinks.k1.batchSize = 2000
# Use a channel which buffers events in memory
agent.channels.c1.type=memory
agent.channels.c1.capacity=1000
agent.channels.c1.transactionCapacity=100
# Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1