Flume 多种场景下的配置
1、从指定的网络端口采集数据输出到Flume控制台
Agent选型:netcat source + memory channel + logger sink。(netcat=>logger)
配置文件:netcat-memory-logger.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind =hadoop000
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
|
启动agent:
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /www/instl/flume/flume_agent_conf/netcat-memory-logger.conf \
-Dflume.root.logger=INFO,console
|
使用telnet进行测试: telnet hadoop 44444
2、监控一个文件实时采集新增的数据输出到Flume控制台
Agent选型:exec source + memory channel + logger sink (exec=>logger)
配置文件: exec-memory-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /www/data/inputfile/flume.log
a1.sources.r1.shell = /bin/sh -c
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
|
启动agent:
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /www/instl/flume/flume_agent_conf/exec-memory-logger.conf \
-Dflume.root.logger=INFO,console
|
测试; echo helloworld >> /www/data/inputfile/flume.log
3、将A服务器上从日志文件中实时读取采集到B服务器上的Flume控制台
Agent选型:
exec source + memory channel + avro sink
avro source + memory channel + logger sink
(exec=>avro=>logger)
A服务器上的Flume配置:exec-memory-avro.conf
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /www/data/inputfile/flume.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname =hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
|
B服务器上的Flume配置:avro-memory-logger.conf
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel
avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind =hadoop000
avro-memory-logger.sources.avro-source.port = 44444
avro-memory-logger.sinks.logger-sink.type = logger
avro-memory-logger.channels.memory-channel.type = memory
avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel
|
先启动B服务器上的:avro-memory-logger.conf
flume-ng agent \
--name avro-memory-logger \
--conf $FLUME_HOME/conf \
--conf-file /www/instl/flume/flume_agent_conf/avro-memory-logger.conf \
-Dflume.root.logger=INFO,console
|
再启动A服务器上的:exec-memory-avro.conf
flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
--conf-file /www/instl/flume/flume_agent_conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
|
测试; echo helloworld >> /www/data/inputfile/flume.log
4、将A服务器上的日志实时采集到B服务器上的Kafka进行数据采集供SparkStreaming使用
Agent选型:log=>flume=>kafka=>sparkStreaming
步骤1】 A服务器=>B服务器: exec source + memory channel + avro sink
步骤2】 B服务器flume=>kafka : avro source + memory channel + Kafka sink
步骤3】 B服务器kafka=>sparkStreaming : KafkaUtils工具类
A服务器上的Flume配置:exec-memory-avro.conf
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /www/data/inputfile/flume.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname =hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
|
B服务器上的Flume配置:avro-memory-kafka.conf
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks =kafka-sink
avro-memory-kafka.channels = memory-channel
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind =hadoop000
avro-memory-kafka.sources.avro-source.port = 44444
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
#这是单节点配置。 如果为多Broker配置,则以 , 分割
avro-memory-kafka.sinks.kafka-sink.brokerList=hadoop000:9092
avro-memory-kafka.sinks.kafka-sink.topic=hello_topic
avro-memory-kafka.sinks.kafka-sink.batchSize=5
avro-memory-kafka.sinks.kafka-sink.requiredAcks=1
avro-memory-kafka.channels.memory-channel.type = memory
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
|
部署
(1)开始kafka服务并创建topic:
kafka-server-start.sh $KAFKA_HOME/config/server.properties
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic test
监控: kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic test --from-beginning
(2)先启动B服务器上的:avro-memory-kafka.conf
flume-ng agent \
--name avro-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file /www/instl/flume/flume_agent_conf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console
|
(3)再启动A服务器上的:exec-memory-avro.conf
flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
--conf-file /www/instl/flume/flume_agent_conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
|
(4)测试
模拟日志文件的产生 echo helloworld >> /www/data/inputfile/flume.log
5、从控制台到通过Flume 以Push(推到Spark)的方式到SparkStreaming
Agent选型: netcat-source+ memory-channel+spark-sink (netcat=>spark)
flume_pull_streaming.conf
simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel
simple-agent.sources.netcat-source.bind = hadoop000
simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = hadoop000
simple-agent.sinks.spark-sink.port = 41414
simple-agent.channels.memory-channel.type = memory
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel
|
(先启动sparkstreaming作业)
spark-submit \
--class com.imooc.spark.FlumePushWordCount \
--master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
/www/lib/sparktrain-1.0.jar \
hadoop000 41414
|
再启动agent:
flume-ng agent \
--name simple-agent \
--conf $FLUME_HOME/conf \
--conf-file /www/instl/flume/flume_agent_conf/flume_push_streaming.conf \
-Dflume.root.logger=INFO,console
|
使用telnet进行测试: telnet hadoop 44444
6、从控制台到通过Flume 以Pull(Spark自取)的方式到SparkStreaming
Agent选型: netcat-source+ memory-channel+spark-sink
flume_push_streaming.conf
simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel
simple-agent.sources.netcat-source.bind = hadoop000
simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = hadoop000
simple-agent.sinks.spark-sink.port = 41414
simple-agent.channels.memory-channel.type = memory
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel
|
先启动agent:
flume-ng agent \
--name simple-agent \
--conf $FLUME_HOME/conf \
--conf-file /www/instl/flume/flume_agent_conf/flume_push_streaming.conf \
-Dflume.root.logger=INFO,console
|
(再启动sparkstreaming作业)
spark-submit \
--class com.imooc.spark.FlumePullWordCount \
--master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
/www/lib/sparktrain-1.0.jar \
hadoop000 41414
|
7、从A服务器上的log4j实时日志文件将实时产生的实时日志通过Flume搜集到B服务器上的Flume控制台
Agent选型: avro-source+ memory-channel+logger-sink
avro-memory-logger.conf
avro-memory-log.sources=avro-source
avro-memory-log.channels=memory-channel
avro-memory-log.sinks=logger-sink
avro-memory-log.sources.avro-source.type=avro
avro-memory-log.sources.avro-source.bind=hadoop000
avro-memory-log.sources.avro-source.port=41414
avro-memory-log.channels.memory-channel.type=memory
avro-memory-log.sinks.logger-sink.type=logger
avro-memory-log.sources.avro-source.channels=memory-channel
avro-memory-log.sinks.logger-sink.channel=memory-channel
|
产生日志的A服务器中的log4j.properties 的配置
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname =140.143.236.169
log4j.appender.flume.Port =41414
log4j.appender.flume.UnsafeMode = false
|
即:
部署
(1)开始kafka服务并创建topic:
kafka-server-start.sh $KAFKA_HOME/config/server.properties
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic test
监控: kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic test --from-beginning
(2)启动产生日志服务的应用
(4)再启动A服务器上的: avro-memory-logger.conf
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file /www/instl/flume/flume_agent_conf/avro-memory-logger.conf \
--name avro-memory-logger \
-Dflume.root.logger=INFO,console
|
8、从A服务器上的log4j实时日志文件将实时产生的实时日志通过Flume搜集到B服务器上的Kafka控制台
Agent选型: avro-source +memory-channel + kafka-sink
avro-memory-kafka.conf
avro-memory-kafka.sources=avro-source
avro-memory-kafka.channels=memory-channel
avro-memory-kafka.sinks=kafka-sink
avro-memory-kafka.sources.avro-source.type=avro
avro-memory-kafka.sources.avro-source.bind=hadoop000
avro-memory-kafka.sources.avro-source.port=41414
avro-memory-kafka.channels.memory-channel.type=memory
avro-memory-kafka.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.topic = test
avro-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
avro-memory-kafka.sinks.kafka-sink.batchSize = 20
avro-memory-kafka.sources.avro-source.channels=memory-channel
avro-memory-kafka.sinks.kafka-sink.channel=memory-channel
|
产生日志的A服务器中的log4j.properties 的配置
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname =140.143.236.169
log4j.appender.flume.Port =41414
log4j.appender.flume.UnsafeMode = false
|
即:
部署
(1)开始kafka服务并创建topic:
kafka-server-start.sh $KAFKA_HOME/config/server.properties
kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic test
监控: kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic test --from-beginning
(2)再启动A服务器上的: avro-memory-logger.conf
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file /www/instl/flume/flume_agent_conf/avro-memory-kafka.conf \
--name avro-memory-kafka \
-Dflume.root.logger=INFO,console
|
(3)启动产生日志服务的应用
9、监控A服务器目录日志通过Flume 上传到B 服务器的HDFS 中
Agent选型:
exec source + memory channel + avro sink
avro source + memory channel + hdfs sink
(exec=>avro=>logger)
A服务器上的Flume配置:exec-memory-avro.conf
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel
exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /www/data/inputfile/flume.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname =hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
|
B服务器上的Flume配置:avro-memory-hdfs.conf
# 配置Agent
avro-memory-hdfs.sources = avro-source
avro-memory-hdfs.channels = momory-channel
avro-memory-hdfs.sinks = hdfs-sink
# 配置Source
avro-memory-hdfs.sources.avro-source.type = avro
avro-memory-hdfs.sources.avro-source.bind = hadoop000
avro-memory-hdfs.sources.avro-source.port = 44444
#具体定义sink
avro-memory-hdfs.sinks.hdfs-sink.type = hdfs
avro-memory-hdfs.sinks.hdfs-sink.channel = momory-channel
avro-memory-hdfs.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
avro-memory-hdfs.sinks.hdfs-sink.hdfs.path = hdfs://hadoop000:8020/flume/events/%Y-%m
avro-memory-hdfs.sinks.hdfs-sink.hdfs.filePrefix = %Y-%m-%d-%H
avro-memory-hdfs.sinks.hdfs-sink.hdfs.fileSuffix = .log
avro-memory-hdfs.sinks.hdfs-sink.hdfs.minBlockReplicas = 1
avro-memory-hdfs.sinks.hdfs-sink.hdfs.fileType = DataStream
avro-memory-hdfs.sinks.hdfs-sink.hdfs.writeFormat = Text
avro-memory-hdfs.sinks.hdfs-sink.hdfs.rollInterval = 86400
avro-memory-hdfs.sinks.hdfs-sink.hdfs.rollSize = 1000000
avro-memory-hdfs.sinks.hdfs-sink.hdfs.rollCount = 10000
avro-memory-hdfs.sinks.hdfs-sink.hdfs.idleTimeout = 0
# 配置Channel
avro-memory-hdfs.channels.momory-channel.type = memory
avro-memory-hdfs.channels.momory-channel.capacity = 10000
avro-memory-hdfs.channels.momory-channel.transactionCapacity = 1000
# 将三者连接
avro-memory-hdfs.sources.avro-source.channels = momory-channel
avro-memory-hdfs.sinks.hdfs-sink.channel = momory-channel
|
先启动B服务器上的:avro-memory-logger.conf
flume-ng agent \
--name avro-memory-hdfs \
--conf $FLUME_HOME/conf \
--conf-file /www/instl/flume/flume_agent_conf/avro-memory-hdfs.conf \
-Dflume.root.logger=INFO,console
|
再启动A服务器上的:exec-memory-avro.conf
flume-ng agent \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
--conf-file /www/instl/flume/flume_agent_conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
|
测试; echo helloworld >> /www/data/inputfile/flume.log