[Python学习]基于python的子网内主机发现工具开发

实现功能:发现某子网内的存活主机并返回其IP地址

运行系统:Windows、Linux

import的第三方库:ipaddress,netaddr

技术框架:

数据处理部分:使用Python的ctyres模块创建类似于C的结构体,对数据包中IP头部分携带的如协议类型、源IP地址和目的IP地址等信息进行解码,处理并显示IP头和其中的组成部分。

主机发现部分:利用ipaddress库中的IPNetwork方法生成大量IP,并对每一个IP地址的某一常闭端口发送UDP消息,如该主机存活,则会返回特定类型的ICMP消息,对其解析并进行下一步筛选。

主机筛选部分:为确保发现的主机是目标子网内的主机,需要对该IP地址是否属于目标子网进行一次筛选。同时为确保该主机是由我们主动发送UDP消息而被拒收的主机,需要判断其返回的数据包的主体内容是否是我们发送的主体内容。

实现过程:

创建名为sniffer_ip_header_decode.pyd的文件

import及参数设置:

import socket
import threading
import time
import os
import struct
import ipaddress

from netaddr import IPNetwork, IPAddress
from ctypes import *


host = "10.8.241.63" #本机地址
subnet = "10.8.241.0/21" #子网地址
magic_message = "PYTHON" #向主机发送的消息内容

IP头信息解析:

class IP(Structure):
    _fields_ = [
        ("ihl", c_ubyte, 4),
        ("version", c_ubyte, 4),
        ("tos", c_ubyte),
        ("len", c_ushort),
        ("id", c_ushort),
        ("offset", c_ushort),
        ("ttl", c_ubyte),
        ("protocol_num", c_ubyte),
        ("sum", c_ushort),
        ("src", c_ulong),
        ("dst", c_ulong)
        ]
        
    def __new__(self, socket_buffer=None):
            return self.from_buffer_copy(socket_buffer)
        
    def __init__(self, socket_buffer=None):
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}	 #使用的协议
        self.src_address = socket.inet_ntoa(struct.pack("<L", self.src))  #源地址
        self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst))  #目标地址
            
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

ICMP信息解析:

class ICMP(Structure):

	_fields_=[
	("type", c_ubyte),
	("code", c_ubyte),
	("checksum", c_ushort),
	("unused", c_ushort),
	("next_hop_mtu", c_ushort)
	]

	def __new__(self, socket_buffer):
		return self.from_buffer_copy(socket_buffer)

	def __init__(self, socket_buffer):
		pass

#windows系统默认支持IP协议,Linux系统默认支持ICMP协议
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
	socket_protocol = socket.IPPROTO_ICMP
        
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
        
sniffer.bind((host, 22))
        
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
        
        #windows需要设置IOCTL以开启混杂模式
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

向子网内主机的65212端口发送UDP消息:

def udp_sender(subnet, magic_message):
	time.sleep(5)
	sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
	
	for ip in IPNetwork(subnet):
		try:
			sender.sendto(magic_message.encode(), (str(ip) ,65212))
		except:
			pass

独立开启一个线程执行此任务,以确保发送任务与嗅探任务不会因资源重叠而漏报主机:

t = threading.Thread(target=udp_sender, args=(subnet,magic_message))
t.start()

持续嗅探部分及退出部分:

