Python学习之 ---日志分析+数据分发与分析+多线程+queue模块+日志分析综合
日志分析
概述
生成中会生成大量的系统日志,应用程序日志,安全日志等等日志,通过对日志的分析可以了解服务器的负载,键康状况,可以分析客户的分布情况,客户的行为,甚至基于这些分析的时候可以做出预测.
一般采集流程,
日志产出->采集(Logdtash,Flume,Scribe) -> 存储->分析->存储(数据库,NoSQL)->可视化
开源实时日志分析ELK平台
Logstash 收集日志,并存放到ElastcSearch集群中,Kibana则从ES集群中查询数据生成图表,返回浏览器端
数据提取
结构化数据 ---- 数据库为结构化数据
半结构化数据 ----日志等文本,CSV文件
日志是半结构化数据,是有组织的,有格式的数据,可以分割成行和列,就可以当成表理解和处理了,当然也可以分析里面的数据
非结构化数据 音频视频
文本分析
日志是文本文件,需要依赖文件IO,字符串操作,正则表达式等技术 ,通过这些技术就能够把日志中需要的数据提取出来
举例: 现要对日志中形如下面的进行日志分析:
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
对字符串出来可以使用字符串的split 进行分割处理,但我们已经学过正则表达式,所以这里使用正则表达式进行字符串的分割处理
import re
logline = '''\
140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] \
"GET / HTTP/1.1" 200 8642 "http://job.magedu.com/"\
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"
'''
regex = re.compile('(?P<remote>[\d.]{7,}) - - \[(?P<datetime>.*)\] "(?P<method>.+) (?P<url>.+) \
(?P<protocol>.+)" (?P<status>\d{3}) (?P<size>\d+) \"[^"]+" "(?P<useragent>[^"]+)"')
def extract(logline: str) -> dict:
m = regex.match(logline)
if m:
print(m.groups()) # 得到匹配到的值,用分组可以很快的得到所需要的值
print(extract(line))
使用上面的分组就可以提取到所有的分组,就是我们想要的数据。
类型转换
fields中的数据是有类型的,例如时间、状态码等。对不同的field要做不同的类型转换,甚至是自定义的转换
类型转换
fields 中数据是有类型的,在定义日志时,按照一定标准传入到文本中的,因此在后期需要使用时需要进行一定的类型转换,才能为我们所用
一,时间类型转换
# 从日志中匹配到的时间格式为
'19/Feb/2013:10:23:29 +0800 对应格式是%d/%b/%Y:%H:%M:%S %z'
import datetime
def convert_time(timestr):
return datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z')
#其中:使用datetime.datetime.strptime(),在转化时需要一一对应,原文中第一个为days /简写的月份Apr/年......末尾为时区相关
#查看帮助文档可以详细的对应每个时间对应的转化%?
上面的函数为一参函数调用,仅仅是将函数参数进行时间转化这一个操作,故可以简化函数,使用lambda函数;
lambda timestr:datetime.datetime.strptime(timestr,'%d/%b/%Y:%H:%M:%S %z')
同样,日志中其他数据也要进行相同的格式转化,同样不需要创建多个函数,可以创建一个字典,字典的value值为一个函数,这样在匹配字典是就可以调用函数
conversion = {
'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'length': int
}
def extract(logline: str) -> dict:
m = regex.match(logline)
if m:
#方法一 :
return {k:conversion.get(k, lambda x:x)(v) for k,v in m.groupdict().items()}
# 这里需要将不存在的k 如何处理,可以使用get的方法,如果不存在则用lambda直接返回值
#方法二 : 通过往字典中增加元素的方法来构建字典
if m:
for k,v in m.groupdict().items(): # k,v 分别是正则表达式匹配的组中的组名和对应的已匹配值
if k in conversion:
d[k] =conversion[k](v) # 将匹配值作为参数传给自店内的函数
else:
d[k]=v # 不在字典内的函数则直接加入新字典
#方法三: 列表解析式
d1 = {conversion[k](v) if k in conversion else v for k,v in m.groupdict().items()}
#日志中不免会出现一些不匹配的行,需要处理。
#这里使用re.match方法,有可能匹配不上。所以要增加一个判断
#采用抛出异常的方式,让调用者获得异常并自行处理。
if m :
return pass
else:
raise Excepton 或者 return None 将 return None 返回到函数外部,后期函数在外部进行异常处理
数据载入
数据是日志的一行行记录,载入数据就是IO的读取,将获取数据的方法封装成函数
def loadfile(filename:str,encoding = 'utf-8'):
with open(filename,encoding= encoding)as f :
for line in f:
fields = extract(line)
if fields:
yield fields # 使用yield的方法将函数返回值一个个返回出去
else:
print('xxxxx{}'.format(line))
日志文件的加载
目前实现的代码中,只能接受一个路径,修改为接受一批路径。
可以约定一下路径下文件的存放方式:
如果送来的是一批路径,就迭代其中路径。
如果路径是一个普通文件,就直接加载这个文件。
如果路径是一个目录,就遍历路径下所有指定类型的文件,每一个文件按照行处理。
可以提供参数处理是否递归子目录。
def load(*paths,ext = '*.log',encoding = 'utf-8',recursive =False): # 加上是否递归遍历
for x in paths:
p = Path(x)
if p.is_dir():
if isinstance(ext,str):
ext = [ext]
else:
ext = list[ext] # 可以传入多个参数
for e in ext:
# p.glob(ext) # 遍历当前路径
# p.rglob(ext) # 递归遍历
files = p.rglob(e)if recursive else p.glob(e)
# for file in files:
# with file.open(encoding='utf-8')as f:
# for line in f:
# fields = extract(line)
# if fields:
# pass
# else:
# pass
for file in files:
yield from loadfile(str(file.absolute()),encoding = encoding)
elif p.is_file():
yield from loadfile(str(p.absolute()), encoding=encoding)
#上面的代码问题是,嵌套层次太多了,结合原来的load函数,
上面代码中,使用的是yield from 因为原本loadfile函数本来返回值就已经是一个惰性求值了,所以这里不能在使用yield
上面的代码可以满足将日志中的信息给提取出来,但整体提取的信息还是过于笼统,不够自习,也不能从中清析的得到想到的数据,因此还必须对数据进行清洗才行.
在读数据进行清洗之前先学习下多线程的知识.
数据分发
一个系统的健康运行,需要监控并处理很多数据,包括日志。对其中已有数据进行采集、分析。
被监控对象就是数据的生产者producer,数据的处理程序就是数据的消费者consumer。
传统的生产者消费者模型,生产者生产,消费者消费。但这种模型有些问题。
生产者和消费者代码耦合太高,代码实现上要么是生产者产生一个数据就调用消费者,要么就是消费者处理完一个
数据就调用生产者一次。如果生成规模扩大,不易扩展,生产和消费的速度很难匹配等。
解决的办法——队列queue。
作用——解耦、缓冲。
日志生产者往往会部署好几个程序,日志产生的也很多,而消费者也会有多个程序,去提取日志分析处理。
数据的生产是不稳定的!可能会造成短时间数据的“潮涌”,需要缓冲。
消费者消费能力不一样,有快有慢,消费者可以自己决定消费缓冲区中的数据。
单机时,可以使用标准库queue模块的类来构建进程内的队列,满足多个线程间的生产消费需要。
大型系统可以使用第三方消息中间件——RabbitMQ、RocketMQ、Kafka等。
数据处理所需模块
queue模块——队列
queue 模块提供了一个先进先出的队列Queue
import queue
q = queue.Queue(maxsize = 0)
print(q)
>>><class 'queue.Queue'> 创建FIFO队列,返回的是一个Queue的对象 当maxsize < 0 时表示队列长度不受限制
q.put(1)
q.put('abc')
# put 源代码 为put(self, item, block=True, timeout=None): timeout 表示等待几秒再次方数据,如果队列能放进就不报异常
q.put_nowait()
print(q.get())
print(q.get())
print(q.get()) # 给定两个数据,但get 三次,如果消息列表是空将会发生阻塞 得不到数据则不停止程序
print(q.get(True ,5 )) # 设定等待时间,时间为5秒,拿不到数据就会抛异常
print(q.get_nowait()) # 不等待,有数据则直接返回,若拿不到数据则报异常
# 总结: get 的方法不论是等待时间还是不等待,只有得不到数据就会抛异常.
print(q.get(True)) # 默认的get方法的缺省值等待是True
print(q.get(False))
# 注意Queue的数据一旦被get后,就会从队列中消失
threading 模块 —线程
import queue
import threading
import time
q = queue.Queue()
q.put(1)
q.put('abc')
print(q.get())
print(q.get())
def handle ():
time.sleep(5)
q.put('xyz')
t = threading.Thread(target = handle)# 括号内反的是一个函数,用此方法进行线程的调用
def handle (q:quene.Queue):
time.sleep(5)
q.put('xyz')
t = threading.Thread(target = handle,args = (q,))# 可以有外部传入参数
t.start() # 启动线程
print('~~~~~~~~~')
print(q.get()) # get得不到数据,会一直等待
print("++++++")
>>>
1
abc
~~~~~~~~~# 等待5秒然后继续执行
xyz
++++++
为了让生产者的生产数据和消费者的消费数据同时进行,可以使用不同的线程。
数据处理流程
数据加载 -----> 数据提取-----> 队列------>统计分发
分发器的实现
数据分析的程序有很多,例如PV分析、IP分析、UserAgent分析等。
同一套数据可能要被多个分析程序并行处理:
需要使用多线程来并行处理
多个分析程序又需要同一份数据,这就是一份变多份
-
注册统计分析函数 并为其提供一个单独的数据队列
-
收集日志数据
-
将一份日志数据发送到多个已经注册的分析函数的队列中去
-
为了并行,每一个分析函数都在一个独立的线程中执行
def dispactcher(src):
queues = []
handles= []
def reg (fn):
q = Queue()
queues.append(q)
t = threading.Thread(target=fn,args=(q,)) # 未来的函数,会传入一个参数q
handles.append(t)
def run ():
for t in handles:
t.start() # 注册几个线程就启动几次
for item in src:
for q in queues:
q.put(item)
return reg, run
分析器
有了分发器创建的多线程,接下来就可以利用多线程来进行不同需求的处理
如何同时分析IP地址,useragent 等信息呢?
下面我们来看整体的分析代码,在看代码过程中学习
from pathlib import Path
import re
import datetime
from urllib.parse import urlparse
from user_agents import parse
import queue
import threading
from queue import Queue
import threading
from queue import Queue
regex = re.compile('(?P<remote>[\d.]{7,}) - - \[(?P<datetime>.*)\] "(?P<method>.+) (?P<url>.+) \
(?P<protocol>.+)" (?P<status>\d{3}) (?P<size>\d+) \"[^"]+" "(?P<useragent>[^"]+)"')
conversion = {
'datetime': lambda datestr : datetime.datetime.strptime(datestr,'%d/%b/%Y:%H:%M:%S %z'),
'status':int,
'size' :int,
'useragent': lambda uastr : parse(uastr)
}
def extract(line:str):
m = regex.match(line) # m代表每行读取到的日志返回值,其中数据是经过分组了的
if m :
# for k,v in m.groupdict().items(): # k,v取到的分别是组名和组名对应的值
# if k in conversion:
# d[k] =conversion[k](v) # 如果组名在上面的函数中则将值传入并返回处理后的值
# else:
# d[k]=v # 不存在则直接将值加到字典中
# d1 = {conversion[k](v) if k in conversion else v for k,v in m.groupdict().items()}
return {k:conversion.get(k,lambda x:x)(v)for k,v in m.groupdict().items()}
else:
return None # 在外面做异常记录
def loadfile(filename:str,encoding = 'utf-8'): # 读取日志文件内容函数
with open(filename,encoding= encoding)as f :
for line in f:
fields = extract(line) # 返回的是一个处理好数据的字典,
if fields: # 如果返回的不是None ,说明上面的函数正确执行,返回所需结果
yield fields # 惰性返回
else:
# print('xxxxx{}'.format(line)) # 错误输出
continue
def load(*paths,ext = '*.log',encoding = 'utf-8',recursive =False): # 加上是否递归遍历
for x in paths: # 路径为可变位置参数
p = Path(x)
if p.is_dir(): # 如果是文件夹则应该进入文件夹内读取文件
if isinstance(ext,str):
ext = [ext]
else:
ext = list[ext] # 可以传入多个参数
for e in ext:
# p.glob(ext) # 遍历当前路径
# p.rglob(ext) # 递归遍历
files = p.rglob(e)if recursive else p.glob(e) # 获取文件名,通过是否递归遍历的方法 查找返回的是一个惰性迭代对象
for file in files:
yield from loadfile(str(file.absolute()),encoding = encoding)
# absolute 是一个文件名的绝对路径,所以可以作为参数传给loadfile调用
elif p.is_file(): # 如果是文件则直接调用
yield from loadfile(str(p.absolute()), encoding=encoding)
# ('140.205.201.44', '07/Apr/2017:08:11:06 +0800', 'GET', '/', 'HTTP/1.1', '200', '8642', 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)')
# IP 地址 时间 方法 模式 useragent
# q = queue.Queue
def dispactcher(src):
queues = []
handles= []
def reg (fn):
q = Queue()
queues.append(q)
t = threading.Thread(target=fn,args=(q,)) # 未来的函数,会传入一个参数q
handles.append(t)
def run ():
for t in handles:
t.start() # 注册几个线程就启动几次
for item in src:
for q in queues:
q.put(item)
return reg, run
src=('.')
reg,run = dispactcher(load('.') # 传入当前路径的文件中当文件当行的数据的一个列表形式的数据
#@reg 装饰器,将下面的函数做为参数传给 dispactcher(src)函数用
#def handle(q:Queue):
# while True:
# data = q.get()
# if data:
# print(data)
# reg(handle)
@reg
def ip_handle(q:Queue):
ips = {}
count = 0
while True:
data = q.get()
ip = data['remote'] # 分组的数据
ips[ip] = ips.get(ip,0)+1
count += 1
print(ips)
print(len(ips.keys()))
# 按天统计尝试
#reg(handle)
@reg
def pv_handes(q:Queue):
pvs = {} # {path1:{ip1:5,ip2:6}}
while True:
data = q.get()
path = urlparse(data['url']).path
ip = data['remote']
#方案一: # 有一行算一行,即使相同IP都算
# pvs[path]= pvs.get(path,0)+1
#方案二: 每一个IP地址算一次
if path not in pvs.keys():
pvs[path]= {} # 不存在先创一个空字典
pvs[path][ip]= pvs[path].get(ip,0)+1
print(pvs)
@reg
def user_agenthandle(q:Queue):
browsers = {}
while True:
data = q.get()
browser = data['useragent'].browser
family = browser.family #
verson = browser.verson[0] # 取元组版本号的大版本号
# verson = browser.verson[0] if len(browser.verson) else 0
key =family,verson
browsers[key]= browsers.get(key,0)+1
src = load('.')
print(browsers)
run()
补充两个知识点 :
url 解析
from urllib.parse import urlparse # 导入库
url = 'http://job.magedu.com/app/template/default'
d = urlparse(url)
print(d)
>>>>
ParseResult(scheme='http', netloc='job.magedu.com', path='/app/template/default', params='', query='', fragment='')
print(d.path)
>>>/app/template/default
useragent 解析
from user_agents import parse
useragent = 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'
us = parse(useragent)
print(type(us))
print(us.browser)
# print(us.ua_string) #返回的是上面的字符串
print(us.browser.family) #浏览器名
print(us.browser.version_string)
# print(us.os) # 返回的是设备的名,与browser类似,都是一个对象,可通过family得到其操作系统类型
>>>>
<class 'user_agents.parsers.UserAgent'>
Browser(family='Sogou Explorer', version=(1, 0), version_string='1.0')
Sogou Explorer
1.0
以上,就是对一篇日志进行了简单的处理,得到了我们想要的数据 .为进一步的数据可是化提供了基础.