Python学习之 ---日志分析+数据分发与分析+多线程+queue模块+日志分析综合

日志分析

概述

生成中会生成大量的系统日志,应用程序日志,安全日志等等日志,通过对日志的分析可以了解服务器的负载,键康状况,可以分析客户的分布情况,客户的行为,甚至基于这些分析的时候可以做出预测.

一般采集流程,

日志产出->采集(Logdtash,Flume,Scribe) -> 存储->分析->存储(数据库,NoSQL)->可视化

开源实时日志分析ELK平台

Logstash 收集日志,并存放到ElastcSearch集群中,Kibana则从ES集群中查询数据生成图表,返回浏览器端

数据提取

结构化数据 ---- 数据库为结构化数据

半结构化数据 ----日志等文本,CSV文件

日志是半结构化数据,是有组织的,有格式的数据,可以分割成行和列,就可以当成表理解和处理了,当然也可以分析里面的数据

非结构化数据 音频视频

文本分析

日志是文本文件,需要依赖文件IO,字符串操作,正则表达式等技术 ,通过这些技术就能够把日志中需要的数据提取出来

举例: 现要对日志中形如下面的进行日志分析:

183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)" 

对字符串出来可以使用字符串的split 进行分割处理,但我们已经学过正则表达式,所以这里使用正则表达式进行字符串的分割处理

import re
logline  =  '''\
140.205.201.44 - - [07/Apr/2017:08:11:06 +0800] \
"GET / HTTP/1.1" 200 8642 "http://job.magedu.com/"\
 "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)"
'''
regex = re.compile('(?P<remote>[\d.]{7,}) - - \[(?P<datetime>.*)\] "(?P<method>.+) (?P<url>.+) \
(?P<protocol>.+)" (?P<status>\d{3}) (?P<size>\d+) \"[^"]+" "(?P<useragent>[^"]+)"')
def extract(logline: str) -> dict:
	m = regex.match(logline)
	if m:
		print(m.groups())  #  得到匹配到的值,用分组可以很快的得到所需要的值
print(extract(line))

使用上面的分组就可以提取到所有的分组,就是我们想要的数据。

类型转换

fields中的数据是有类型的,例如时间、状态码等。对不同的field要做不同的类型转换,甚至是自定义的转换

类型转换

fields 中数据是有类型的,在定义日志时,按照一定标准传入到文本中的,因此在后期需要使用时需要进行一定的类型转换,才能为我们所用

一,时间类型转换

# 从日志中匹配到的时间格式为
'19/Feb/2013:10:23:29 +0800 对应格式是%d/%b/%Y:%H:%M:%S %z'
import datetime
def convert_time(timestr):
	return datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z') 
#其中:使用datetime.datetime.strptime(),在转化时需要一一对应,原文中第一个为days /简写的月份Apr/年......末尾为时区相关 
#查看帮助文档可以详细的对应每个时间对应的转化%?  

上面的函数为一参函数调用,仅仅是将函数参数进行时间转化这一个操作,故可以简化函数,使用lambda函数;

lambda timestr:datetime.datetime.strptime(timestr,'%d/%b/%Y:%H:%M:%S %z')

同样,日志中其他数据也要进行相同的格式转化,同样不需要创建多个函数,可以创建一个字典,字典的value值为一个函数,这样在匹配字典是就可以调用函数

conversion = {
'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'length': int
}
def extract(logline: str) -> dict:
	m = regex.match(logline)
	if m:
#方法一 :       
	return {k:conversion.get(k, lambda x:x)(v) for k,v in m.groupdict().items()} 
# 这里需要将不存在的k 如何处理,可以使用get的方法,如果不存在则用lambda直接返回值 
#方法二 : 通过往字典中增加元素的方法来构建字典
	if m:        
    	for k,v in m.groupdict().items():  # k,v 分别是正则表达式匹配的组中的组名和对应的已匹配值 
            if k in conversion:
                d[k] =conversion[k](v) # 将匹配值作为参数传给自店内的函数
            else:
                d[k]=v # 不在字典内的函数则直接加入新字典
#方法三: 列表解析式
 d1 = {conversion[k](v) if k in conversion else v for k,v in m.groupdict().items()}       
  

#日志中不免会出现一些不匹配的行,需要处理。
#这里使用re.match方法,有可能匹配不上。所以要增加一个判断
#采用抛出异常的方式,让调用者获得异常并自行处理。