try:
    while True:
        raw_buffer = sniffer.recvfrom(65565)[0] #接收到的信息
        ip_header = IP(raw_buffer[0:20]) #ip头部分

        #为避免过多输出信息 该print语句加了注释
        #print("Protocol: %s %s -> %s" %(ip_header.protocol, ip_header.src_address, ip_header.dst_address))

        if ip_header.protocol == "ICMP":
            
            #IP头的长度基于其中的ihl字段计算 代表IP头中32位(4字节)长的分片的个数
            #即IP头的大小等于该字段的值乘以4
        	offset = ip_header.ihl * 4   

        	buf = raw_buffer[offset:offset + sizeof(ICMP)]
            
            #解析ICMP数据
        	icmp_header = ICMP(buf)

        	#print("ICMP -> Type:%d Code:%d"%(icmp_header.type, icmp_header.code))
               
            #对于存活主机关闭的端口 会向我们的主机返回一个类型=3 代码值=3的信息
            #以表示目标端口不可达
            if icmp_header.code == 3 and icmp_header.type == 3:
                
                #确保是目标子网内的主机
                if IPAddress(ip_header.src_address) in IPNetwork(subnet):
        			
                    #确保是我们发送信息的目标主机
                    if str(raw_buffer[len(raw_buffer)-len(magic_message):]) == magic_message:
                        #输出该主机的IP地址
        	    		print("Host up:%s" % ip_header.src_address)
                
except KeyboardInterrupt:
#对于windows系统 需要关闭混杂模式
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCALL, socket.RCVALL_OFF)

完整代码:

import socket
import threading
import time
import os
import struct
import ipaddress

from netaddr import IPNetwork, IPAddress
from ctypes import *
host = "10.8.241.63"
subnet = "10.8.241.0/21"

magic_message = "PYTHON"


#define IP Head
class IP(Structure):
    _fields_ = [
        ("ihl", c_ubyte, 4),
        ("version", c_ubyte, 4),
        ("tos", c_ubyte),
        ("len", c_ushort),
        ("id", c_ushort),
        ("offset", c_ushort),
        ("ttl", c_ubyte),
        ("protocol_num", c_ubyte),
        ("sum", c_ushort),
        ("src", c_ulong),
        ("dst", c_ulong)
        ]
        
    def __new__(self, socket_buffer=None):
            return self.from_buffer_copy(socket_buffer)
        
    def __init__(self, socket_buffer=None):
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}	
        self.src_address = socket.inet_ntoa(struct.pack("<L", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst))
            
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)


class ICMP(Structure):

	_fields_=[
	("type", c_ubyte),
	("code", c_ubyte),
	("checksum", c_ushort),
	("unused", c_ushort),
	("next_hop_mtu", c_ushort)
	]

	def __new__(self, socket_buffer):
		return self.from_buffer_copy(socket_buffer)

	def __init__(self, socket_buffer):
		pass


if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
	socket_protocol = socket.IPPROTO_ICMP
        
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
        
sniffer.bind((host, 22))
        
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
        
        #windows plat set IOCTL for remix mode
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)


def udp_sender(subnet, magic_message):
	time.sleep(5)
	sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
	
	for ip in IPNetwork(subnet):
		try:
			sender.sendto(magic_message.encode(), (str(ip) ,65212))
		except:
			pass

t = threading.Thread(target=udp_sender, args=(subnet,magic_message))
t.start()


try:
    while True:
        raw_buffer = sniffer.recvfrom(65565)[0]
                
        ip_header = IP(raw_buffer[0:20])

                
        #print("Protocol: %s %s -> %s" %(ip_header.protocol, ip_header.src_address, ip_header.dst_address))

        if ip_header.protocol == "ICMP":

        	offset = ip_header.ihl * 4
        	buf = raw_buffer[offset:offset + sizeof(ICMP)]

        	icmp_header = ICMP(buf)

        	#print("ICMP -> Type:%d Code:%d"%(icmp_header.type, icmp_header.code))

        	if icmp_header.code == 3 and icmp_header.type == 3:

        		if IPAddress(ip_header.src_address) in IPNetwork(subnet):
        			if str(raw_buffer[len(raw_buffer)-len(magic_message):]) == magic_message:
                        print("Host up:%s" % ip_header.src_address)
                
except KeyboardInterrupt:
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCALL, socket.RCVALL_OFF)

运行结果:

运行平台:Windows

运行方式:

1. 以管理员身份运行cmd窗口

2. 关闭本机防火墙设置

3. 切换到脚本所在目录

4. 执行python sniffer_ip_header_decode.py命令

运行结果:

[Python学习]基于python的子网内主机发现工具开发