ELK之LogStash日志数据采集
目录
1、概述
logstash就是一个具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以根据需求在中间加上滤网;是一个input | filter | output 的数据流。
2、Input插件
2.1、stdin标准输入和stdout标准输出
使用标准的输入与输出组件,实现将我们的数据从控制台输入,从控制台输出:
cd /install/logstash-6.7.0/
bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
2.2、监控日志文件变化
Logstash 使用一个名叫 FileWatch 的 Ruby Gem 库来监听文件变化。这个库支持 glob 展开文件路径,而且会记录一个叫 .sincedb 的数据库文件来跟踪被监听的日志文件的当前读取位置。
(1)编写脚本
cd /install/logstash-6.7.0/config
vim monitor_file.conf 输入如下内容:
input{
file{
path => "/install/datas/tomcat.log" //得先创建该目录mkdir -p /install/datas
type => "log"
start_position => "beginning"
}
}
output{
stdout{
codec=>rubydebug
}
}
(2)检查配置文件是否可用
cd /install/logstash-6.7.0/
bin/logstash -f /install/logstash-6.7.0/config/monitor_file.conf -t
(3)启动服务
cd /install/logstash-6.7.0
bin/logstash -f /install/logstash-6.7.0/config/monitor_file.conf //与(2)的区别就是-t
(4)发送数据
向tomcat.log文件中追加内容,观察控制台变化。
echo "hello logstash" >> /install/datas/tomcat.log
(5)其它参数说明
Path=>表示监控的文件路径
Type=>给类型打标记,用来区分不同的文件类型。
Start_postion=>从哪里开始记录文件,默认是从结尾开始标记,要是你从头导入一个文件就把改成”beginning”.
discover_interval=>多久去监听path下是否有文件,默认是15s
exclude=>排除什么文件
close_older=>一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3600秒,即一个小时。
sincedb_path=>监控库存放位置(默认的读取文件信息记录在哪个文件中)。默认在:/data/plugins/inputs/file。
sincedb_write_interval=> logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。
stat_interval=>logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。
3、jdbc插件
(1)开发脚本配置文件
cd /install/logstash-6.7.0/config
vim jdbc.conf 输入如下内容:
input {
jdbc {
jdbc_driver_library => "/kkb/install/mysql-connector-java-5.1.38.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://本地ip地址/mydb" //在cmd窗口中输入ipconfig查询
jdbc_user => "root"
jdbc_password => "root" //注意本地mysql需先开启远程连接权限,并强制刷新
//(开启远程连接权限grant all privileges on *.* to 'root'@'%' identified by 'root' with grant option;
flush privileges;)use_column_value => true
tracking_column => "tno"
# parameters => { "favorite_artist" => "Beethoven" }
schedule => "* * * * *"
statement => "SELECT * from courses where tno > :sql_last_value ;" //最后的值
}
}
output{
stdout{
codec=>rubydebug
}
}
(2)上传mysql连接驱动包到指定路劲
将我们mysql的连接驱动包mysql-connector-java-5.1.38.jar上传到我们指定的/install/路径下
(3)检查配置文件是否可用
cd /install/logstash-6.7.0/
bin/logstash -f /install/logstash-6.7.0/config/jdbc.conf -t
(4)启动服务
cd /install/logstash-6.7.0
bin/logstash -f /install/logstash-6.7.0/config/jdbc.conf
(5)数据库当中添加数据
在我们的数据库当中手动随便插入数据,发现我们的logstash可以进行收集。
4、systlog插件
syslog机制负责记录内核和应用程序产生的日志信息,管理员可以通过查看日志记录,来掌握系统状况。
默认系统已经安装了rsyslog.直接启动即可。
(1)编写脚本
cd /install/logstash-6.7.0/config
vim syslog.conf 输入如下内容:
input{
tcp{
port=> 6789
type=> syslog
}
udp{
port=> 6789
type=> syslog
}
}
filter{
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}
output{
stdout{
codec=> rubydebug
}
}
(2)检查配置文件是否可用
cd /install/logstash-6.7.0
bin/logstash -f /install/logstash-6.7.0/config/syslog.conf -t
(3)启动服务
cd /install/logstash-6.7.0
bin/logstash -f /install/logstash-6.7.0/config/syslog.conf
(4)发送数据
修改系统日志配置文件
sudo vim /etc/rsyslog.conf
添加一行以下配置
*.* @@node01:6789
重启系统日志服务使之生效:
sudo systemctl restart rsyslog
(5)其它参数说明
在logstash中的grok是正则表达式,用来解析当前数据。
原始数据其实是:
Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
Jun 05 08:00:00 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied
Jun 05 08:10:00 louis CRON[620]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)
Jun 05 08:05:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.
5、filter插件
Logstash之所以强悍的主要原因是filter插件;通过过滤器的各种组合可以得到我们想要的结构化数据。
1、grok正则表达式
grok正则表达式是logstash非常重要的一个环节;可以通过grok非常方便的将数据拆分和索引。
语法格式:
(?<name>pattern)
?<name>表示要取出里面的值,pattern就是正则表达式。
2、收集控制台输入数据,采集日期时间出来
(1)开发配置文件
cd /install/logstash-6.7.0/config/
vim filter.conf 输入如下内容:
input {stdin{}} filter {
grok {
match => {
"message" => "(?<date>\d+\.\d+)\s+"
}
}
}
output {stdout{codec => rubydebug}}
(2):启动logstash服务
cd /install/logstash-6.7.0/
bin/logstash -f /install/logstash-6.7.0/config/filter.conf
(3):控制台输入文字
今天天气不错
3、使用grok收集nginx日志数据
(1)安装grok插件(本文略)
(2)开发logstash的配置文件
定义logstash的配置文件如下,我们从控制台输入nginx的日志数据,然后经过filter的过滤,将我们的日志文件转换成为标准的数据格式。
cd /install/logstash-6.7.0/config
vim monitor_nginx.conf 输入如下内容:
input {stdin{}}
filter {
grok {
match => {
"message" => "%{IPORHOST:clientip} \- \- \[%{HTTPDATE:time_local}\] \"(?:%{WORD:method} %{NOTSPACE:request}(?:HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS:http_referer} %{QS:agent}"
}
}
}
output {stdout{codec => rubydebug}}
(3):启动logstash
执行以下命令启动logstash
cd /install/logstash-6.7.0
bin/logstash -f /install/logstash-6.7.0/config/monitor_nginx.conf
(4):从控制台输入nginx日志文件数据
输入第一条数据
36.157.150.1 - - [05/Nov/2018:12:59:27 +0800] "GET /phpmyadmin_8c1019c9c0de7a0f/js/messages.php?lang=zh_CN&db=&collation_connection=utf8_unicode_ci&token=6a44d72481633c90bffcfd42f11e25a1 HTTP/1.1" 200 8131 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36"
输入第二条数据
36.157.150.1 - - [05/Nov/2018:12:59:28 +0800] "GET /phpmyadmin_8c1019c9c0de7a0f/js/get_scripts.js.php?scripts%5B%5D=jquery/jquery-1.11.1.min.js&scripts%5B%5D=sprintf.js&scripts%5B%5D=ajax.js&scripts%5B%5D=keyhandler.js&scripts%5B%5D=jquery/jquery-ui-1.11.2.min.js&scripts%5B%5D=jquery/jquery.cookie.js&scripts%5B%5D=jquery/jquery.mousewheel.js&scripts%5B%5D=jquery/jquery.event.drag-2.2.js&scripts%5B%5D=jquery/jquery-ui-timepicker-addon.js&scripts%5B%5D=jquery/jquery.ba-hashchange-1.3.js HTTP/1.1" 200 139613 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36"
6、Output插件
1、标准输出到控制台
将我们收集的数据直接打印到控制台:
output {
stdout {
codec => rubydebug
}
}bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
hello
2、将采集数据保存到file文件中
logstash也可以将收集到的数据写入到文件当中去永久保存,接下来我们来看看logstash如何配置以实现将数据写入到文件当中。
(1)开发logstash的配置文件
cd /install/logstash-6.7.0/config
vim output_file.conf 输入如下内容:
input {stdin{}}
output {
file {
path => "/kkb/install/datas/%{+YYYY-MM-dd}-%{host}.txt"
codec => line {
format => "%{message}"
}
flush_interval => 0
}
}
(2)检测配置文件并启动logstash服务
cd /install/logstash-6.7.0
检测配置文件是否正确
bin/logstash -f config/output_file.conf -t
启动服务,然后从控制台输入一些数据
bin/logstash -f config/output_file.conf
查看文件写入的内容
cd /install/datas
more 2018-11-08-node01.hadoop.com.txt
3、将采集数据保存到elasticsearch
(1)开发logstash的配置文件
cd /install/logstash-6.7.0/config
vim output_es.conf
input {stdin{}}
output {
elasticsearch {
hosts => ["node01:9200"]
index => "logstash-%{+YYYY.MM.dd}"
}
}
这个index是保存到elasticsearch上的索引名称,如何命名特别重要,因为我们很可能后续根据某些需求做查询,所以最好带时间,因为我们在中间加上type,就代表不同的业务,这样我们在查询当天数据的时候,就可以根据类型+时间做范围查询。
(2)检测配置文件并启动logstash
检测配置文件是否正确
cd /install/logstash-6.7.0
bin/logstash -f config/output_es.conf -t
启动logstash
bin/logstash -f config/output_es.conf
在控制台中输入一些内容,如图:
(3)es当中查看数据
访问http://node01:9100/查看es当中的数据