if m :
    return  pass  
else:
    raise Excepton  或者 return Nonereturn None 返回到函数外部,后期函数在外部进行异常处理 

数据载入

数据是日志的一行行记录,载入数据就是IO的读取,将获取数据的方法封装成函数

def loadfile(filename:str,encoding = 'utf-8'):
    with open(filename,encoding= encoding)as f :
        for line in f:
            fields = extract(line)
            if fields:
                yield  fields  # 使用yield的方法将函数返回值一个个返回出去
            else:
                print('xxxxx{}'.format(line))

日志文件的加载

目前实现的代码中,只能接受一个路径,修改为接受一批路径。
可以约定一下路径下文件的存放方式:
如果送来的是一批路径,就迭代其中路径。
如果路径是一个普通文件,就直接加载这个文件。
如果路径是一个目录,就遍历路径下所有指定类型的文件,每一个文件按照行处理。
可以提供参数处理是否递归子目录。

def load(*paths,ext = '*.log',encoding = 'utf-8',recursive =False): # 加上是否递归遍历
    for x in paths:
        p = Path(x)
        if p.is_dir():
            if isinstance(ext,str):
                ext = [ext]
            else:
                ext = list[ext]  # 可以传入多个参数
            for e in ext:
            # p.glob(ext)  # 遍历当前路径
            # p.rglob(ext)  # 递归遍历
                files = p.rglob(e)if recursive else  p.glob(e)
                # for file in files:
                #     with file.open(encoding='utf-8')as f:
                #         for line in f:
                #             fields = extract(line)
                #             if fields:
                #                 pass
                #             else:
                #                 pass
                	for file in files:
                    	yield from 	loadfile(str(file.absolute()),encoding = encoding)
        elif p.is_file():
            yield  from loadfile(str(p.absolute()), encoding=encoding)
 #上面的代码问题是,嵌套层次太多了,结合原来的load函数,

上面代码中,使用的是yield from 因为原本loadfile函数本来返回值就已经是一个惰性求值了,所以这里不能在使用yield

上面的代码可以满足将日志中的信息给提取出来,但整体提取的信息还是过于笼统,不够自习,也不能从中清析的得到想到的数据,因此还必须对数据进行清洗才行.

在读数据进行清洗之前先学习下多线程的知识.

数据分发

一个系统的健康运行,需要监控并处理很多数据,包括日志。对其中已有数据进行采集、分析。
被监控对象就是数据的生产者producer,数据的处理程序就是数据的消费者consumer。

传统的生产者消费者模型,生产者生产,消费者消费。但这种模型有些问题。
生产者和消费者代码耦合太高,代码实现上要么是生产者产生一个数据就调用消费者,要么就是消费者处理完一个
数据就调用生产者一次。如果生成规模扩大,不易扩展,生产和消费的速度很难匹配等。

解决的办法——队列queue。
作用——解耦、缓冲。

Python学习之 ---日志分析+数据分发与分析+多线程+queue模块+日志分析综合

日志生产者往往会部署好几个程序,日志产生的也很多,而消费者也会有多个程序,去提取日志分析处理。

数据的生产是不稳定的!可能会造成短时间数据的“潮涌”,需要缓冲。
消费者消费能力不一样,有快有慢,消费者可以自己决定消费缓冲区中的数据。
单机时,可以使用标准库queue模块的类来构建进程内的队列,满足多个线程间的生产消费需要。
大型系统可以使用第三方消息中间件——RabbitMQ、RocketMQ、Kafka等。

数据处理所需模块

queue模块——队列

queue 模块提供了一个先进先出的队列Queue

import queue 
q = queue.Queue(maxsize = 0)
print(q)
>>><class 'queue.Queue'>   创建FIFO队列,返回的是一个Queue的对象 当maxsize < 0 时表示队列长度不受限制 
q.put(1)
q.put('abc')
#  put 源代码 为put(self, item, block=True, timeout=None):   timeout 表示等待几秒再次方数据,如果队列能放进就不报异常
q.put_nowait() 
print(q.get())
print(q.get())
print(q.get())  # 给定两个数据,但get 三次,如果消息列表是空将会发生阻塞  得不到数据则不停止程序
print(q.get(True ,5 ))  # 设定等待时间,时间为5秒,拿不到数据就会抛异常
print(q.get_nowait())  # 不等待,有数据则直接返回,若拿不到数据则报异常
# 总结: get 的方法不论是等待时间还是不等待,只有得不到数据就会抛异常.
print(q.get(True)) # 默认的get方法的缺省值等待是True
print(q.get(False))
# 注意Queue的数据一旦被get后,就会从队列中消失

