6. ELK + Kafka
Environment
10.0.0.51: Logstash, Kafka, Filebeat
I. Installing ZooKeeper and Kafka
1. Dependencies
Kafka
- Kafka depends on ZooKeeper
- Both depend on Java
ZooKeeper
- Official site: https://zookeeper.apache.org/
- Download the ZooKeeper binary package
- Extract it into the target directory to complete the installation
2. Download and extract ZooKeeper, then copy the default config file
tar -zxf zookeeper-3.4.13.tar.gz -C /usr/local
cp /usr/local/zookeeper-3.4.13/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.13/conf/zoo.cfg
3. Edit the config file so ZooKeeper listens on all addresses
vim /usr/local/zookeeper-3.4.13/conf/zoo.cfg
clientPortAddress=0.0.0.0
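The same edit can be applied non-interactively. A minimal sketch using a throwaway copy of the file (the path and the sample contents are illustrative, not the real zoo.cfg):

```shell
# Work on a throwaway copy so nothing real is touched (path is illustrative).
CFG=/tmp/zoo.cfg
printf 'tickTime=2000\ndataDir=/tmp/zookeeper\nclientPort=2181\n' > "$CFG"

# Append the bind-all setting only if it is not already present.
grep -q '^clientPortAddress=' "$CFG" || echo 'clientPortAddress=0.0.0.0' >> "$CFG"

grep '^clientPortAddress' "$CFG"
```

The `grep -q … ||` guard makes the script idempotent: running it twice does not duplicate the line.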
4. Start ZooKeeper
/usr/local/zookeeper-3.4.13/bin/zkServer.sh start
5. ZooKeeper listens on port 2181
# netstat -tlunp | grep 2181
tcp6       0      0 :::2181      :::*      LISTEN      15295/java
6. Download and extract Kafka
Official site: http://kafka.apache.org/
tar -zvxf kafka_2.11-2.1.1.tgz
mv kafka_2.11-2.1.1 /usr/local/kafka_2.11
7. Edit the Kafka config: set the listen address and the ZooKeeper connection address
vim /usr/local/kafka_2.11/config/server.properties
listeners=PLAINTEXT://10.0.0.51:9092
zookeeper.connect=10.0.0.51:2181
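These two changes can also be scripted with sed. A sketch against a throwaway copy (the seed contents mimic the stock file, where `listeners` ships commented out; the path is illustrative):

```shell
# Seed a throwaway copy that mimics the relevant lines of the stock file.
PROPS=/tmp/server.properties
printf '#listeners=PLAINTEXT://:9092\nzookeeper.connect=localhost:2181\n' > "$PROPS"

# Uncomment/overwrite the listener line and point zookeeper.connect at our ZK.
sed -i 's|^#\?listeners=.*|listeners=PLAINTEXT://10.0.0.51:9092|' "$PROPS"
sed -i 's|^zookeeper.connect=.*|zookeeper.connect=10.0.0.51:2181|' "$PROPS"

grep -E '^(listeners|zookeeper\.connect)=' "$PROPS"
```

`\?` in the first pattern matches the line whether or not it is still commented out (GNU sed syntax).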
8. Start Kafka
Foreground:
/usr/local/kafka_2.11/bin/kafka-server-start.sh /usr/local/kafka_2.11/config/server.properties
Background:
nohup /usr/local/kafka_2.11/bin/kafka-server-start.sh /usr/local/kafka_2.11/config/server.properties >/tmp/kafka.log 2>&1 &
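The background launch has three moving parts: nohup detaches the process from the terminal's hangup signal, `>/tmp/kafka.log 2>&1` sends both stdout and stderr to one log file, and the trailing `&` backgrounds it. The same pattern with a stand-in command instead of Kafka makes that visible:

```shell
# Same nohup/redirect/& pattern, with a short-lived stand-in instead of Kafka.
nohup sh -c 'echo "broker starting"; echo "warn: demo" >&2' > /tmp/demo.log 2>&1 &
wait                  # a real broker keeps running; here we just wait for exit
cat /tmp/demo.log     # both streams ended up in the one log file
```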
9. Kafka listens on port 9092
# netstat -tlunp | grep 9092
tcp6       0      0 10.0.0.51:9092      :::*      LISTEN      15475/java
II. filebeat -> kafka -> logstash
1. Edit the Filebeat config, then restart Filebeat so it ships logs to Kafka
vim /usr/local/filebeat-6.6.0/filebeat.yml
# cat /usr/local/filebeat-6.6.0/filebeat.yml
filebeat.inputs:
- type: log
  tail_files: true
  backoff: "1s"
  paths:
  - /usr/local/nginx/logs/access.log
  fields:
    type: access
  fields_under_root: true
- type: log
  tail_files: true
  backoff: "1s"
  paths:
  - /var/log/secure
  fields:
    type: secure
  fields_under_root: true
output:
  kafka:
    hosts: ["10.0.0.51:9092"]
    topic: wangxiaoyu
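Beyond hosts and topic, the Kafka output accepts a few optional tuning knobs; the values below are illustrative, not required for the setup above:

```yaml
output:
  kafka:
    hosts: ["10.0.0.51:9092"]
    topic: wangxiaoyu
    required_acks: 1            # wait for the partition leader to ack each batch
    compression: gzip           # compress batches on the wire
    max_message_bytes: 1000000  # drop single events larger than ~1 MB
```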
2. Edit the Logstash config to read from Kafka, then restart Logstash
Note: `codec => "json"` matters here. Filebeat writes each event to Kafka as a JSON document, so without JSON decoding Logstash still consumes the messages but keeps each whole document as a plain string in `message`; fields such as `type` are never parsed out, the conditionals in the output section never match, and nothing reaches Elasticsearch.
# cat /usr/local/logstash-6.6.0/config/logstash.conf
input {
  kafka {
    codec => "json"
    bootstrap_servers => "10.0.0.51:9092"
    topics => ["wangxiaoyu"]
    group_id => "wangxiaoyu"
  }
}
filter {
  mutate {
    rename => { "[host][name]" => "host" }
  }
}
output {
  if [type] == "access" {
    elasticsearch {
      hosts => ["http://10.0.0.50:9200"]
      index => "access-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "secure" {
    elasticsearch {
      hosts => ["http://10.0.0.50:9200"]
      index => "secure-%{+YYYY.MM.dd}"
    }
  }
}
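When events are not reaching Elasticsearch, a temporary stdout output makes it easy to see exactly what Logstash decoded from Kafka. This is a debugging aid to add alongside (or instead of) the elasticsearch outputs, not part of the normal pipeline:

```
output {
  stdout { codec => rubydebug }   # print each decoded event to the console
}
```

With `codec => "json"` missing on the input, this will show one big `message` string instead of parsed fields, which is the symptom described above.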
3. Inspect the queue state in Kafka; the consumer group shows how far reading has progressed.
cd /usr/local/kafka_2.11/bin
./kafka-consumer-groups.sh --bootstrap-server 10.0.0.51:9092 --list
wangxiaoyu
./kafka-consumer-groups.sh --bootstrap-server 10.0.0.51:9092 --group wangxiaoyu --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
wangxiaoyu 0 14 14 0 logstash-0-d5c3e806-655a-4d8c-870f-3daabffb38ce /10.0.0.51 logstash-0
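In the describe output, LAG is simply LOG-END-OFFSET minus CURRENT-OFFSET; zero means the consumer (Logstash) is fully caught up. With the numbers from the table above:

```shell
# LAG = LOG-END-OFFSET - CURRENT-OFFSET (values taken from the table above)
current=14
end=14
echo "LAG=$((end - current))"   # -> LAG=0
```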
4. View the logs