Logstash整合kafka
环境ubuntu16.04
kafka0.9.0.0安装
logstash2.4.1安装
zookeeper3.4.9安装
JDK1.8安装
启动zk
cd /app/zookeeper/bin
./zkServer.sh start
启动kafka
cd /app/kafka
bin/kafka-server-start.sh -daemon config/server.properties &
创建topic
bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic test_topic
查看topic
bin/kafka-topics.sh --list --zookeeper node1:2181
新开窗口连接kafka客户端消费消息
bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic test_topic
整合logstash和kafka
官方指导文件https://www.elastic.co/guide/en/logstash/current/index.html
output plugins --->kafka
注意版本对应
编辑logstash的配置启动文件
cd /app/logstash
vi FileLogstashKafka.conf 这个文件要自己创建
input {
file{
path => "/app/logstash/testFile.txt"
}
}
output {
kafka {
codec => json
topic_id => "test_topic"
bootstrap_servers => "node1:9092"
batch_size => 1
}
}
配置说明:logstash从文件testFile.txt中采集数据,然后把数据输出到kafka的test_topic主题中,batch_size=1只是测试用,生产根据具体业务来定。
启动logstash的FileLogstashKafka.conf文件
bin/logstash -f FileLogstashKafka.conf
往testFile.txt文件中添加数据
cd /app/logstash
echo "hello word" >> testFile.txt
查看kafka消费者是否消费了这条消息
这里处理的信息除了内容以外还有version,timestamp,path,host如果只要message,修改logstash的配置文件
input {
file{
path => "/app/logstash/testFile.txt"
}
}
output {
kafka {
topic_id => "storm_topic"
bootstrap_servers => "node1:9092"
batch_size => 1
codec => plain{
format => "%{message}"
}
}
}