threading 模块 —线程

import queue
import threading
import time
q = queue.Queue()
q.put(1)
q.put('abc')
print(q.get())
print(q.get())
def handle ():
    time.sleep(5)
    q.put('xyz')
t = threading.Thread(target = handle)#  括号内反的是一个函数,用此方法进行线程的调用
def handle (q:quene.Queue):
    time.sleep(5)
    q.put('xyz')
t = threading.Thread(target = handle,args = (q,))#  可以有外部传入参数
t.start() # 启动线程
print('~~~~~~~~~')
print(q.get())  # get得不到数据,会一直等待 
print("++++++")
>>> 
1
abc
~~~~~~~~~# 等待5秒然后继续执行
xyz
++++++

为了让生产者的生产数据和消费者的消费数据同时进行,可以使用不同的线程。

数据处理流程

数据加载 -----> 数据提取-----> 队列------>统计分发

分发器的实现

数据分析的程序有很多,例如PV分析、IP分析、UserAgent分析等。
同一套数据可能要被多个分析程序并行处理:
需要使用多线程来并行处理
多个分析程序又需要同一份数据,这就是一份变多份

  • 注册统计分析函数 并为其提供一个单独的数据队列

  • 收集日志数据

  • 将一份日志数据发送到多个已经注册的分析函数的队列中去

  • 为了并行,每一个分析函数都在一个独立的线程中执行

  def dispactcher(src):
      queues = []
      handles= []
      def reg (fn):
          q = Queue()
          queues.append(q)
          t = threading.Thread(target=fn,args=(q,)) # 未来的函数,会传入一个参数q
          handles.append(t)
      def run ():
          for t in handles:
              t.start() # 注册几个线程就启动几次
          for item in src:
              for q in queues:
                  q.put(item)
      return reg, run

分析器

有了分发器创建的多线程,接下来就可以利用多线程来进行不同需求的处理

如何同时分析IP地址,useragent 等信息呢?

下面我们来看整体的分析代码,在看代码过程中学习

from  pathlib import  Path
import re
import datetime
from  urllib.parse import urlparse
from  user_agents import  parse
import queue
import threading
from queue import Queue
import threading
from queue import Queue
regex = re.compile('(?P<remote>[\d.]{7,}) - - \[(?P<datetime>.*)\] "(?P<method>.+) (?P<url>.+) \
(?P<protocol>.+)" (?P<status>\d{3}) (?P<size>\d+) \"[^"]+" "(?P<useragent>[^"]+)"')

conversion = {
    'datetime': lambda datestr : datetime.datetime.strptime(datestr,'%d/%b/%Y:%H:%M:%S %z'),
    'status':int,
    'size'  :int,
    'useragent': lambda uastr : parse(uastr)
             }
def extract(line:str):
    m = regex.match(line)  # m代表每行读取到的日志返回值,其中数据是经过分组了的
    if m :
      #  for k,v in m.groupdict().items():  # k,v取到的分别是组名和组名对应的值 
      #      if k in conversion:
      #          d[k] =conversion[k](v)  # 如果组名在上面的函数中则将值传入并返回处理后的值
      #      else:
      #          d[k]=v  # 不存在则直接将值加到字典中
      #  d1 = {conversion[k](v) if k in conversion else v for k,v in m.groupdict().items()}
        return {k:conversion.get(k,lambda x:x)(v)for k,v in m.groupdict().items()}
    else:
        return None  # 在外面做异常记录
def loadfile(filename:str,encoding = 'utf-8'):  # 读取日志文件内容函数 
    with open(filename,encoding= encoding)as f :
        for line in f:
            fields = extract(line)  # 返回的是一个处理好数据的字典,
            if fields: # 如果返回的不是None ,说明上面的函数正确执行,返回所需结果 
                yield  fields    # 惰性返回  
            else:
            #     print('xxxxx{}'.format(line))  # 错误输出
                continue
