Flume 多种场景下的配置


1、从指定的网络端口采集数据输出到Flume控制台

Agent选型:netcat  source + memory channel + logger sink。(netcat=>logger

配置文件:netcat-memory-logger.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind =hadoop000
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动agent:

flume-ng agent \
--name a1  \
--conf  $FLUME_HOME/conf  \
--conf-file /www/instl/flume/flume_agent_conf/netcat-memory-logger.conf  \
-Dflume.root.logger=INFO,console
使用telnet进行测试: telnet hadoop 44444

2、监控一个文件实时采集新增的数据输出到Flume控制台

Agent选型:exec source + memory channel + logger sink  (exec=>logger)

配置文件: exec-memory-logger.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /www/data/inputfile/flume.log
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动agent:

flume-ng agent \
--name a1  \
--conf $FLUME_HOME/conf  \
--conf-file /www/instl/flume/flume_agent_conf/exec-memory-logger.conf \
-Dflume.root.logger=INFO,console
测试; echo helloworld >> /www/data/inputfile/flume.log


3、将A服务器上从日志文件中实时读取采集到B服务器上的Flume控制台
Flume 多种场景下的配置
Agent选型:
    exec source + memory channel + avro sink
    avro source + memory channel + logger sink
(exec=>avro=>logger)

A服务器上的Flume配置:exec-memory-avro.conf

exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /www/data/inputfile/flume.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname =hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel

B服务器上的Flume配置:avro-memory-logger.conf

avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel

avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind =hadoop000
avro-memory-logger.sources.avro-source.port = 44444

avro-memory-logger.sinks.logger-sink.type = logger

avro-memory-logger.channels.memory-channel.type = memory

avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel

先启动B服务器上的:avro-memory-logger.conf

flume-ng agent \
--name avro-memory-logger  \
--conf $FLUME_HOME/conf  \
--conf-file /www/instl/flume/flume_agent_conf/avro-memory-logger.conf \
-Dflume.root.logger=INFO,console
再启动A服务器上的:exec-memory-avro.conf
flume-ng agent \
--name exec-memory-avro  \
--conf $FLUME_HOME/conf  \
--conf-file  /www/instl/flume/flume_agent_conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
测试; echo helloworld >> /www/data/inputfile/flume.log

4、将A服务器上的日志实时采集到B服务器上的Kafka进行数据采集供SparkStreaming使用

Flume 多种场景下的配置

 Agent选型:log=>flume=>kafka=>sparkStreaming
    步骤1】 A服务器=>B服务器:  exec source + memory channel + avro sink
    步骤2】 B服务器flume=>kafka  :   avro source + memory channel + Kafka sink
    步骤3】 B服务器kafka=>sparkStreaming : KafkaUtils工具类

A服务器上的Flume配置:exec-memory-avro.conf
exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /www/data/inputfile/flume.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname =hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel

B服务器上的Flume配置:avro-memory-kafka.conf
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks =kafka-sink
avro-memory-kafka.channels = memory-channel

avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind =hadoop000
avro-memory-kafka.sources.avro-source.port = 44444

avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
#这是单节点配置。 如果为多Broker配置,则以 ,  分割
avro-memory-kafka.sinks.kafka-sink.brokerList=hadoop000:9092
avro-memory-kafka.sinks.kafka-sink.topic=hello_topic
avro-memory-kafka.sinks.kafka-sink.batchSize=5        
avro-memory-kafka.sinks.kafka-sink.requiredAcks=1   


avro-memory-kafka.channels.memory-channel.type = memory

avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
部署
(1)开始kafka服务并创建topic: 
        kafka-server-start.sh $KAFKA_HOME/config/server.properties
          kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic test
        监控: kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic test --from-beginning
(2)先启动B服务器上的:avro-memory-kafka.conf
flume-ng agent \
--name avro-memory-kafka  \
--conf $FLUME_HOME/conf  \
--conf-file  /www/instl/flume/flume_agent_conf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console
(3)再启动A服务器上的:exec-memory-avro.conf
flume-ng agent \
--name exec-memory-avro  \
--conf $FLUME_HOME/conf  \
--conf-file  /www/instl/flume/flume_agent_conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console
(4)测试
       模拟日志文件的产生  echo helloworld >> /www/data/inputfile/flume.log


5、从控制台到通过Flume 以Push(推到Spark)的方式到SparkStreaming

    Agent选型: netcat-source+ memory-channel+spark-sink (netcat=>spark)

     flume_pull_streaming.conf
simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel


simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = hadoop000
simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sinks.spark-sink.channel = memory-channel
(先启动sparkstreaming作业)
spark-submit \
--class com.imooc.spark.FlumePushWordCount \
--master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
/www/lib/sparktrain-1.0.jar \
hadoop000 41414
再启动agent:

flume-ng agent \
--name  simple-agent  \
--conf $FLUME_HOME/conf  \
--conf-file  /www/instl/flume/flume_agent_conf/flume_push_streaming.conf \
-Dflume.root.logger=INFO,console

使用telnet进行测试: telnet hadoop 44444

6、从控制台到通过Flume 以Pull(Spark自取)的方式到SparkStreaming

    Agent选型: netcat-source+ memory-channel+spark-sink 
     flume_push_streaming.conf

simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel


simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = hadoop000
simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sinks.spark-sink.channel = memory-channel
先启动agent:

flume-ng agent \
--name  simple-agent  \
--conf $FLUME_HOME/conf  \
--conf-file  /www/instl/flume/flume_agent_conf/flume_push_streaming.conf \
-Dflume.root.logger=INFO,console

(再启动sparkstreaming作业)

spark-submit \
--class com.imooc.spark.FlumePullWordCount \
--master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 \
/www/lib/sparktrain-1.0.jar \
hadoop000 41414
7、从A服务器上的log4j实时日志文件将实时产生的实时日志通过Flume搜集到B服务器上的Flume控制台

 Agent选型:  avro-source+ memory-channel+logger-sink
 avro-memory-logger.conf
avro-memory-log.sources=avro-source
avro-memory-log.channels=memory-channel
avro-memory-log.sinks=logger-sink

avro-memory-log.sources.avro-source.type=avro
avro-memory-log.sources.avro-source.bind=hadoop000
avro-memory-log.sources.avro-source.port=41414

avro-memory-log.channels.memory-channel.type=memory

avro-memory-log.sinks.logger-sink.type=logger

avro-memory-log.sources.avro-source.channels=memory-channel
avro-memory-log.sinks.logger-sink.channel=memory-channel
产生日志的A服务器中的log4j.properties 的配置
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname =140.143.236.169
log4j.appender.flume.Port =41414
log4j.appender.flume.UnsafeMode = false
即:
Flume 多种场景下的配置
部署
(1)开始kafka服务并创建topic: 
        kafka-server-start.sh $KAFKA_HOME/config/server.properties
          kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic test
        监控: kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic test --from-beginning
(2)启动产生日志服务的应用
(4)再启动A服务器上的: avro-memory-logger.conf
flume-ng  agent \
--conf $FLUME_HOME/conf \
--conf-file /www/instl/flume/flume_agent_conf/avro-memory-logger.conf \
--name avro-memory-logger \
-Dflume.root.logger=INFO,console

8、从A服务器上的log4j实时日志文件将实时产生的实时日志通过Flume搜集到B服务器上的Kafka控制台

 Agent选型:   avro-source +memory-channel + kafka-sink
avro-memory-kafka.conf
avro-memory-kafka.sources=avro-source
avro-memory-kafka.channels=memory-channel
avro-memory-kafka.sinks=kafka-sink

avro-memory-kafka.sources.avro-source.type=avro
avro-memory-kafka.sources.avro-source.bind=hadoop000
avro-memory-kafka.sources.avro-source.port=41414

avro-memory-kafka.channels.memory-channel.type=memory

avro-memory-kafka.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.topic = test
avro-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
avro-memory-kafka.sinks.kafka-sink.batchSize = 20

avro-memory-kafka.sources.avro-source.channels=memory-channel
avro-memory-kafka.sinks.kafka-sink.channel=memory-channel
产生日志的A服务器中的log4j.properties 的配置
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname =140.143.236.169
log4j.appender.flume.Port =41414
log4j.appender.flume.UnsafeMode = false
即:
Flume 多种场景下的配置
部署
(1)开始kafka服务并创建topic: 
        kafka-server-start.sh $KAFKA_HOME/config/server.properties
          kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic test
        监控: kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic test --from-beginning

(2)再启动A服务器上的: avro-memory-logger.conf
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file /www/instl/flume/flume_agent_conf/avro-memory-kafka.conf \
--name avro-memory-kafka \
-Dflume.root.logger=INFO,console
(3)启动产生日志服务的应用

9、监控A服务器目录日志通过Flume 上传到B 服务器的HDFS 中
Flume 多种场景下的配置
Agent选型:
    exec source + memory channel + avro sink
    avro source + memory channel + hdfs sink
(exec=>avro=>logger)

A服务器上的Flume配置:exec-memory-avro.conf

exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /www/data/inputfile/flume.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname =hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
B服务器上的Flume配置:avro-memory-hdfs.conf

# 配置Agent
avro-memory-hdfs.sources = avro-source
avro-memory-hdfs.channels = momory-channel
avro-memory-hdfs.sinks = hdfs-sink

# 配置Source
avro-memory-hdfs.sources.avro-source.type = avro
avro-memory-hdfs.sources.avro-source.bind = hadoop000
avro-memory-hdfs.sources.avro-source.port = 44444

#具体定义sink
avro-memory-hdfs.sinks.hdfs-sink.type = hdfs
avro-memory-hdfs.sinks.hdfs-sink.channel = momory-channel
avro-memory-hdfs.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
avro-memory-hdfs.sinks.hdfs-sink.hdfs.path = hdfs://hadoop000:8020/flume/events/%Y-%m
avro-memory-hdfs.sinks.hdfs-sink.hdfs.filePrefix = %Y-%m-%d-%H
avro-memory-hdfs.sinks.hdfs-sink.hdfs.fileSuffix = .log
avro-memory-hdfs.sinks.hdfs-sink.hdfs.minBlockReplicas = 1
avro-memory-hdfs.sinks.hdfs-sink.hdfs.fileType = DataStream
avro-memory-hdfs.sinks.hdfs-sink.hdfs.writeFormat = Text
avro-memory-hdfs.sinks.hdfs-sink.hdfs.rollInterval = 86400
avro-memory-hdfs.sinks.hdfs-sink.hdfs.rollSize = 1000000
avro-memory-hdfs.sinks.hdfs-sink.hdfs.rollCount = 10000
avro-memory-hdfs.sinks.hdfs-sink.hdfs.idleTimeout = 0

# 配置Channel
avro-memory-hdfs.channels.momory-channel.type = memory
avro-memory-hdfs.channels.momory-channel.capacity = 10000
avro-memory-hdfs.channels.momory-channel.transactionCapacity = 1000

# 将三者连接
avro-memory-hdfs.sources.avro-source.channels = momory-channel
avro-memory-hdfs.sinks.hdfs-sink.channel = momory-channel


先启动B服务器上的:avro-memory-logger.conf


flume-ng agent \
--name avro-memory-hdfs \
--conf $FLUME_HOME/conf  \
--conf-file /www/instl/flume/flume_agent_conf/avro-memory-hdfs.conf \
-Dflume.root.logger=INFO,console

再启动A服务器上的:exec-memory-avro.conf

flume-ng agent \
--name exec-memory-avro  \
--conf $FLUME_HOME/conf  \
--conf-file  /www/instl/flume/flume_agent_conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console

测试; echo helloworld >> /www/data/inputfile/flume.log