def load(*paths,ext = '*.log',encoding = 'utf-8',recursive =False): # 加上是否递归遍历
    for x in paths:  # 路径为可变位置参数 
        p = Path(x)
        if p.is_dir():  # 如果是文件夹则应该进入文件夹内读取文件 
            if isinstance(ext,str):
                ext = [ext]
            else:
                ext = list[ext]  # 可以传入多个参数
            for e in ext:
            # p.glob(ext)  # 遍历当前路径
            # p.rglob(ext)  # 递归遍历
                files = p.rglob(e)if recursive else  p.glob(e)  # 获取文件名,通过是否递归遍历的方法  查找返回的是一个惰性迭代对象
                for file in files:
                    yield from loadfile(str(file.absolute()),encoding = encoding)
                    # absolute 是一个文件名的绝对路径,所以可以作为参数传给loadfile调用
        elif p.is_file(): # 如果是文件则直接调用
            yield  from loadfile(str(p.absolute()), encoding=encoding)

# ('140.205.201.44', '07/Apr/2017:08:11:06 +0800', 'GET', '/', 'HTTP/1.1', '200', '8642', 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;Alibaba.Security.Heimdall.950384.p)')
#   IP 地址               时间                    方法               模式      useragent                                                                         
# q = queue.Queue
  def dispactcher(src):
      queues = []
      handles= []
      def reg (fn):
          q = Queue()
          queues.append(q)
          t = threading.Thread(target=fn,args=(q,)) # 未来的函数,会传入一个参数q
          handles.append(t)
      def run ():
          for t in handles:
              t.start() # 注册几个线程就启动几次
          for item in src:
              for q in queues:
                  q.put(item)
      return reg, run
src=('.')
reg,run = dispactcher(load('.')  # 传入当前路径的文件中当文件当行的数据的一个列表形式的数据
#@reg  装饰器,将下面的函数做为参数传给  dispactcher(src)函数用
#def handle(q:Queue):
#    while True:
#        data = q.get()
#       if data:
#           print(data)
# reg(handle)
@reg
def ip_handle(q:Queue):
    ips = {}
    count = 0
    while True:
        data = q.get()
        ip = data['remote']  # 分组的数据
        ips[ip] = ips.get(ip,0)+1
        count += 1
        print(ips)
        print(len(ips.keys()))
# 按天统计尝试
#reg(handle)
 @reg
def pv_handes(q:Queue):
    pvs = {}  # {path1:{ip1:5,ip2:6}}
    while  True:
        data = q.get()
        path = urlparse(data['url']).path
        ip = data['remote']
#方案一: # 有一行算一行,即使相同IP都算
        # pvs[path]= pvs.get(path,0)+1
#方案二: 每一个IP地址算一次
        if path not  in pvs.keys():
            pvs[path]= {}  # 不存在先创一个空字典
        pvs[path][ip]= pvs[path].get(ip,0)+1
 print(pvs)
@reg
def user_agenthandle(q:Queue):
    browsers = {}
    while True:
        data = q.get()
        browser = data['useragent'].browser
        family = browser.family  # 
        verson = browser.verson[0] # 取元组版本号的大版本号
        # verson = browser.verson[0] if len(browser.verson) else 0
        key =family,verson
        browsers[key]= browsers.get(key,0)+1
 
src = load('.')
print(browsers)
run()                 

补充两个知识点 :

url 解析

from urllib.parse import urlparse  # 导入库
url = 'http://job.magedu.com/app/template/default'
d = urlparse(url) 
print(d)
>>>>
ParseResult(scheme='http', netloc='job.magedu.com', path='/app/template/default', params='', query='', fragment='')
print(d.path)
>>>/app/template/default

useragent 解析

from user_agents import parse 
useragent = 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'
us = parse(useragent)
print(type(us))
print(us.browser)
# print(us.ua_string)  #返回的是上面的字符串
print(us.browser.family)  #浏览器名
print(us.browser.version_string)
# print(us.os)  # 返回的是设备的名,与browser类似,都是一个对象,可通过family得到其操作系统类型 
>>>>
<class 'user_agents.parsers.UserAgent'>
Browser(family='Sogou Explorer', version=(1, 0), version_string='1.0')
Sogou Explorer
1.0

以上,就是对一篇日志进行了简单的处理,得到了我们想要的数据 .为进一步的数据可是化提供了基础.

日志文本下载链接